events

Paddy 2015-07-14 Child:df95f058a7b5

2:987595765978 Go to Latest

events/nsq.go

Create NSQ helpers. Create an NSQPublisher helper that will allow us to publish events in the system in a consistent way. It basically just uses our dictated Event type, to make sure everything's using the same format. Create an NSQSubscriber helper that will allow us to listen for events in the system. It basically creates an nsq.Consumer, adds a handler to it, connects it to lookupds, and has a helper to block until the consumer is told to stop. Eventually, if github.com/bitly/nsq/issues/601 goes well, we should also have this poll our DNS to check for new nsqlookupds, and automatically connect to them.

History
paddy@2 1 package events
paddy@2 2
paddy@2 3 import (
paddy@2 4 "encoding/json"
paddy@2 5 "errors"
paddy@2 6 "fmt"
paddy@2 7
paddy@2 8 "github.com/bitly/go-nsq"
paddy@2 9 )
paddy@2 10
paddy@2 11 var (
paddy@2 12 ErrNilPublisher = errors.New("event publisher is nil")
paddy@2 13 )
paddy@2 14
paddy@2 15 type NSQPublisher nsq.Producer
paddy@2 16
paddy@2 17 func NewNSQPublisher(userAgent, address string) (*NSQPublisher, error) {
paddy@2 18 cfg := nsq.NewConfig()
paddy@2 19 cfg.UserAgent = fmt.Sprintf("%s go-nsq/%s", userAgent, nsq.VERSION)
paddy@2 20 err := cfg.Validate()
paddy@2 21 if err != nil {
paddy@2 22 return nil, err
paddy@2 23 }
paddy@2 24 producer, err := nsq.NewProducer(address, cfg)
paddy@2 25 if err != nil {
paddy@2 26 return nil, err
paddy@2 27 }
paddy@2 28 return (*NSQPublisher)(producer), nil
paddy@2 29 }
paddy@2 30
paddy@2 31 func (n *NSQPublisher) Publish(topic string, e Event) error {
paddy@2 32 if n == nil {
paddy@2 33 return ErrNilPublisher
paddy@2 34 }
paddy@2 35 data, err := json.Marshal(e)
paddy@2 36 if err != nil {
paddy@2 37 return err
paddy@2 38 }
paddy@2 39 return (*nsq.Producer)(n).Publish(topic, data)
paddy@2 40 }
paddy@2 41
paddy@2 42 type NSQSubscriber nsq.Consumer
paddy@2 43
paddy@2 44 func NewNSQSubscriber(lookupds []string, handler nsq.Handler, topic, channel, userAgent string) (*NSQSubscriber, error) {
paddy@2 45 cfg := nsq.NewConfig()
paddy@2 46 cfg.UserAgent = fmt.Sprintf("%s go-nsq/%s", userAgent, nsq.VERSION)
paddy@2 47 err := cfg.Validate()
paddy@2 48 if err != nil {
paddy@2 49 return nil, err
paddy@2 50 }
paddy@2 51 consumer, err := nsq.NewConsumer(topic, channel, cfg)
paddy@2 52 if err != nil {
paddy@2 53 return nil, err
paddy@2 54 }
paddy@2 55 consumer.AddHandler(handler)
paddy@2 56 err = consumer.ConnectToNSQLookupds(lookupds)
paddy@2 57 if err != nil {
paddy@2 58 return nil, err
paddy@2 59 }
paddy@2 60 return (*NSQSubscriber)(consumer), nil
paddy@2 61 }
paddy@2 62
paddy@2 63 func (sub *NSQSubscriber) Block() {
paddy@2 64 <-(*nsq.Consumer)(sub).StopChan
paddy@2 65 }