package events

import (
	"encoding/json"
	"errors"
	"fmt"

	"github.com/bitly/go-nsq"
)

// ErrNilPublisher is the sentinel error returned by Publish when it is
// invoked on a nil *NSQPublisher.
var ErrNilPublisher = errors.New("event publisher is nil")

// NSQPublisher is a defined type whose underlying type is nsq.Producer.
// Because it is a type definition rather than an embedding, nsq.Producer's
// methods are not promoted onto it; code in this file converts back with
// (*nsq.Producer)(p) to reach them.
type NSQPublisher nsq.Producer

// NewNSQPublisher creates a publisher backed by an nsq.Producer for address.
//
// The producer's configuration starts from nsq.NewConfig and has its
// UserAgent set to "<userAgent> go-nsq/<nsq.VERSION>". An error is returned,
// with a nil publisher, if the configuration fails validation or if
// nsq.NewProducer reports a failure.
func NewNSQPublisher(userAgent, address string) (*NSQPublisher, error) {
	conf := nsq.NewConfig()
	conf.UserAgent = fmt.Sprintf("%s go-nsq/%s", userAgent, nsq.VERSION)
	if err := conf.Validate(); err != nil {
		return nil, err
	}

	p, err := nsq.NewProducer(address, conf)
	if err != nil {
		return nil, err
	}
	return (*NSQPublisher)(p), nil
}

// Publish JSON-encodes e and hands the resulting bytes to the underlying
// nsq.Producer for the given topic.
//
// It returns ErrNilPublisher when called on a nil receiver, the
// json.Marshal error if e cannot be encoded, or otherwise whatever the
// producer's Publish call returns.
func (p *NSQPublisher) Publish(topic string, e Event) error {
	// Guard first so a nil publisher never reaches the encoding step.
	if p == nil {
		return ErrNilPublisher
	}

	payload, err := json.Marshal(e)
	if err != nil {
		return err
	}

	producer := (*nsq.Producer)(p)
	return producer.Publish(topic, payload)
}

// NSQSubscriber is a defined type whose underlying type is nsq.Consumer.
// Because it is a type definition rather than an embedding, nsq.Consumer's
// methods are not promoted onto it; code in this file converts back with
// (*nsq.Consumer)(sub) to reach them.
type NSQSubscriber nsq.Consumer

// NewNSQSubscriber creates a consumer for topic/channel, registers handler
// on it, and connects it to the given nsqlookupd addresses.
//
// The consumer's configuration starts from nsq.NewConfig and has its
// UserAgent set to "<userAgent> go-nsq/<nsq.VERSION>".
//
// An error is returned, with a nil subscriber, if the configuration fails
// validation, if nsq.NewConsumer fails, or if connecting to the lookupds
// fails. In the last case the already-constructed consumer is stopped before
// returning, so a failed construction does not leave it running with the
// handler attached.
func NewNSQSubscriber(lookupds []string, handler nsq.Handler, topic, channel, userAgent string) (*NSQSubscriber, error) {
	cfg := nsq.NewConfig()
	cfg.UserAgent = fmt.Sprintf("%s go-nsq/%s", userAgent, nsq.VERSION)
	if err := cfg.Validate(); err != nil {
		return nil, err
	}

	consumer, err := nsq.NewConsumer(topic, channel, cfg)
	if err != nil {
		return nil, err
	}

	// The handler must be registered before connecting.
	consumer.AddHandler(handler)

	if err := consumer.ConnectToNSQLookupds(lookupds); err != nil {
		// The caller never receives this consumer, so nobody else can shut
		// it down; release it here instead of abandoning it.
		consumer.Stop()
		return nil, err
	}
	return (*NSQSubscriber)(consumer), nil
}

// Block does not return until a receive on the underlying consumer's
// StopChan completes.
func (sub *NSQSubscriber) Block() {
	consumer := (*nsq.Consumer)(sub)
	<-consumer.StopChan
}
