ducky/subscriptions

Paddy 2015-06-30 Parent:b240b6123548 Child:fb2c0e498e37

5:fe8f092cc149 Go to Latest

ducky/subscriptions/subscription_postgres.go

Make subscriptions Kubernetes-ready. Update our .hgignore file to include the docker-ready subscripionsd binary (which differs from the local subscriptionsd binary only in that it's statically-compiled for linux). Add a replication controller for subscriptionsd that will spin up the appropriate pods for us and make sure our Stripe and Subscriptionsd secret volumes are attached, so the pods may configure themselves with that private info. Create a Stripe secret volume with a placeholder for the stripe secret key. Create a Subscriptionsd secret volume with the DSN sent to the base64 encoding of an empty string right now (which means subscriptionsd will store data in memory, but whatever.) Create a subscriptionsd service that will route traffic to the subscriptionsd pods created by the replication controller. Create a minimal Dockerfile to get the subscriptionsd binary running on kubernetes. Add a build-docker helper script that will compile a Docker-ready subscriptionsd binary (by compiling it in a Docker container). Copy a Ubuntu ca-certificates.crt file that we'll inject into our Dockerfile so the Stripe SSL can be resolved. Busybox doesn't have any certificates, by default. Create a wrapper script that will be run in the Docker container to read the secrets from our secret volumes and inject them as environment variables, which is what subscriptionsd understands.

History
1 package subscriptions
3 import (
4 "log"
6 "code.secondbit.org/uuid.hg"
8 "github.com/lib/pq"
9 "github.com/secondbit/pan"
10 )
12 // GetSQLTableName fulfills the pan.SQLTableNamer interface, allowing
13 // us to manipulate Subscriptions with pan.
14 func (s Subscription) GetSQLTableName() string {
15 return "subscriptions"
16 }
18 func (p Postgres) resetSQL() *pan.Query {
19 var subscription Subscription
20 query := pan.New(pan.POSTGRES, "TRUNCATE "+pan.GetTableName(subscription))
21 return query.FlushExpressions(" ")
22 }
24 func (p Postgres) Reset() error {
25 query := p.resetSQL()
26 _, err := p.Exec(query.String(), query.Args...)
27 if err != nil {
28 return err
29 }
30 return nil
31 }
33 func (p Postgres) createSubscriptionSQL(subscription Subscription) *pan.Query {
34 fields, values := pan.GetFields(subscription)
35 query := pan.New(pan.POSTGRES, "INSERT INTO "+pan.GetTableName(subscription))
36 query.Include("(" + pan.QueryList(fields) + ")")
37 query.Include("VALUES")
38 query.Include("("+pan.VariableList(len(values))+")", values...)
39 return query.FlushExpressions(" ")
40 }
42 func (p Postgres) CreateSubscription(sub Subscription) error {
43 query := p.createSubscriptionSQL(sub)
44 _, err := p.Exec(query.String(), query.Args...)
45 if e, ok := err.(*pq.Error); ok && e.Constraint == "subscriptions_pkey" {
46 err = ErrSubscriptionAlreadyExists
47 } else if e, ok := err.(*pq.Error); ok && e.Constraint == "subscriptions_stripe_subscription_key" {
48 err = ErrStripeSubscriptionAlreadyExists
49 }
50 return err
51 }
53 func (p Postgres) updateSubscriptionSQL(id uuid.ID, change SubscriptionChange) *pan.Query {
54 var subscription Subscription
55 query := pan.New(pan.POSTGRES, "UPDATE "+pan.GetTableName(subscription)+" SET")
56 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "StripeSubscription")+" = ?", change.StripeSubscription)
57 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "Plan")+" = ?", change.Plan)
58 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "Status")+" = ?", change.Status)
59 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "Canceling")+" = ?", change.Canceling)
60 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "TrialStart")+" = ?", change.TrialStart)
61 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "TrialEnd")+" = ?", change.TrialEnd)
62 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "PeriodStart")+" = ?", change.PeriodStart)
63 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "PeriodEnd")+" = ?", change.PeriodEnd)
64 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "CanceledAt")+" = ?", change.CanceledAt)
65 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "FailedChargeAttempts")+" = ?", change.FailedChargeAttempts)
66 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "LastFailedCharge")+" = ?", change.LastFailedCharge)
67 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "LastNotified")+" = ?", change.LastNotified)
68 query.FlushExpressions(", ")
69 query.IncludeWhere()
70 query.Include(pan.GetUnquotedColumn(subscription, "UserID")+" = ?", id)
71 return query.FlushExpressions(" ")
72 }
74 func (p Postgres) UpdateSubscription(id uuid.ID, change SubscriptionChange) error {
75 if change.IsEmpty() {
76 return ErrSubscriptionChangeEmpty
77 }
79 query := p.updateSubscriptionSQL(id, change)
80 res, err := p.Exec(query.String(), query.Args...)
81 if e, ok := err.(*pq.Error); ok && e.Constraint == "subscriptions_stripe_subscription_key" {
82 return ErrStripeSubscriptionAlreadyExists
83 } else if err != nil {
84 return err
85 }
86 rows, err := res.RowsAffected()
87 if err != nil {
88 return err
89 }
90 if rows < 1 {
91 return ErrSubscriptionNotFound
92 }
93 return nil
94 }
96 func (p Postgres) deleteSubscriptionSQL(id uuid.ID) *pan.Query {
97 var subscription Subscription
98 query := pan.New(pan.POSTGRES, "DELETE FROM "+pan.GetTableName(subscription))
99 query.IncludeWhere()
100 query.Include(pan.GetUnquotedColumn(subscription, "UserID")+" = ?", id)
101 return query.FlushExpressions(" ")
102 }
104 func (p Postgres) DeleteSubscription(id uuid.ID) error {
105 query := p.deleteSubscriptionSQL(id)
106 res, err := p.Exec(query.String(), query.Args...)
107 if err != nil {
108 return err
109 }
110 rows, err := res.RowsAffected()
111 if err != nil {
112 return err
113 }
114 if rows < 1 {
115 return ErrSubscriptionNotFound
116 }
117 return nil
118 }
120 func (p Postgres) getSubscriptionsSQL(ids []uuid.ID) *pan.Query {
121 var subscription Subscription
122 fields, _ := pan.GetFields(subscription)
123 intIDs := make([]interface{}, len(ids))
124 for pos, id := range ids {
125 intIDs[pos] = id
126 }
127 query := pan.New(pan.POSTGRES, "SELECT "+pan.QueryList(fields)+" FROM "+pan.GetTableName(subscription))
128 query.IncludeWhere()
129 query.Include(pan.GetUnquotedColumn(subscription, "UserID") + " IN")
130 query.Include("("+pan.VariableList(len(intIDs))+")", intIDs...)
131 return query.FlushExpressions(" ")
132 }
134 func (p Postgres) GetSubscriptions(ids []uuid.ID) (map[string]Subscription, error) {
135 results := map[string]Subscription{}
136 if len(ids) < 1 {
137 return results, ErrNoSubscriptionID
138 }
139 query := p.getSubscriptionsSQL(ids)
140 rows, err := p.Query(query.String(), query.Args...)
141 if err != nil {
142 return results, err
143 }
144 for rows.Next() {
145 var subscription Subscription
146 err := pan.Unmarshal(rows, &subscription)
147 if err != nil {
148 return results, err
149 }
150 results[subscription.UserID.String()] = subscription
151 }
152 if err := rows.Err(); err != nil {
153 return results, err
154 }
155 return results, nil
156 }
158 func (p Postgres) getSubscriptionStatsCountSQL() *pan.Query {
159 var subscription Subscription
160 query := pan.New(pan.POSTGRES, "SELECT COUNT(*) FROM")
161 query.Include(pan.GetTableName(subscription))
162 return query.FlushExpressions(" ")
163 }
165 func (p Postgres) getSubscriptionStatsCancelingSQL() *pan.Query {
166 var subscription Subscription
167 query := pan.New(pan.POSTGRES, "SELECT COUNT(*) FROM")
168 query.Include(pan.GetTableName(subscription))
169 query.IncludeWhere()
170 query.Include(pan.GetUnquotedColumn(subscription, "Canceling")+" = ?", true)
171 return query.FlushExpressions(" ")
172 }
174 func (p Postgres) getSubscriptionStatsFailingSQL() *pan.Query {
175 var subscription Subscription
176 query := pan.New(pan.POSTGRES, "SELECT COUNT(*) FROM")
177 query.Include(pan.GetTableName(subscription))
178 query.IncludeWhere()
179 statuses := []interface{}{"past_due", "unpaid"}
180 query.Include(pan.GetUnquotedColumn(subscription, "Status")+" IN ("+pan.VariableList(len(statuses))+")", statuses...)
181 return query.FlushExpressions(" ")
182 }
184 func (p Postgres) getSubscriptionStatsPlansSQL() *pan.Query {
185 var subscription Subscription
186 fields := []interface{}{pan.GetUnquotedColumn(subscription, "Plan"), "COUNT(*)"}
187 query := pan.New(pan.POSTGRES, "SELECT "+pan.QueryList(fields)+" FROM")
188 query.Include(pan.GetTableName(subscription))
189 query.Include("GROUP BY " + pan.GetUnquotedColumn(subscription, "Plan"))
190 return query.FlushExpressions(" ")
191 }
193 func (p Postgres) GetSubscriptionStats() (SubscriptionStats, error) {
194 stats := SubscriptionStats{
195 Plans: map[string]int64{},
196 }
197 query := p.getSubscriptionStatsCountSQL()
198 err := p.QueryRow(query.String(), query.Args...).Scan(&stats.Number)
199 if err != nil {
200 log.Printf("Error querying for total subscriptions: %+v\n", err)
201 return stats, err
202 }
203 query = p.getSubscriptionStatsCancelingSQL()
204 err = p.QueryRow(query.String(), query.Args...).Scan(&stats.Canceling)
205 if err != nil {
206 log.Printf("Error querying for canceling subscriptions: %+v\n", err)
207 return stats, err
208 }
209 query = p.getSubscriptionStatsFailingSQL()
210 err = p.QueryRow(query.String(), query.Args...).Scan(&stats.Failing)
211 if err != nil {
212 log.Printf("Error querying for failing subscriptions: %+v\n", err)
213 return stats, err
214 }
215 query = p.getSubscriptionStatsPlansSQL()
216 rows, err := p.Query(query.String(), query.Args...)
217 if err != nil {
218 log.Printf("Error querying for plans: %+v\n", err)
219 return stats, err
220 }
221 for rows.Next() {
222 var plan string
223 var count int64
224 err := rows.Scan(&plan, &count)
225 if err != nil {
226 log.Printf("Error scanning database row for plans: %+v\n", err)
227 continue
228 }
229 stats.Plans[plan] = count
230 }
231 if err := rows.Err(); err != nil {
232 log.Printf("Error querying for plans: %+v\n", err)
233 return stats, err
234 }
235 return stats, nil
236 }