events

Paddy 2015-12-14 Parent:987595765978

5:df95f058a7b5 Go to Latest

events/nsq.go

Update to the new nsqio repo. The go-nsq client now lives under the nsqio organization on GitHub, so the import path needs to be updated.

History
paddy@2 1 package events
paddy@2 2
paddy@2 3 import (
paddy@2 4 "encoding/json"
paddy@2 5 "errors"
paddy@2 6 "fmt"
paddy@2 7
paddy@5 8 "github.com/nsqio/go-nsq"
paddy@2 9 )
paddy@2 10
// ErrNilPublisher is the error Publish returns when it is invoked on a nil
// *NSQPublisher.
var ErrNilPublisher = errors.New("event publisher is nil")
paddy@2 14
paddy@2 15 type NSQPublisher nsq.Producer
paddy@2 16
paddy@2 17 func NewNSQPublisher(userAgent, address string) (*NSQPublisher, error) {
paddy@2 18 cfg := nsq.NewConfig()
paddy@2 19 cfg.UserAgent = fmt.Sprintf("%s go-nsq/%s", userAgent, nsq.VERSION)
paddy@2 20 err := cfg.Validate()
paddy@2 21 if err != nil {
paddy@2 22 return nil, err
paddy@2 23 }
paddy@2 24 producer, err := nsq.NewProducer(address, cfg)
paddy@2 25 if err != nil {
paddy@2 26 return nil, err
paddy@2 27 }
paddy@2 28 return (*NSQPublisher)(producer), nil
paddy@2 29 }
paddy@2 30
paddy@2 31 func (n *NSQPublisher) Publish(topic string, e Event) error {
paddy@2 32 if n == nil {
paddy@2 33 return ErrNilPublisher
paddy@2 34 }
paddy@2 35 data, err := json.Marshal(e)
paddy@2 36 if err != nil {
paddy@2 37 return err
paddy@2 38 }
paddy@2 39 return (*nsq.Producer)(n).Publish(topic, data)
paddy@2 40 }
paddy@2 41
paddy@2 42 type NSQSubscriber nsq.Consumer
paddy@2 43
paddy@2 44 func NewNSQSubscriber(lookupds []string, handler nsq.Handler, topic, channel, userAgent string) (*NSQSubscriber, error) {
paddy@2 45 cfg := nsq.NewConfig()
paddy@2 46 cfg.UserAgent = fmt.Sprintf("%s go-nsq/%s", userAgent, nsq.VERSION)
paddy@2 47 err := cfg.Validate()
paddy@2 48 if err != nil {
paddy@2 49 return nil, err
paddy@2 50 }
paddy@2 51 consumer, err := nsq.NewConsumer(topic, channel, cfg)
paddy@2 52 if err != nil {
paddy@2 53 return nil, err
paddy@2 54 }
paddy@2 55 consumer.AddHandler(handler)
paddy@2 56 err = consumer.ConnectToNSQLookupds(lookupds)
paddy@2 57 if err != nil {
paddy@2 58 return nil, err
paddy@2 59 }
paddy@2 60 return (*NSQSubscriber)(consumer), nil
paddy@2 61 }
paddy@2 62
paddy@2 63 func (sub *NSQSubscriber) Block() {
paddy@2 64 <-(*nsq.Consumer)(sub).StopChan
paddy@2 65 }