events
events/nsq.go
Create Publisher interface and stdout publisher. Create a Publisher interface that allows us to use different implementations and switch between them easily. The NSQ publisher already implements the interface, so it was really just copying that. Create an implementation of the Publisher interface that just logs the data to stdout.
| paddy@2 | 1 package events |
| paddy@2 | 2 |
| paddy@2 | 3 import ( |
| paddy@2 | 4 "encoding/json" |
| paddy@2 | 5 "errors" |
| paddy@2 | 6 "fmt" |
| paddy@2 | 7 |
| paddy@2 | 8 "github.com/bitly/go-nsq" |
| paddy@2 | 9 ) |
| paddy@2 | 10 |
// ErrNilPublisher is the error Publish returns when it is invoked on a nil
// *NSQPublisher.
var ErrNilPublisher = errors.New("event publisher is nil")
| paddy@2 | 14 |
// NSQPublisher is a named type over nsq.Producer. Its Publish method
// JSON-encodes an Event and hands the bytes to the underlying producer.
type NSQPublisher nsq.Producer
| paddy@2 | 16 |
| paddy@2 | 17 func NewNSQPublisher(userAgent, address string) (*NSQPublisher, error) { |
| paddy@2 | 18 cfg := nsq.NewConfig() |
| paddy@2 | 19 cfg.UserAgent = fmt.Sprintf("%s go-nsq/%s", userAgent, nsq.VERSION) |
| paddy@2 | 20 err := cfg.Validate() |
| paddy@2 | 21 if err != nil { |
| paddy@2 | 22 return nil, err |
| paddy@2 | 23 } |
| paddy@2 | 24 producer, err := nsq.NewProducer(address, cfg) |
| paddy@2 | 25 if err != nil { |
| paddy@2 | 26 return nil, err |
| paddy@2 | 27 } |
| paddy@2 | 28 return (*NSQPublisher)(producer), nil |
| paddy@2 | 29 } |
| paddy@2 | 30 |
| paddy@2 | 31 func (n *NSQPublisher) Publish(topic string, e Event) error { |
| paddy@2 | 32 if n == nil { |
| paddy@2 | 33 return ErrNilPublisher |
| paddy@2 | 34 } |
| paddy@2 | 35 data, err := json.Marshal(e) |
| paddy@2 | 36 if err != nil { |
| paddy@2 | 37 return err |
| paddy@2 | 38 } |
| paddy@2 | 39 return (*nsq.Producer)(n).Publish(topic, data) |
| paddy@2 | 40 } |
| paddy@2 | 41 |
// NSQSubscriber is a named type over nsq.Consumer. Instances are created by
// NewNSQSubscriber; Block waits on the underlying consumer's StopChan.
type NSQSubscriber nsq.Consumer
| paddy@2 | 43 |
| paddy@2 | 44 func NewNSQSubscriber(lookupds []string, handler nsq.Handler, topic, channel, userAgent string) (*NSQSubscriber, error) { |
| paddy@2 | 45 cfg := nsq.NewConfig() |
| paddy@2 | 46 cfg.UserAgent = fmt.Sprintf("%s go-nsq/%s", userAgent, nsq.VERSION) |
| paddy@2 | 47 err := cfg.Validate() |
| paddy@2 | 48 if err != nil { |
| paddy@2 | 49 return nil, err |
| paddy@2 | 50 } |
| paddy@2 | 51 consumer, err := nsq.NewConsumer(topic, channel, cfg) |
| paddy@2 | 52 if err != nil { |
| paddy@2 | 53 return nil, err |
| paddy@2 | 54 } |
| paddy@2 | 55 consumer.AddHandler(handler) |
| paddy@2 | 56 err = consumer.ConnectToNSQLookupds(lookupds) |
| paddy@2 | 57 if err != nil { |
| paddy@2 | 58 return nil, err |
| paddy@2 | 59 } |
| paddy@2 | 60 return (*NSQSubscriber)(consumer), nil |
| paddy@2 | 61 } |
| paddy@2 | 62 |
| paddy@2 | 63 func (sub *NSQSubscriber) Block() { |
| paddy@2 | 64 <-(*nsq.Consumer)(sub).StopChan |
| paddy@2 | 65 } |