events
4:2d030022b4b5
Go to Latest
events/nsq.go
Create our Model helper.
Create a Model interface that any type can implement by just providing getters
that will be used to build Events produced around that type.
Add a PublishModelEvent helper that takes a Publisher, a Model, and an action
(as a string) and builds a consistent Event around the Model, then publishes it
to the Publisher.
This is a helpful wrapper in most of our systems, and will probably handle 99%
of our NSQ publishes, because most of the events in our infrastructure are going
to be something like "a new profile was created" or "a new login was created" or
"a profile was updated", and we just want to disseminate that information across
all our services so they can push or trigger actions based on it. This abstracts
that out into a standard format and a nice, easy way to send those messages.
8 "github.com/bitly/go-nsq"
12 ErrNilPublisher = errors.New("event publisher is nil")
15 type NSQPublisher nsq.Producer
17 func NewNSQPublisher(userAgent, address string) (*NSQPublisher, error) {
18 cfg := nsq.NewConfig()
19 cfg.UserAgent = fmt.Sprintf("%s go-nsq/%s", userAgent, nsq.VERSION)
24 producer, err := nsq.NewProducer(address, cfg)
28 return (*NSQPublisher)(producer), nil
31 func (n *NSQPublisher) Publish(topic string, e Event) error {
33 return ErrNilPublisher
35 data, err := json.Marshal(e)
39 return (*nsq.Producer)(n).Publish(topic, data)
42 type NSQSubscriber nsq.Consumer
44 func NewNSQSubscriber(lookupds []string, handler nsq.Handler, topic, channel, userAgent string) (*NSQSubscriber, error) {
45 cfg := nsq.NewConfig()
46 cfg.UserAgent = fmt.Sprintf("%s go-nsq/%s", userAgent, nsq.VERSION)
51 consumer, err := nsq.NewConsumer(topic, channel, cfg)
55 consumer.AddHandler(handler)
56 err = consumer.ConnectToNSQLookupds(lookupds)
60 return (*NSQSubscriber)(consumer), nil
63 func (sub *NSQSubscriber) Block() {
64 <-(*nsq.Consumer)(sub).StopChan