Create a listener that will create subscriptions.
We need a listener (as discussed in c4cfceb2f2fb) that will create a
Subscription whenever an auth.Profile is created. This is the beginning of that
effort. It hasn't been tested, and all the pieces aren't in place, but it's a
rough skeleton.
We have a Dockerfile that will correctly build a minimal container for the
listener.
We have a build-docker.sh script that will correctly build a binary that will be
used in the Dockerfile.
We have a ca-certificates.crt, which are pulled from Ubuntu, and are necessary
before we can safely consume SSL endpoints.
We created a small consumer script that listens for events off NSQ, and calls
the appropriate endpoint for our Subscriptions API. This is untested, and it
doesn't build at the moment, but that's awaiting changes in the
code.secondbit.org/auth.hg package.
Finally, we have a wrapper.sh file that will expose the Stripe secret key being
used from a Kubernetes secret file as an environment variable, instead.
6 "code.secondbit.org/uuid.hg"
9 "github.com/secondbit/pan"
12 // GetSQLTableName fulfills the pan.SQLTableNamer interface, allowing
13 // us to manipulate Subscriptions with pan.
14 func (s Subscription) GetSQLTableName() string {
15 return "subscriptions"
18 func (p Postgres) resetSQL() *pan.Query {
19 var subscription Subscription
20 query := pan.New(pan.POSTGRES, "TRUNCATE "+pan.GetTableName(subscription))
21 return query.FlushExpressions(" ")
24 func (p Postgres) Reset() error {
26 _, err := p.Exec(query.String(), query.Args...)
33 func (p Postgres) createSubscriptionSQL(subscription Subscription) *pan.Query {
34 fields, values := pan.GetFields(subscription)
35 query := pan.New(pan.POSTGRES, "INSERT INTO "+pan.GetTableName(subscription))
36 query.Include("(" + pan.QueryList(fields) + ")")
37 query.Include("VALUES")
38 query.Include("("+pan.VariableList(len(values))+")", values...)
39 return query.FlushExpressions(" ")
42 func (p Postgres) CreateSubscription(sub Subscription) error {
43 query := p.createSubscriptionSQL(sub)
44 _, err := p.Exec(query.String(), query.Args...)
45 if e, ok := err.(*pq.Error); ok && e.Constraint == "subscriptions_pkey" {
46 err = ErrSubscriptionAlreadyExists
47 } else if e, ok := err.(*pq.Error); ok && e.Constraint == "subscriptions_stripe_subscription_key" {
48 err = ErrStripeSubscriptionAlreadyExists
53 func (p Postgres) updateSubscriptionSQL(id uuid.ID, change SubscriptionChange) *pan.Query {
54 var subscription Subscription
55 query := pan.New(pan.POSTGRES, "UPDATE "+pan.GetTableName(subscription)+" SET")
56 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "StripeSubscription")+" = ?", change.StripeSubscription)
57 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "Plan")+" = ?", change.Plan)
58 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "Status")+" = ?", change.Status)
59 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "Canceling")+" = ?", change.Canceling)
60 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "TrialStart")+" = ?", change.TrialStart)
61 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "TrialEnd")+" = ?", change.TrialEnd)
62 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "PeriodStart")+" = ?", change.PeriodStart)
63 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "PeriodEnd")+" = ?", change.PeriodEnd)
64 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "CanceledAt")+" = ?", change.CanceledAt)
65 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "FailedChargeAttempts")+" = ?", change.FailedChargeAttempts)
66 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "LastFailedCharge")+" = ?", change.LastFailedCharge)
67 query.IncludeIfNotNil(pan.GetUnquotedColumn(subscription, "LastNotified")+" = ?", change.LastNotified)
68 query.FlushExpressions(", ")
70 query.Include(pan.GetUnquotedColumn(subscription, "UserID")+" = ?", id)
71 return query.FlushExpressions(" ")
74 func (p Postgres) UpdateSubscription(id uuid.ID, change SubscriptionChange) error {
76 return ErrSubscriptionChangeEmpty
79 query := p.updateSubscriptionSQL(id, change)
80 res, err := p.Exec(query.String(), query.Args...)
81 if e, ok := err.(*pq.Error); ok && e.Constraint == "subscriptions_stripe_subscription_key" {
82 return ErrStripeSubscriptionAlreadyExists
83 } else if err != nil {
86 rows, err := res.RowsAffected()
91 return ErrSubscriptionNotFound
96 func (p Postgres) deleteSubscriptionSQL(id uuid.ID) *pan.Query {
97 var subscription Subscription
98 query := pan.New(pan.POSTGRES, "DELETE FROM "+pan.GetTableName(subscription))
100 query.Include(pan.GetUnquotedColumn(subscription, "UserID")+" = ?", id)
101 return query.FlushExpressions(" ")
104 func (p Postgres) DeleteSubscription(id uuid.ID) error {
105 query := p.deleteSubscriptionSQL(id)
106 res, err := p.Exec(query.String(), query.Args...)
110 rows, err := res.RowsAffected()
115 return ErrSubscriptionNotFound
120 func (p Postgres) getSubscriptionsSQL(ids []uuid.ID) *pan.Query {
121 var subscription Subscription
122 fields, _ := pan.GetFields(subscription)
123 intIDs := make([]interface{}, len(ids))
124 for pos, id := range ids {
127 query := pan.New(pan.POSTGRES, "SELECT "+pan.QueryList(fields)+" FROM "+pan.GetTableName(subscription))
129 query.Include(pan.GetUnquotedColumn(subscription, "UserID") + " IN")
130 query.Include("("+pan.VariableList(len(intIDs))+")", intIDs...)
131 return query.FlushExpressions(" ")
134 func (p Postgres) GetSubscriptions(ids []uuid.ID) (map[string]Subscription, error) {
135 results := map[string]Subscription{}
137 return results, ErrNoSubscriptionID
139 query := p.getSubscriptionsSQL(ids)
140 rows, err := p.Query(query.String(), query.Args...)
145 var subscription Subscription
146 err := pan.Unmarshal(rows, &subscription)
150 results[subscription.UserID.String()] = subscription
152 if err := rows.Err(); err != nil {
158 func (p Postgres) getSubscriptionStatsCountSQL() *pan.Query {
159 var subscription Subscription
160 query := pan.New(pan.POSTGRES, "SELECT COUNT(*) FROM")
161 query.Include(pan.GetTableName(subscription))
162 return query.FlushExpressions(" ")
165 func (p Postgres) getSubscriptionStatsCancelingSQL() *pan.Query {
166 var subscription Subscription
167 query := pan.New(pan.POSTGRES, "SELECT COUNT(*) FROM")
168 query.Include(pan.GetTableName(subscription))
170 query.Include(pan.GetUnquotedColumn(subscription, "Canceling")+" = ?", true)
171 return query.FlushExpressions(" ")
174 func (p Postgres) getSubscriptionStatsFailingSQL() *pan.Query {
175 var subscription Subscription
176 query := pan.New(pan.POSTGRES, "SELECT COUNT(*) FROM")
177 query.Include(pan.GetTableName(subscription))
179 statuses := []interface{}{"past_due", "unpaid"}
180 query.Include(pan.GetUnquotedColumn(subscription, "Status")+" IN ("+pan.VariableList(len(statuses))+")", statuses...)
181 return query.FlushExpressions(" ")
184 func (p Postgres) getSubscriptionStatsPlansSQL() *pan.Query {
185 var subscription Subscription
186 fields := []interface{}{pan.GetUnquotedColumn(subscription, "Plan"), "COUNT(*)"}
187 query := pan.New(pan.POSTGRES, "SELECT "+pan.QueryList(fields)+" FROM")
188 query.Include(pan.GetTableName(subscription))
189 query.Include("GROUP BY " + pan.GetUnquotedColumn(subscription, "Plan"))
190 return query.FlushExpressions(" ")
193 func (p Postgres) GetSubscriptionStats() (SubscriptionStats, error) {
194 stats := SubscriptionStats{
195 Plans: map[string]int64{},
197 query := p.getSubscriptionStatsCountSQL()
198 err := p.QueryRow(query.String(), query.Args...).Scan(&stats.Number)
200 log.Printf("Error querying for total subscriptions: %+v\n", err)
203 query = p.getSubscriptionStatsCancelingSQL()
204 err = p.QueryRow(query.String(), query.Args...).Scan(&stats.Canceling)
206 log.Printf("Error querying for canceling subscriptions: %+v\n", err)
209 query = p.getSubscriptionStatsFailingSQL()
210 err = p.QueryRow(query.String(), query.Args...).Scan(&stats.Failing)
212 log.Printf("Error querying for failing subscriptions: %+v\n", err)
215 query = p.getSubscriptionStatsPlansSQL()
216 rows, err := p.Query(query.String(), query.Args...)
218 log.Printf("Error querying for plans: %+v\n", err)
224 err := rows.Scan(&plan, &count)
226 log.Printf("Error scanning database row for plans: %+v\n", err)
229 stats.Plans[plan] = count
231 if err := rows.Err(); err != nil {
232 log.Printf("Error querying for plans: %+v\n", err)