events
2015-12-14
Parent:987595765978
events/nsq.go
Update to new nsqio repo. The go-nsq client is now located under an nsqio org on GitHub, so we need to update the import path.
1 package events
3 import (
4 "encoding/json"
5 "errors"
6 "fmt"
8 "github.com/nsqio/go-nsq"
9 )
// ErrNilPublisher is the error Publish returns when it is invoked on a nil
// *NSQPublisher.
var ErrNilPublisher = errors.New("event publisher is nil")
// NSQPublisher is a named type whose underlying type is nsq.Producer.
// Values are obtained by converting a *nsq.Producer, and are converted back
// to *nsq.Producer when a message has to be published.
15 type NSQPublisher nsq.Producer
17 func NewNSQPublisher(userAgent, address string) (*NSQPublisher, error) {
18 cfg := nsq.NewConfig()
19 cfg.UserAgent = fmt.Sprintf("%s go-nsq/%s", userAgent, nsq.VERSION)
20 err := cfg.Validate()
21 if err != nil {
22 return nil, err
23 }
24 producer, err := nsq.NewProducer(address, cfg)
25 if err != nil {
26 return nil, err
27 }
28 return (*NSQPublisher)(producer), nil
29 }
31 func (n *NSQPublisher) Publish(topic string, e Event) error {
32 if n == nil {
33 return ErrNilPublisher
34 }
35 data, err := json.Marshal(e)
36 if err != nil {
37 return err
38 }
39 return (*nsq.Producer)(n).Publish(topic, data)
40 }
// NSQSubscriber is a named type whose underlying type is nsq.Consumer.
// Values are obtained by converting a *nsq.Consumer, and are converted back
// to *nsq.Consumer to reach the consumer's fields.
42 type NSQSubscriber nsq.Consumer
44 func NewNSQSubscriber(lookupds []string, handler nsq.Handler, topic, channel, userAgent string) (*NSQSubscriber, error) {
45 cfg := nsq.NewConfig()
46 cfg.UserAgent = fmt.Sprintf("%s go-nsq/%s", userAgent, nsq.VERSION)
47 err := cfg.Validate()
48 if err != nil {
49 return nil, err
50 }
51 consumer, err := nsq.NewConsumer(topic, channel, cfg)
52 if err != nil {
53 return nil, err
54 }
55 consumer.AddHandler(handler)
56 err = consumer.ConnectToNSQLookupds(lookupds)
57 if err != nil {
58 return nil, err
59 }
60 return (*NSQSubscriber)(consumer), nil
61 }
63 func (sub *NSQSubscriber) Block() {
64 <-(*nsq.Consumer)(sub).StopChan
65 }