writebehind
2015-03-31
writebehind/writebehind.go
First pass implementation. Create our first implementation of this. Use a map of string=>int64 to store our values. The idea here is that users will create one cache for each metric they want to collect--each column that needs updating, in MySQL land. Release it under the MIT license, because why not? Very proud that the first commit carries 100% test coverage, no golint errors, no go vet errors, and has a benchmark.
1 package writebehind
3 import (
4 "bytes"
5 "fmt"
6 "sync"
7 "time"
8 )
// Incrementer is the contract for a backing store that can apply a batch of
// additive updates. Each map key names an item and each map value is the
// int64 amount to add to that item. Only plain int64 addition is in scope;
// arbitrary-precision types such as math/big.Int are not supported.
type Incrementer interface {
	// Increment applies every key/amount pair in values to the store.
	Increment(values map[string]int64)
}
18 // Cache is a type that is used to store values in-memory, to be synced to a
19 // more permanent store later. It offers no read functionality; the only things
20 // you can do with it are increment the count for an item and sync it to the
21 // more permanent store. The more permanent store is written to using the
22 // Incrementer interface, and must implement that interface.
23 //
24 // Cache is a concurrency-safe type, and is safe for use in multiple goroutines
25 // at once.
26 type Cache struct {
27 incrementer Incrementer
28 duration time.Duration
29 values map[string]int64
30 ticker *time.Ticker
31 incoming chan incrementReq
32 kill chan struct{}
33 sync.Mutex
34 }
36 func (c *Cache) String() string {
37 var b bytes.Buffer
38 fmt.Fprintf(&b, "{")
39 first := true
40 for k, v := range c.values {
41 if !first {
42 fmt.Fprintf(&b, ", ")
43 }
44 fmt.Fprintf(&b, "%q: %v", k, v)
45 first = false
46 }
47 fmt.Fprintf(&b, "}")
48 return b.String()
50 }
52 // NeedsSync returns true if the Cache has any values to sync to the Incrementer,
53 // and false if the Cache is empty and is safe to dispose of.
54 func (c *Cache) NeedsSync() bool {
55 return len(c.values) > 0
56 }
58 // Sync calls through to the Incrementer associated with the Cache, flushing the
59 // Cache's values to the Incrementer. It can be called manually, at any time,
60 // safely, but will be called automatically on the Duration specified in the Cache.
61 func (c *Cache) Sync() {
62 c.Lock()
63 defer c.Unlock()
64 if !c.NeedsSync() {
65 return
66 }
67 c.incrementer.Increment(c.values)
68 c.values = map[string]int64{}
69 }
71 // NewCache creates, instantiates, and returns a cache using the specified values. It
72 // is the only way to obtain a usable Cache instance.
73 func NewCache(incrementer Incrementer, duration time.Duration) *Cache {
74 c := &Cache{
75 incrementer: incrementer,
76 duration: duration,
77 values: map[string]int64{},
78 incoming: make(chan incrementReq),
79 kill: make(chan struct{}),
80 ticker: time.NewTicker(duration),
81 }
82 go c.run()
83 return c
84 }
86 // Stop halts the syncing behaviour of the Cache and frees up the resources used
87 // in that syncing behaviour. The Cache's store of values remains intact, however.
88 // It's important to note that the Stop method _does not_ implicitly write to the
89 // Incrementer, and you should manually call the Sync() method after calling the
90 // Stop method, to store any lingering values that were written after the last
91 // Sync.
92 func (c *Cache) Stop() {
93 c.Lock()
94 defer c.Unlock()
95 close(c.kill)
96 c.ticker.Stop()
97 }
99 func (c *Cache) run() {
100 for {
101 select {
102 case <-c.ticker.C:
103 c.Sync()
104 case in := <-c.incoming:
105 c.increment(in.key, in.value)
106 case <-c.kill:
107 return
108 }
109 }
110 }
112 func (c *Cache) increment(key string, value int64) {
113 c.Lock()
114 defer c.Unlock()
115 c.values[key] = c.values[key] + value
116 }
118 // Increment updates the item that matches key in the Cache, adding value to it.
119 // It key does not exist in the Cache, it is created and set to value.
120 func (c *Cache) Increment(key string, value int64) {
121 c.incoming <- incrementReq{key: key, value: value}
122 }
// incrementReq is the message sent over a Cache's incoming channel: a request
// to add value to the item identified by key.
type incrementReq struct {
	// key identifies the item to update.
	key string
	// value is the amount to add to the item.
	value int64
}
// MemoryIncrementer is an Incrementer implementation that keeps its totals in
// an in-process map. It exists for tests and stubs only; do not use it for
// anything else. The zero value is ready to use.
type MemoryIncrementer struct {
	// RWMutex guards values.
	sync.RWMutex
	// values holds the running total per key; it is allocated on the first
	// call to Increment.
	values map[string]int64
}

// Increment adds each amount in increments to the running total stored under
// the corresponding key. Keys that have not been seen before start from zero.
func (m *MemoryIncrementer) Increment(increments map[string]int64) {
	m.Lock()
	defer m.Unlock()

	if m.values == nil {
		m.values = make(map[string]int64)
	}
	for key, delta := range increments {
		m.values[key] += delta
	}
}

// Get is a concurrency-safe helper that returns the running total for key. It
// returns zero for a key that has never been incremented.
func (m *MemoryIncrementer) Get(key string) int64 {
	m.RLock()
	total := m.values[key]
	m.RUnlock()
	return total
}