events
2015-12-14
Parent:987595765978
events/nsq.go
Update to the new nsqio repo. The go-nsq client now lives under the nsqio organization on GitHub, so we need to update the import path.
| paddy@2 | 1 package events |
| paddy@2 | 2 |
| paddy@2 | 3 import ( |
| paddy@2 | 4 "encoding/json" |
| paddy@2 | 5 "errors" |
| paddy@2 | 6 "fmt" |
| paddy@2 | 7 |
| paddy@5 | 8 "github.com/nsqio/go-nsq" |
| paddy@2 | 9 ) |
| paddy@2 | 10 |
// ErrNilPublisher is the error Publish returns when it is called on a nil
// *NSQPublisher.
var ErrNilPublisher = errors.New("event publisher is nil")
| paddy@2 | 14 |
| paddy@2 | 15 type NSQPublisher nsq.Producer |
| paddy@2 | 16 |
| paddy@2 | 17 func NewNSQPublisher(userAgent, address string) (*NSQPublisher, error) { |
| paddy@2 | 18 cfg := nsq.NewConfig() |
| paddy@2 | 19 cfg.UserAgent = fmt.Sprintf("%s go-nsq/%s", userAgent, nsq.VERSION) |
| paddy@2 | 20 err := cfg.Validate() |
| paddy@2 | 21 if err != nil { |
| paddy@2 | 22 return nil, err |
| paddy@2 | 23 } |
| paddy@2 | 24 producer, err := nsq.NewProducer(address, cfg) |
| paddy@2 | 25 if err != nil { |
| paddy@2 | 26 return nil, err |
| paddy@2 | 27 } |
| paddy@2 | 28 return (*NSQPublisher)(producer), nil |
| paddy@2 | 29 } |
| paddy@2 | 30 |
| paddy@2 | 31 func (n *NSQPublisher) Publish(topic string, e Event) error { |
| paddy@2 | 32 if n == nil { |
| paddy@2 | 33 return ErrNilPublisher |
| paddy@2 | 34 } |
| paddy@2 | 35 data, err := json.Marshal(e) |
| paddy@2 | 36 if err != nil { |
| paddy@2 | 37 return err |
| paddy@2 | 38 } |
| paddy@2 | 39 return (*nsq.Producer)(n).Publish(topic, data) |
| paddy@2 | 40 } |
| paddy@2 | 41 |
| paddy@2 | 42 type NSQSubscriber nsq.Consumer |
| paddy@2 | 43 |
| paddy@2 | 44 func NewNSQSubscriber(lookupds []string, handler nsq.Handler, topic, channel, userAgent string) (*NSQSubscriber, error) { |
| paddy@2 | 45 cfg := nsq.NewConfig() |
| paddy@2 | 46 cfg.UserAgent = fmt.Sprintf("%s go-nsq/%s", userAgent, nsq.VERSION) |
| paddy@2 | 47 err := cfg.Validate() |
| paddy@2 | 48 if err != nil { |
| paddy@2 | 49 return nil, err |
| paddy@2 | 50 } |
| paddy@2 | 51 consumer, err := nsq.NewConsumer(topic, channel, cfg) |
| paddy@2 | 52 if err != nil { |
| paddy@2 | 53 return nil, err |
| paddy@2 | 54 } |
| paddy@2 | 55 consumer.AddHandler(handler) |
| paddy@2 | 56 err = consumer.ConnectToNSQLookupds(lookupds) |
| paddy@2 | 57 if err != nil { |
| paddy@2 | 58 return nil, err |
| paddy@2 | 59 } |
| paddy@2 | 60 return (*NSQSubscriber)(consumer), nil |
| paddy@2 | 61 } |
| paddy@2 | 62 |
| paddy@2 | 63 func (sub *NSQSubscriber) Block() { |
| paddy@2 | 64 <-(*nsq.Consumer)(sub).StopChan |
| paddy@2 | 65 } |