events

Paddy 2015-07-15 Parent:987595765978 Child:df95f058a7b5

3:ce1212549d47 Go to Latest

events/nsq.go

Create Publisher interface and stdout publisher. Create a Publisher interface that allows us to use different implementations and switch between them easily. The NSQ publisher already implements the interface, so it was really just copying that. Create an implmentation of the Publisher interface that just logs the data to stdout.

History
paddy@2 1 package events
paddy@2 2
paddy@2 3 import (
paddy@2 4 "encoding/json"
paddy@2 5 "errors"
paddy@2 6 "fmt"
paddy@2 7
paddy@2 8 "github.com/bitly/go-nsq"
paddy@2 9 )
paddy@2 10
paddy@2 11 var (
paddy@2 12 ErrNilPublisher = errors.New("event publisher is nil")
paddy@2 13 )
paddy@2 14
paddy@2 15 type NSQPublisher nsq.Producer
paddy@2 16
paddy@2 17 func NewNSQPublisher(userAgent, address string) (*NSQPublisher, error) {
paddy@2 18 cfg := nsq.NewConfig()
paddy@2 19 cfg.UserAgent = fmt.Sprintf("%s go-nsq/%s", userAgent, nsq.VERSION)
paddy@2 20 err := cfg.Validate()
paddy@2 21 if err != nil {
paddy@2 22 return nil, err
paddy@2 23 }
paddy@2 24 producer, err := nsq.NewProducer(address, cfg)
paddy@2 25 if err != nil {
paddy@2 26 return nil, err
paddy@2 27 }
paddy@2 28 return (*NSQPublisher)(producer), nil
paddy@2 29 }
paddy@2 30
paddy@2 31 func (n *NSQPublisher) Publish(topic string, e Event) error {
paddy@2 32 if n == nil {
paddy@2 33 return ErrNilPublisher
paddy@2 34 }
paddy@2 35 data, err := json.Marshal(e)
paddy@2 36 if err != nil {
paddy@2 37 return err
paddy@2 38 }
paddy@2 39 return (*nsq.Producer)(n).Publish(topic, data)
paddy@2 40 }
paddy@2 41
paddy@2 42 type NSQSubscriber nsq.Consumer
paddy@2 43
paddy@2 44 func NewNSQSubscriber(lookupds []string, handler nsq.Handler, topic, channel, userAgent string) (*NSQSubscriber, error) {
paddy@2 45 cfg := nsq.NewConfig()
paddy@2 46 cfg.UserAgent = fmt.Sprintf("%s go-nsq/%s", userAgent, nsq.VERSION)
paddy@2 47 err := cfg.Validate()
paddy@2 48 if err != nil {
paddy@2 49 return nil, err
paddy@2 50 }
paddy@2 51 consumer, err := nsq.NewConsumer(topic, channel, cfg)
paddy@2 52 if err != nil {
paddy@2 53 return nil, err
paddy@2 54 }
paddy@2 55 consumer.AddHandler(handler)
paddy@2 56 err = consumer.ConnectToNSQLookupds(lookupds)
paddy@2 57 if err != nil {
paddy@2 58 return nil, err
paddy@2 59 }
paddy@2 60 return (*NSQSubscriber)(consumer), nil
paddy@2 61 }
paddy@2 62
paddy@2 63 func (sub *NSQSubscriber) Block() {
paddy@2 64 <-(*nsq.Consumer)(sub).StopChan
paddy@2 65 }