events

Paddy 2015-07-14 Parent:e86251b04826 Child:ce1212549d47

2:987595765978 Browse Files

Create NSQ helpers. Create an NSQPublisher helper that will allow us to publish events in the system in a consistent way. It basically just uses our dictated Event type, to make sure everything's using the same format. Create an NSQSubscriber helper that will allow us to listen for events in the system. It basically creates an nsq.Consumer, adds a handler to it, connects it to lookupds, and has a helper to block until the consumer is told to stop. Eventually, if github.com/bitly/nsq/issues/601 goes well, we should also have this poll our DNS to check for new nsqlookupds, and automatically connect to them.

nsq.go

     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/nsq.go	Tue Jul 14 00:05:45 2015 -0400
     1.3 @@ -0,0 +1,65 @@
     1.4 +package events
     1.5 +
     1.6 +import (
     1.7 +	"encoding/json"
     1.8 +	"errors"
     1.9 +	"fmt"
    1.10 +
    1.11 +	"github.com/bitly/go-nsq"
    1.12 +)
    1.13 +
    1.14 +var (
    1.15 +	ErrNilPublisher = errors.New("event publisher is nil")
    1.16 +)
    1.17 +
    1.18 +type NSQPublisher nsq.Producer
    1.19 +
    1.20 +func NewNSQPublisher(userAgent, address string) (*NSQPublisher, error) {
    1.21 +	cfg := nsq.NewConfig()
    1.22 +	cfg.UserAgent = fmt.Sprintf("%s go-nsq/%s", userAgent, nsq.VERSION)
    1.23 +	err := cfg.Validate()
    1.24 +	if err != nil {
    1.25 +		return nil, err
    1.26 +	}
    1.27 +	producer, err := nsq.NewProducer(address, cfg)
    1.28 +	if err != nil {
    1.29 +		return nil, err
    1.30 +	}
    1.31 +	return (*NSQPublisher)(producer), nil
    1.32 +}
    1.33 +
    1.34 +func (n *NSQPublisher) Publish(topic string, e Event) error {
    1.35 +	if n == nil {
    1.36 +		return ErrNilPublisher
    1.37 +	}
    1.38 +	data, err := json.Marshal(e)
    1.39 +	if err != nil {
    1.40 +		return err
    1.41 +	}
    1.42 +	return (*nsq.Producer)(n).Publish(topic, data)
    1.43 +}
    1.44 +
    1.45 +type NSQSubscriber nsq.Consumer
    1.46 +
    1.47 +func NewNSQSubscriber(lookupds []string, handler nsq.Handler, topic, channel, userAgent string) (*NSQSubscriber, error) {
    1.48 +	cfg := nsq.NewConfig()
    1.49 +	cfg.UserAgent = fmt.Sprintf("%s go-nsq/%s", userAgent, nsq.VERSION)
    1.50 +	err := cfg.Validate()
    1.51 +	if err != nil {
    1.52 +		return nil, err
    1.53 +	}
    1.54 +	consumer, err := nsq.NewConsumer(topic, channel, cfg)
    1.55 +	if err != nil {
    1.56 +		return nil, err
    1.57 +	}
    1.58 +	consumer.AddHandler(handler)
    1.59 +	err = consumer.ConnectToNSQLookupds(lookupds)
    1.60 +	if err != nil {
    1.61 +		return nil, err
    1.62 +	}
    1.63 +	return (*NSQSubscriber)(consumer), nil
    1.64 +}
    1.65 +
    1.66 +func (sub *NSQSubscriber) Block() {
    1.67 +	<-(*nsq.Consumer)(sub).StopChan
    1.68 +}