Implement PostgreSQL support, drop subscription IDs.
Create a Postgres object that wraps database/sql, so we can attach methods to it
and fulfill interfaces.
Create a postgres_init.sql script that will create the subscriptions table in a
PostgreSQL database.
Make our period type fulfill the driver.Valuer and sql.Scanner interfaces, so it
can be stored in and retrieved from SQL databases.
Create a SubscriptionStats type, and add a method to our subscriptionStore
interface that will allow us to retrieve current stats about the Subscriptions
it is storing.
Deprecate the ID property of our Subscription type, and use the
Subscription.UserID property instead as our primary key. Subscriptions should be
unique per user and we generally will want to access Subscriptions in the
context of the User they belong to, so the UserID is a better primary key. This
also means we removed the getSubscriptionByUserID method (and implementations)
from our subscriptionStore, as getSubscriptions now fills that role.
Implement our getSubscriptionStats method in the memstore.
Implement the subscriptionStore interface on our new Postgres type.
Run the subscription store tests on our Postgres type, as well, if the
PG_TEST_DB environment variable is set.
Round all our timestamps in our tests to the nearest millisecond, as Postgres
stores timestamps with only microsecond precision, silently dropping anything
finer, and the mismatch was causing false test failures.
Remove the tests for our getSubscriptionByUserID method, as that was removed.
7 "code.secondbit.org/uuid.hg"
9 "github.com/secondbit/pan"
12 // GetSQLTableName fulfills the pan.SQLTableNamer interface, allowing
13 // us to manipulate Subscriptions with pan.
14 func (s Subscription) GetSQLTableName() string {
15 return "subscriptions"
18 func (p Postgres) resetSQL() *pan.Query {
20 query := pan.New(pan.POSTGRES, "TRUNCATE "+pan.GetTableName(sub))
21 return query.FlushExpressions(" ")
24 func (p Postgres) reset() error {
26 _, err := p.Exec(query.String(), query.Args...)
33 func (p Postgres) createSubscriptionSQL(sub Subscription) *pan.Query {
34 fields, values := pan.GetFields(sub)
35 query := pan.New(pan.POSTGRES, "INSERT INTO "+pan.GetTableName(sub))
36 query.Include("(" + pan.QueryList(fields) + ")")
37 query.Include("VALUES")
38 query.Include("("+pan.VariableList(len(values))+")", values...)
39 return query.FlushExpressions(" ")
42 func (p Postgres) createSubscription(sub Subscription) error {
43 query := p.createSubscriptionSQL(sub)
44 _, err := p.Exec(query.String(), query.Args...)
45 if e, ok := err.(*pq.Error); ok && e.Constraint == "subscriptions_pkey" {
46 err = ErrSubscriptionAlreadyExists
47 } else if e, ok := err.(*pq.Error); ok && e.Constraint == "subscriptions_stripe_customer_key" {
48 err = ErrStripeCustomerAlreadyExists
53 func (p Postgres) updateSubscriptionSQL(id uuid.ID, change SubscriptionChange) *pan.Query {
55 query := pan.New(pan.POSTGRES, "UPDATE "+pan.GetTableName(sub)+" SET")
56 query.IncludeIfNotNil(pan.GetUnquotedColumn(sub, "StripeCustomer")+" = ?", change.StripeCustomer)
57 query.IncludeIfNotNil(pan.GetUnquotedColumn(sub, "Amount")+" = ?", change.Amount)
58 query.IncludeIfNotNil(pan.GetUnquotedColumn(sub, "Period")+" = ?", change.Period)
59 query.IncludeIfNotNil(pan.GetUnquotedColumn(sub, "BeginCharging")+" = ?", change.BeginCharging)
60 query.IncludeIfNotNil(pan.GetUnquotedColumn(sub, "LastCharged")+" = ?", change.LastCharged)
61 query.IncludeIfNotNil(pan.GetUnquotedColumn(sub, "LastNotified")+" = ?", change.LastNotified)
62 query.IncludeIfNotNil(pan.GetUnquotedColumn(sub, "InLockout")+" = ?", change.InLockout)
63 query.FlushExpressions(", ")
65 query.Include(pan.GetUnquotedColumn(sub, "UserID")+" = ?", id)
66 return query.FlushExpressions(" ")
69 func (p Postgres) updateSubscription(id uuid.ID, change SubscriptionChange) error {
71 return ErrSubscriptionChangeEmpty
74 query := p.updateSubscriptionSQL(id, change)
75 res, err := p.Exec(query.String(), query.Args...)
76 if e, ok := err.(*pq.Error); ok && e.Constraint == "subscriptions_stripe_customer_key" {
77 return ErrStripeCustomerAlreadyExists
78 } else if err != nil {
81 rows, err := res.RowsAffected()
86 return ErrSubscriptionNotFound
91 func (p Postgres) deleteSubscriptionSQL(id uuid.ID) *pan.Query {
93 query := pan.New(pan.POSTGRES, "DELETE FROM "+pan.GetTableName(sub))
95 query.Include(pan.GetUnquotedColumn(sub, "UserID")+" = ?", id)
96 return query.FlushExpressions(" ")
99 func (p Postgres) deleteSubscription(id uuid.ID) error {
100 query := p.deleteSubscriptionSQL(id)
101 res, err := p.Exec(query.String(), query.Args...)
105 rows, err := res.RowsAffected()
110 return ErrSubscriptionNotFound
115 func (p Postgres) listSubscriptionsLastChargedBeforeSQL(cutoff time.Time) *pan.Query {
117 fields, _ := pan.GetFields(sub)
118 query := pan.New(pan.POSTGRES, "SELECT "+pan.QueryList(fields)+" FROM "+pan.GetTableName(sub))
120 query.Include(pan.GetUnquotedColumn(sub, "LastCharged")+" < ?", cutoff)
121 query.IncludeOrder(pan.GetUnquotedColumn(sub, "LastCharged") + " ASC")
122 return query.FlushExpressions(" ")
125 func (p Postgres) listSubscriptionsLastChargedBefore(cutoff time.Time) ([]Subscription, error) {
126 var results []Subscription
127 query := p.listSubscriptionsLastChargedBeforeSQL(cutoff)
128 rows, err := p.Query(query.String(), query.Args...)
134 err := pan.Unmarshal(rows, &sub)
138 results = append(results, sub)
140 if err := rows.Err(); err != nil {
146 func (p Postgres) getSubscriptionsSQL(ids []uuid.ID) *pan.Query {
148 fields, _ := pan.GetFields(sub)
149 intIDs := make([]interface{}, len(ids))
150 for pos, id := range ids {
153 query := pan.New(pan.POSTGRES, "SELECT "+pan.QueryList(fields)+" FROM "+pan.GetTableName(sub))
155 query.Include(pan.GetUnquotedColumn(sub, "UserID") + " IN")
156 query.Include("("+pan.VariableList(len(intIDs))+")", intIDs...)
157 return query.FlushExpressions(" ")
160 func (p Postgres) getSubscriptions(ids []uuid.ID) (map[string]Subscription, error) {
161 results := map[string]Subscription{}
163 return results, ErrNoSubscriptionID
165 query := p.getSubscriptionsSQL(ids)
166 rows, err := p.Query(query.String(), query.Args...)
172 err := pan.Unmarshal(rows, &sub)
176 results[sub.UserID.String()] = sub
178 if err := rows.Err(); err != nil {
184 func (p Postgres) getSubscriptionStatsSQL() *pan.Query {
186 amountColumn := pan.GetUnquotedColumn(sub, "Amount")
187 query := pan.New(pan.POSTGRES, "SELECT")
188 query.Include("COUNT(*), SUM(" + amountColumn + "), AVG(" + amountColumn + ")")
189 query.Include("FROM " + pan.GetTableName(sub))
190 return query.FlushExpressions(" ")
193 func (p Postgres) getSubscriptionStats() (SubscriptionStats, error) {
194 query := p.getSubscriptionStatsSQL()
195 rows, err := p.Query(query.String(), query.Args...)
197 return SubscriptionStats{}, err
199 var stats SubscriptionStats
201 var number, total sql.NullInt64
202 var mean sql.NullFloat64
203 if err := rows.Scan(number, total, mean); err != nil {
207 stats.Number = number.Int64
210 stats.TotalAmount = total.Int64
213 stats.MeanAmount = mean.Float64
216 if err := rows.Err(); err != nil {