Update to use a generic event emitter.
Rather can creating a purpose-built event emitter for each and every event we
need to emit (I'm looking at you, login verification event) which is _downright
silly_, we're now using a generic event publisher that's based on saying "HEY A
MODEL UPDATED".
This means we need to change all our setup code in authd to use
events.NewNSQPublisher or events.NewStdoutPublisher instead of our homegrown
solutions. Which also means updating our config to take an events.Publisher
instead of our LoginVerificationNotifier (blergh).
Our Context also now uses an events.Publisher instead of a
LoginVerificationNotifier. Party all around! We also replaced our
SendLoginVerification helper method on Context with a SendModelEvent helper
method on Context, which is just a light wrapper around
events.PublishModelEvent.
Of course, all this means we need to update our email_verification listener to
listen to the correct channel (based on the model we want updates about) and
filter down to a Created action or our new custom action for "the customer wants
their verification resent", which I'm OK making a special case and not generic,
because c'mon. But we had a subtle change to all our constants, some of which
are unofficial constants now. I'm unsure how I feel about this.
We also updated our email_verification listener so that we're unmarshalling to a
custom loginEvent, which is just an events.Event that overwrites the Data
property to be an auth.Login instance. This is to make sure we don't need to
wrangle a map[string]interface{}, which is no fun. I'm also OK with
special-casing like this, because it's 1) a tiny amount of code, 2) properly
utilising composition, and 3) the only way I can think of to cleanly accomplish
what I want.
I also added a note about GetLogin's deficient handling of logins, namely that
it doesn't recognise admins and return Verification codes to them, which would
be a useful property for internal tools to take advantage of. Ah well.
I updated the Profile and Login implementations so they're now event.Model
instances, mainly by just exporting some strings from them through getters that
will let us automatically build an Event from them. This lets us use the
PublishModelEvent helper.
I updated our CreateProfileHandler to properly mangle the login Verification
property, and to fire off the ActionCreated events for the new Login and the new
Profile.
I updated our GetLoginHandler and UpdateLoginHandler to properly mangle the
loginVerification property. God that's annoying. :-/
You'll note I didn't start publishing the events.ActionUpdated or
events.ActionDeleted events for Profiles or Logins yet, and didn't bother
publishing any events for literally any other type. That's because I'm a lazy
piece of crap and will end up publishing them when I absolutely have to. Part of
that is because if a channel isn't created/being read for a topic, the messages
will just stack up in NSQ, and I don't want that. But mostly I'm lazy.
Finally, I got to delete the entire profile_verification.go file, because we're
no longer special-casing that. Hooray!
4 "code.secondbit.org/uuid.hg"
6 "github.com/secondbit/pan"
9 func (c Client) GetSQLTableName() string {
13 func (e Endpoint) GetSQLTableName() string {
17 func (p *postgres) getClientSQL(id uuid.ID) *pan.Query {
19 fields, _ := pan.GetFields(client)
20 query := pan.New(pan.POSTGRES, "SELECT "+pan.QueryList(fields)+" FROM "+pan.GetTableName(client))
22 query.Include(pan.GetUnquotedColumn(client, "ID")+" = ? AND "+pan.GetUnquotedColumn(client, "Deleted")+" = ?", id, false)
23 return query.FlushExpressions(" ")
26 func (p *postgres) getClient(id uuid.ID) (Client, error) {
27 query := p.getClientSQL(id)
28 rows, err := p.db.Query(query.String(), query.Args...)
35 err := pan.Unmarshal(rows, &client)
41 if err = rows.Err(); err != nil {
45 return client, ErrClientNotFound
50 func (p *postgres) saveClientSQL(client Client) *pan.Query {
51 fields, values := pan.GetFields(client)
52 query := pan.New(pan.POSTGRES, "INSERT INTO "+pan.GetTableName(client))
53 query.Include("(" + pan.QueryList(fields) + ")")
54 query.Include("VALUES")
55 query.Include("("+pan.VariableList(len(values))+")", values...)
56 return query.FlushExpressions(" ")
59 func (p *postgres) saveClient(client Client) error {
60 query := p.saveClientSQL(client)
61 _, err := p.db.Exec(query.String(), query.Args...)
62 if e, ok := err.(*pq.Error); ok && e.Constraint == "clients_pkey" {
63 err = ErrClientAlreadyExists
68 func (p *postgres) updateClientSQL(id uuid.ID, change ClientChange) *pan.Query {
70 query := pan.New(pan.POSTGRES, "UPDATE "+pan.GetTableName(client)+" SET ")
71 query.IncludeIfNotNil(pan.GetUnquotedColumn(client, "Secret")+" = ?", change.Secret)
72 query.IncludeIfNotNil(pan.GetUnquotedColumn(client, "OwnerID")+" = ?", change.OwnerID)
73 query.IncludeIfNotNil(pan.GetUnquotedColumn(client, "Name")+" = ?", change.Name)
74 query.IncludeIfNotNil(pan.GetUnquotedColumn(client, "Logo")+" = ?", change.Logo)
75 query.IncludeIfNotNil(pan.GetUnquotedColumn(client, "Website")+" = ?", change.Website)
76 query.IncludeIfNotNil(pan.GetUnquotedColumn(client, "Deleted")+" = ?", change.Deleted)
77 query.FlushExpressions(", ")
79 query.Include(pan.GetUnquotedColumn(client, "ID")+" = ?", id)
80 return query.FlushExpressions(" ")
83 func (p *postgres) updateClient(id uuid.ID, change ClientChange) error {
87 query := p.updateClientSQL(id, change)
88 _, err := p.db.Exec(query.String(), query.Args...)
92 func (p *postgres) listClientsByOwnerSQL(ownerID uuid.ID, num, offset int) *pan.Query {
94 fields, _ := pan.GetFields(client)
95 query := pan.New(pan.POSTGRES, "SELECT "+pan.QueryList(fields)+" FROM "+pan.GetTableName(client))
97 query.Include(pan.GetUnquotedColumn(client, "OwnerID")+" = ? AND "+pan.GetUnquotedColumn(client, "Deleted")+" = ?", ownerID, false)
99 query.IncludeLimit(int64(num))
101 query.IncludeOffset(int64(offset))
102 return query.FlushExpressions(" ")
105 func (p *postgres) listClientsByOwner(ownerID uuid.ID, num, offset int) ([]Client, error) {
106 query := p.listClientsByOwnerSQL(ownerID, num, offset)
107 rows, err := p.db.Query(query.String(), query.Args...)
109 return []Client{}, err
114 err = pan.Unmarshal(rows, &client)
118 clients = append(clients, client)
120 if err = rows.Err(); err != nil {
126 func (p *postgres) deleteClientsByOwnerSQL(ownerID uuid.ID) *pan.Query {
128 query := pan.New(pan.POSTGRES, "UPDATE "+pan.GetTableName(client)+" SET")
129 query.Include(pan.GetUnquotedColumn(client, "Deleted")+"= ?", true)
131 query.Include(pan.GetUnquotedColumn(client, "OwnerID")+" = ?", ownerID)
132 return query.FlushExpressions(" ")
135 func (p *postgres) deleteClientsByOwner(ownerID uuid.ID) error {
136 query := p.deleteClientsByOwnerSQL(ownerID)
137 _, err := p.db.Exec(query.String(), query.Args...)
141 func (p *postgres) addEndpointsSQL(endpoints []Endpoint) *pan.Query {
142 fields, _ := pan.GetFields(endpoints[0])
143 query := pan.New(pan.POSTGRES, "INSERT INTO "+pan.GetTableName(endpoints[0]))
144 query.Include("(" + pan.QueryList(fields) + ")")
145 query.Include("VALUES")
146 query.FlushExpressions(" ")
147 for _, endpoint := range endpoints {
148 _, values := pan.GetFields(endpoint)
149 query.Include("("+pan.VariableList(len(values))+")", values...)
151 return query.FlushExpressions(", ")
154 func (p *postgres) addEndpoints(endpoints []Endpoint) error {
155 if len(endpoints) < 1 {
158 query := p.addEndpointsSQL(endpoints)
159 _, err := p.db.Exec(query.String(), query.Args...)
160 if e, ok := err.(*pq.Error); ok && e.Constraint == "endpoints_pkey" {
161 return ErrEndpointAlreadyExists
166 func (p *postgres) removeEndpointSQL(client, endpoint uuid.ID) *pan.Query {
168 query := pan.New(pan.POSTGRES, "DELETE FROM "+pan.GetTableName(e))
170 query.Include(pan.GetUnquotedColumn(e, "ID")+" = ? AND "+pan.GetUnquotedColumn(e, "ClientID")+" = ?", endpoint, client)
171 return query.FlushExpressions(" ")
174 func (p *postgres) removeEndpoint(client, endpoint uuid.ID) error {
175 query := p.removeEndpointSQL(client, endpoint)
176 res, err := p.db.Exec(query.String(), query.Args...)
180 rows, err := res.RowsAffected()
185 return ErrEndpointNotFound
190 func (p *postgres) removeEndpointsByClientIDSQL(client uuid.ID) *pan.Query {
192 query := pan.New(pan.POSTGRES, "DELETE FROM "+pan.GetTableName(e))
194 query.Include(pan.GetUnquotedColumn(e, "ClientID")+" = ?", client)
195 return query.FlushExpressions(" ")
198 func (p *postgres) removeEndpointsByClientID(client uuid.ID) error {
199 query := p.removeEndpointsByClientIDSQL(client)
200 res, err := p.db.Exec(query.String(), query.Args...)
204 rows, err := res.RowsAffected()
209 return ErrClientNotFound
214 func (p *postgres) getEndpointSQL(client, endpoint uuid.ID) *pan.Query {
216 fields, _ := pan.GetFields(e)
217 query := pan.New(pan.POSTGRES, "SELECT "+pan.QueryList(fields)+" FROM "+pan.GetTableName(e))
219 query.FlushExpressions(" ")
220 query.Include(pan.GetUnquotedColumn(e, "ID")+" = ?", endpoint)
221 query.Include(pan.GetUnquotedColumn(e, "ClientID")+" = ?", client)
222 return query.FlushExpressions(" AND ")
225 func (p *postgres) getEndpoint(client, endpoint uuid.ID) (Endpoint, error) {
226 query := p.getEndpointSQL(client, endpoint)
227 rows, err := p.db.Query(query.String(), query.Args...)
229 return Endpoint{}, err
234 err := pan.Unmarshal(rows, &e)
240 if err = rows.Err(); err != nil {
244 return e, ErrEndpointNotFound
249 func (p *postgres) checkEndpointSQL(client uuid.ID, endpoint string) *pan.Query {
251 fields, _ := pan.GetFields(e)
252 query := pan.New(pan.POSTGRES, "SELECT "+pan.QueryList(fields)+" FROM "+pan.GetTableName(e))
254 query.FlushExpressions(" ")
255 query.Include(pan.GetUnquotedColumn(e, "ClientID")+" = ?", client)
256 query.Include(pan.GetUnquotedColumn(e, "NormalizedURI")+" = ?", endpoint)
257 return query.FlushExpressions(" AND ")
260 func (p *postgres) checkEndpoint(client uuid.ID, endpoint string) (bool, error) {
261 query := p.checkEndpointSQL(client, endpoint)
262 rows, err := p.db.Query(query.String(), query.Args...)
270 if err = rows.Err(); err != nil {
276 func (p *postgres) listEndpointsSQL(client uuid.ID, num, offset int) *pan.Query {
277 var endpoint Endpoint
278 fields, _ := pan.GetFields(endpoint)
279 query := pan.New(pan.POSTGRES, "SELECT "+pan.QueryList(fields)+" FROM "+pan.GetTableName(endpoint))
281 query.Include(pan.GetUnquotedColumn(endpoint, "ClientID")+" = ?", client)
282 query.IncludeLimit(int64(num))
283 query.IncludeOffset(int64(offset))
284 return query.FlushExpressions(" ")
287 func (p *postgres) listEndpoints(client uuid.ID, num, offset int) ([]Endpoint, error) {
288 query := p.listEndpointsSQL(client, num, offset)
289 rows, err := p.db.Query(query.String(), query.Args...)
291 return []Endpoint{}, err
293 var endpoints []Endpoint
295 var endpoint Endpoint
296 err = pan.Unmarshal(rows, &endpoint)
298 return endpoints, err
300 endpoints = append(endpoints, endpoint)
302 if err = rows.Err(); err != nil {
303 return endpoints, err
305 return endpoints, nil
308 func (p *postgres) countEndpointsSQL(client uuid.ID) *pan.Query {
309 var endpoint Endpoint
310 query := pan.New(pan.POSTGRES, "SELECT COUNT(*) FROM "+pan.GetTableName(endpoint))
312 query.Include(pan.GetUnquotedColumn(endpoint, "ClientID")+" = ?", client)
313 return query.FlushExpressions(" ")
316 func (p *postgres) countEndpoints(client uuid.ID) (int64, error) {
317 query := p.countEndpointsSQL(client)
318 rows, err := p.db.Query(query.String(), query.Args...)
324 err = pan.Unmarshal(rows, &results)
329 if err = rows.Err(); err != nil {