events
2015-07-14
Child:df95f058a7b5
events/nsq.go
Create NSQ helpers. Create an NSQPublisher helper that will allow us to publish events in the system in a consistent way. It basically just uses our dictated Event type, to make sure everything's using the same format. Create an NSQSubscriber helper that will allow us to listen for events in the system. It basically creates an nsq.Consumer, adds a handler to it, connects it to lookupds, and has a helper to block until the consumer is told to stop. Eventually, if github.com/bitly/nsq/issues/601 goes well, we should also have this poll our DNS to check for new nsqlookupds, and automatically connect to them.
1 package events
3 import (
4 "encoding/json"
5 "errors"
6 "fmt"
8 "github.com/bitly/go-nsq"
9 )
11 var (
12 ErrNilPublisher = errors.New("event publisher is nil")
13 )
15 type NSQPublisher nsq.Producer
17 func NewNSQPublisher(userAgent, address string) (*NSQPublisher, error) {
18 cfg := nsq.NewConfig()
19 cfg.UserAgent = fmt.Sprintf("%s go-nsq/%s", userAgent, nsq.VERSION)
20 err := cfg.Validate()
21 if err != nil {
22 return nil, err
23 }
24 producer, err := nsq.NewProducer(address, cfg)
25 if err != nil {
26 return nil, err
27 }
28 return (*NSQPublisher)(producer), nil
29 }
31 func (n *NSQPublisher) Publish(topic string, e Event) error {
32 if n == nil {
33 return ErrNilPublisher
34 }
35 data, err := json.Marshal(e)
36 if err != nil {
37 return err
38 }
39 return (*nsq.Producer)(n).Publish(topic, data)
40 }
42 type NSQSubscriber nsq.Consumer
44 func NewNSQSubscriber(lookupds []string, handler nsq.Handler, topic, channel, userAgent string) (*NSQSubscriber, error) {
45 cfg := nsq.NewConfig()
46 cfg.UserAgent = fmt.Sprintf("%s go-nsq/%s", userAgent, nsq.VERSION)
47 err := cfg.Validate()
48 if err != nil {
49 return nil, err
50 }
51 consumer, err := nsq.NewConsumer(topic, channel, cfg)
52 if err != nil {
53 return nil, err
54 }
55 consumer.AddHandler(handler)
56 err = consumer.ConnectToNSQLookupds(lookupds)
57 if err != nil {
58 return nil, err
59 }
60 return (*NSQSubscriber)(consumer), nil
61 }
63 func (sub *NSQSubscriber) Block() {
64 <-(*nsq.Consumer)(sub).StopChan
65 }