ducky/subscriptions
ducky/subscriptions/subscription_postgres.go
Implement PostgreSQL support, drop subscription IDs. Create a Postgres object that wraps database/sql, so we can attach methods to it and fulfill interfaces. Create a postgres_init.sql script that will create the subscriptions table in a PostgreSQL database. Make our period type fulfill the driver.Valuer and driver.Scanner types, so it can be stored in and retrieved from SQL databases. Create a SubscriptionStats type, and add a method to our subscriptionStore interface that will allow us to retrieve current stats about the Subscriptions it is storing. Deprecated the ID property of our Subscription type, and use the Subscription.UserID property instead as our primary key. Subscriptions should be unique per user and we generally will want to access Subscriptions in the context of the User they belong to, so the UserID is a better primary key. This also means we removed the getSubscriptionByUserID method (and implementations) from our subscriptionStore, as getSubscriptions now fills that role. Implement our getSubscriptionStats method in the memstore. Implement the subscriptionStore interface on our new Postgres type. Run the subscription store tests on our Postgres type, as well, if the PG_TEST_DB environment variable is set. Round all our timestamps in our tests to the nearest millisecond, as Postgres silently truncates all timestamps to the nearest millisecond, and it was causing false test failures. Remove the tests for our getSubscriptionStoreByUser method, as that was removed.
| paddy@1 | 1 package subscriptions |
| paddy@1 | 2 |
| paddy@1 | 3 import ( |
| paddy@1 | 4 "database/sql" |
| paddy@1 | 5 "time" |
| paddy@1 | 6 |
| paddy@1 | 7 "code.secondbit.org/uuid.hg" |
| paddy@1 | 8 "github.com/lib/pq" |
| paddy@1 | 9 "github.com/secondbit/pan" |
| paddy@1 | 10 ) |
| paddy@1 | 11 |
| paddy@1 | 12 // GetSQLTableName fulfills the pan.SQLTableNamer interface, allowing |
| paddy@1 | 13 // us to manipulate Subscriptions with pan. |
| paddy@1 | 14 func (s Subscription) GetSQLTableName() string { |
| paddy@1 | 15 return "subscriptions" |
| paddy@1 | 16 } |
| paddy@1 | 17 |
| paddy@1 | 18 func (p Postgres) resetSQL() *pan.Query { |
| paddy@1 | 19 var sub Subscription |
| paddy@1 | 20 query := pan.New(pan.POSTGRES, "TRUNCATE "+pan.GetTableName(sub)) |
| paddy@1 | 21 return query.FlushExpressions(" ") |
| paddy@1 | 22 } |
| paddy@1 | 23 |
| paddy@1 | 24 func (p Postgres) reset() error { |
| paddy@1 | 25 query := p.resetSQL() |
| paddy@1 | 26 _, err := p.Exec(query.String(), query.Args...) |
| paddy@1 | 27 if err != nil { |
| paddy@1 | 28 return err |
| paddy@1 | 29 } |
| paddy@1 | 30 return nil |
| paddy@1 | 31 } |
| paddy@1 | 32 |
| paddy@1 | 33 func (p Postgres) createSubscriptionSQL(sub Subscription) *pan.Query { |
| paddy@1 | 34 fields, values := pan.GetFields(sub) |
| paddy@1 | 35 query := pan.New(pan.POSTGRES, "INSERT INTO "+pan.GetTableName(sub)) |
| paddy@1 | 36 query.Include("(" + pan.QueryList(fields) + ")") |
| paddy@1 | 37 query.Include("VALUES") |
| paddy@1 | 38 query.Include("("+pan.VariableList(len(values))+")", values...) |
| paddy@1 | 39 return query.FlushExpressions(" ") |
| paddy@1 | 40 } |
| paddy@1 | 41 |
| paddy@1 | 42 func (p Postgres) createSubscription(sub Subscription) error { |
| paddy@1 | 43 query := p.createSubscriptionSQL(sub) |
| paddy@1 | 44 _, err := p.Exec(query.String(), query.Args...) |
| paddy@1 | 45 if e, ok := err.(*pq.Error); ok && e.Constraint == "subscriptions_pkey" { |
| paddy@1 | 46 err = ErrSubscriptionAlreadyExists |
| paddy@1 | 47 } else if e, ok := err.(*pq.Error); ok && e.Constraint == "subscriptions_stripe_customer_key" { |
| paddy@1 | 48 err = ErrStripeCustomerAlreadyExists |
| paddy@1 | 49 } |
| paddy@1 | 50 return err |
| paddy@1 | 51 } |
| paddy@1 | 52 |
| paddy@1 | 53 func (p Postgres) updateSubscriptionSQL(id uuid.ID, change SubscriptionChange) *pan.Query { |
| paddy@1 | 54 var sub Subscription |
| paddy@1 | 55 query := pan.New(pan.POSTGRES, "UPDATE "+pan.GetTableName(sub)+" SET") |
| paddy@1 | 56 query.IncludeIfNotNil(pan.GetUnquotedColumn(sub, "StripeCustomer")+" = ?", change.StripeCustomer) |
| paddy@1 | 57 query.IncludeIfNotNil(pan.GetUnquotedColumn(sub, "Amount")+" = ?", change.Amount) |
| paddy@1 | 58 query.IncludeIfNotNil(pan.GetUnquotedColumn(sub, "Period")+" = ?", change.Period) |
| paddy@1 | 59 query.IncludeIfNotNil(pan.GetUnquotedColumn(sub, "BeginCharging")+" = ?", change.BeginCharging) |
| paddy@1 | 60 query.IncludeIfNotNil(pan.GetUnquotedColumn(sub, "LastCharged")+" = ?", change.LastCharged) |
| paddy@1 | 61 query.IncludeIfNotNil(pan.GetUnquotedColumn(sub, "LastNotified")+" = ?", change.LastNotified) |
| paddy@1 | 62 query.IncludeIfNotNil(pan.GetUnquotedColumn(sub, "InLockout")+" = ?", change.InLockout) |
| paddy@1 | 63 query.FlushExpressions(", ") |
| paddy@1 | 64 query.IncludeWhere() |
| paddy@1 | 65 query.Include(pan.GetUnquotedColumn(sub, "UserID")+" = ?", id) |
| paddy@1 | 66 return query.FlushExpressions(" ") |
| paddy@1 | 67 } |
| paddy@1 | 68 |
| paddy@1 | 69 func (p Postgres) updateSubscription(id uuid.ID, change SubscriptionChange) error { |
| paddy@1 | 70 if change.IsEmpty() { |
| paddy@1 | 71 return ErrSubscriptionChangeEmpty |
| paddy@1 | 72 } |
| paddy@1 | 73 |
| paddy@1 | 74 query := p.updateSubscriptionSQL(id, change) |
| paddy@1 | 75 res, err := p.Exec(query.String(), query.Args...) |
| paddy@1 | 76 if e, ok := err.(*pq.Error); ok && e.Constraint == "subscriptions_stripe_customer_key" { |
| paddy@1 | 77 return ErrStripeCustomerAlreadyExists |
| paddy@1 | 78 } else if err != nil { |
| paddy@1 | 79 return err |
| paddy@1 | 80 } |
| paddy@1 | 81 rows, err := res.RowsAffected() |
| paddy@1 | 82 if err != nil { |
| paddy@1 | 83 return err |
| paddy@1 | 84 } |
| paddy@1 | 85 if rows < 1 { |
| paddy@1 | 86 return ErrSubscriptionNotFound |
| paddy@1 | 87 } |
| paddy@1 | 88 return nil |
| paddy@1 | 89 } |
| paddy@1 | 90 |
| paddy@1 | 91 func (p Postgres) deleteSubscriptionSQL(id uuid.ID) *pan.Query { |
| paddy@1 | 92 var sub Subscription |
| paddy@1 | 93 query := pan.New(pan.POSTGRES, "DELETE FROM "+pan.GetTableName(sub)) |
| paddy@1 | 94 query.IncludeWhere() |
| paddy@1 | 95 query.Include(pan.GetUnquotedColumn(sub, "UserID")+" = ?", id) |
| paddy@1 | 96 return query.FlushExpressions(" ") |
| paddy@1 | 97 } |
| paddy@1 | 98 |
| paddy@1 | 99 func (p Postgres) deleteSubscription(id uuid.ID) error { |
| paddy@1 | 100 query := p.deleteSubscriptionSQL(id) |
| paddy@1 | 101 res, err := p.Exec(query.String(), query.Args...) |
| paddy@1 | 102 if err != nil { |
| paddy@1 | 103 return err |
| paddy@1 | 104 } |
| paddy@1 | 105 rows, err := res.RowsAffected() |
| paddy@1 | 106 if err != nil { |
| paddy@1 | 107 return err |
| paddy@1 | 108 } |
| paddy@1 | 109 if rows < 1 { |
| paddy@1 | 110 return ErrSubscriptionNotFound |
| paddy@1 | 111 } |
| paddy@1 | 112 return nil |
| paddy@1 | 113 } |
| paddy@1 | 114 |
| paddy@1 | 115 func (p Postgres) listSubscriptionsLastChargedBeforeSQL(cutoff time.Time) *pan.Query { |
| paddy@1 | 116 var sub Subscription |
| paddy@1 | 117 fields, _ := pan.GetFields(sub) |
| paddy@1 | 118 query := pan.New(pan.POSTGRES, "SELECT "+pan.QueryList(fields)+" FROM "+pan.GetTableName(sub)) |
| paddy@1 | 119 query.IncludeWhere() |
| paddy@1 | 120 query.Include(pan.GetUnquotedColumn(sub, "LastCharged")+" < ?", cutoff) |
| paddy@1 | 121 query.IncludeOrder(pan.GetUnquotedColumn(sub, "LastCharged") + " ASC") |
| paddy@1 | 122 return query.FlushExpressions(" ") |
| paddy@1 | 123 } |
| paddy@1 | 124 |
| paddy@1 | 125 func (p Postgres) listSubscriptionsLastChargedBefore(cutoff time.Time) ([]Subscription, error) { |
| paddy@1 | 126 var results []Subscription |
| paddy@1 | 127 query := p.listSubscriptionsLastChargedBeforeSQL(cutoff) |
| paddy@1 | 128 rows, err := p.Query(query.String(), query.Args...) |
| paddy@1 | 129 if err != nil { |
| paddy@1 | 130 return results, err |
| paddy@1 | 131 } |
| paddy@1 | 132 for rows.Next() { |
| paddy@1 | 133 var sub Subscription |
| paddy@1 | 134 err := pan.Unmarshal(rows, &sub) |
| paddy@1 | 135 if err != nil { |
| paddy@1 | 136 return results, err |
| paddy@1 | 137 } |
| paddy@1 | 138 results = append(results, sub) |
| paddy@1 | 139 } |
| paddy@1 | 140 if err := rows.Err(); err != nil { |
| paddy@1 | 141 return results, err |
| paddy@1 | 142 } |
| paddy@1 | 143 return results, nil |
| paddy@1 | 144 } |
| paddy@1 | 145 |
| paddy@1 | 146 func (p Postgres) getSubscriptionsSQL(ids []uuid.ID) *pan.Query { |
| paddy@1 | 147 var sub Subscription |
| paddy@1 | 148 fields, _ := pan.GetFields(sub) |
| paddy@1 | 149 intIDs := make([]interface{}, len(ids)) |
| paddy@1 | 150 for pos, id := range ids { |
| paddy@1 | 151 intIDs[pos] = id |
| paddy@1 | 152 } |
| paddy@1 | 153 query := pan.New(pan.POSTGRES, "SELECT "+pan.QueryList(fields)+" FROM "+pan.GetTableName(sub)) |
| paddy@1 | 154 query.IncludeWhere() |
| paddy@1 | 155 query.Include(pan.GetUnquotedColumn(sub, "UserID") + " IN") |
| paddy@1 | 156 query.Include("("+pan.VariableList(len(intIDs))+")", intIDs...) |
| paddy@1 | 157 return query.FlushExpressions(" ") |
| paddy@1 | 158 } |
| paddy@1 | 159 |
| paddy@1 | 160 func (p Postgres) getSubscriptions(ids []uuid.ID) (map[string]Subscription, error) { |
| paddy@1 | 161 results := map[string]Subscription{} |
| paddy@1 | 162 if len(ids) < 1 { |
| paddy@1 | 163 return results, ErrNoSubscriptionID |
| paddy@1 | 164 } |
| paddy@1 | 165 query := p.getSubscriptionsSQL(ids) |
| paddy@1 | 166 rows, err := p.Query(query.String(), query.Args...) |
| paddy@1 | 167 if err != nil { |
| paddy@1 | 168 return results, err |
| paddy@1 | 169 } |
| paddy@1 | 170 for rows.Next() { |
| paddy@1 | 171 var sub Subscription |
| paddy@1 | 172 err := pan.Unmarshal(rows, &sub) |
| paddy@1 | 173 if err != nil { |
| paddy@1 | 174 return results, err |
| paddy@1 | 175 } |
| paddy@1 | 176 results[sub.UserID.String()] = sub |
| paddy@1 | 177 } |
| paddy@1 | 178 if err := rows.Err(); err != nil { |
| paddy@1 | 179 return results, err |
| paddy@1 | 180 } |
| paddy@1 | 181 return results, nil |
| paddy@1 | 182 } |
| paddy@1 | 183 |
| paddy@1 | 184 func (p Postgres) getSubscriptionStatsSQL() *pan.Query { |
| paddy@1 | 185 var sub Subscription |
| paddy@1 | 186 amountColumn := pan.GetUnquotedColumn(sub, "Amount") |
| paddy@1 | 187 query := pan.New(pan.POSTGRES, "SELECT") |
| paddy@1 | 188 query.Include("COUNT(*), SUM(" + amountColumn + "), AVG(" + amountColumn + ")") |
| paddy@1 | 189 query.Include("FROM " + pan.GetTableName(sub)) |
| paddy@1 | 190 return query.FlushExpressions(" ") |
| paddy@1 | 191 } |
| paddy@1 | 192 |
| paddy@1 | 193 func (p Postgres) getSubscriptionStats() (SubscriptionStats, error) { |
| paddy@1 | 194 query := p.getSubscriptionStatsSQL() |
| paddy@1 | 195 rows, err := p.Query(query.String(), query.Args...) |
| paddy@1 | 196 if err != nil { |
| paddy@1 | 197 return SubscriptionStats{}, err |
| paddy@1 | 198 } |
| paddy@1 | 199 var stats SubscriptionStats |
| paddy@1 | 200 for rows.Next() { |
| paddy@1 | 201 var number, total sql.NullInt64 |
| paddy@1 | 202 var mean sql.NullFloat64 |
| paddy@1 | 203 if err := rows.Scan(number, total, mean); err != nil { |
| paddy@1 | 204 return stats, err |
| paddy@1 | 205 } |
| paddy@1 | 206 if number.Valid { |
| paddy@1 | 207 stats.Number = number.Int64 |
| paddy@1 | 208 } |
| paddy@1 | 209 if total.Valid { |
| paddy@1 | 210 stats.TotalAmount = total.Int64 |
| paddy@1 | 211 } |
| paddy@1 | 212 if mean.Valid { |
| paddy@1 | 213 stats.MeanAmount = mean.Float64 |
| paddy@1 | 214 } |
| paddy@1 | 215 } |
| paddy@1 | 216 if err := rows.Err(); err != nil { |
| paddy@1 | 217 return stats, err |
| paddy@1 | 218 } |
| paddy@1 | 219 return stats, nil |
| paddy@1 | 220 } |