writebehind
2015-03-31
writebehind/writebehind.go
First pass implementation. Create our first implementation of this. Use a map of string=>int64 to store our values. The idea here is that users will create one cache for each metric they want to collect--each column that needs updating, in MySQL land. Release it under the MIT license, because why not? Very proud that the first commit carries 100% test coverage, no golint errors, no go vet errors, and has a benchmark.
| paddy@0 | 1 package writebehind |
| paddy@0 | 2 |
| paddy@0 | 3 import ( |
| paddy@0 | 4 "bytes" |
| paddy@0 | 5 "fmt" |
| paddy@0 | 6 "sync" |
| paddy@0 | 7 "time" |
| paddy@0 | 8 ) |
| paddy@0 | 9 |
| paddy@0 | 10 // Incrementer defines an interface that allows items that can be identified |
| paddy@0 | 11 // using a string to be incremented by a value that can be stored as an int64. |
| paddy@0 | 12 // It only concerns itself with basic addition, as supported by the Go runtime. |
| paddy@0 | 13 // math/big.Int and other such types are not supported. |
| paddy@0 | 14 type Incrementer interface { |
| paddy@0 | 15 Increment(values map[string]int64) |
| paddy@0 | 16 } |
| paddy@0 | 17 |
| paddy@0 | 18 // Cache is a type that is used to store values in-memory, to be synced to a |
| paddy@0 | 19 // more permanent store later. It offers no read functionality; the only things |
| paddy@0 | 20 // you can do with it are increment the count for an item and sync it to the |
| paddy@0 | 21 // more permanent store. The more permanent store is written to using the |
| paddy@0 | 22 // Incrementer interface, and must implement that interface. |
| paddy@0 | 23 // |
| paddy@0 | 24 // Cache is a concurrency-safe type, and is safe for use in multiple goroutines |
| paddy@0 | 25 // at once. |
| paddy@0 | 26 type Cache struct { |
| paddy@0 | 27 incrementer Incrementer |
| paddy@0 | 28 duration time.Duration |
| paddy@0 | 29 values map[string]int64 |
| paddy@0 | 30 ticker *time.Ticker |
| paddy@0 | 31 incoming chan incrementReq |
| paddy@0 | 32 kill chan struct{} |
| paddy@0 | 33 sync.Mutex |
| paddy@0 | 34 } |
| paddy@0 | 35 |
| paddy@0 | 36 func (c *Cache) String() string { |
| paddy@0 | 37 var b bytes.Buffer |
| paddy@0 | 38 fmt.Fprintf(&b, "{") |
| paddy@0 | 39 first := true |
| paddy@0 | 40 for k, v := range c.values { |
| paddy@0 | 41 if !first { |
| paddy@0 | 42 fmt.Fprintf(&b, ", ") |
| paddy@0 | 43 } |
| paddy@0 | 44 fmt.Fprintf(&b, "%q: %v", k, v) |
| paddy@0 | 45 first = false |
| paddy@0 | 46 } |
| paddy@0 | 47 fmt.Fprintf(&b, "}") |
| paddy@0 | 48 return b.String() |
| paddy@0 | 49 |
| paddy@0 | 50 } |
| paddy@0 | 51 |
| paddy@0 | 52 // NeedsSync returns true if the Cache has any values to sync to the Incrementer, |
| paddy@0 | 53 // and false if the Cache is empty and is safe to dispose of. |
| paddy@0 | 54 func (c *Cache) NeedsSync() bool { |
| paddy@0 | 55 return len(c.values) > 0 |
| paddy@0 | 56 } |
| paddy@0 | 57 |
| paddy@0 | 58 // Sync calls through to the Incrementer associated with the Cache, flushing the |
| paddy@0 | 59 // Cache's values to the Incrementer. It can be called manually, at any time, |
| paddy@0 | 60 // safely, but will be called automatically on the Duration specified in the Cache. |
| paddy@0 | 61 func (c *Cache) Sync() { |
| paddy@0 | 62 c.Lock() |
| paddy@0 | 63 defer c.Unlock() |
| paddy@0 | 64 if !c.NeedsSync() { |
| paddy@0 | 65 return |
| paddy@0 | 66 } |
| paddy@0 | 67 c.incrementer.Increment(c.values) |
| paddy@0 | 68 c.values = map[string]int64{} |
| paddy@0 | 69 } |
| paddy@0 | 70 |
| paddy@0 | 71 // NewCache creates, instantiates, and returns a cache using the specified values. It |
| paddy@0 | 72 // is the only way to obtain a usable Cache instance. |
| paddy@0 | 73 func NewCache(incrementer Incrementer, duration time.Duration) *Cache { |
| paddy@0 | 74 c := &Cache{ |
| paddy@0 | 75 incrementer: incrementer, |
| paddy@0 | 76 duration: duration, |
| paddy@0 | 77 values: map[string]int64{}, |
| paddy@0 | 78 incoming: make(chan incrementReq), |
| paddy@0 | 79 kill: make(chan struct{}), |
| paddy@0 | 80 ticker: time.NewTicker(duration), |
| paddy@0 | 81 } |
| paddy@0 | 82 go c.run() |
| paddy@0 | 83 return c |
| paddy@0 | 84 } |
| paddy@0 | 85 |
| paddy@0 | 86 // Stop halts the syncing behaviour of the Cache and frees up the resources used |
| paddy@0 | 87 // in that syncing behaviour. The Cache's store of values remains intact, however. |
| paddy@0 | 88 // It's important to note that the Stop method _does not_ implicitly write to the |
| paddy@0 | 89 // Incrementer, and you should manually call the Sync() method after calling the |
| paddy@0 | 90 // Stop method, to store any lingering values that were written after the last |
| paddy@0 | 91 // Sync. |
| paddy@0 | 92 func (c *Cache) Stop() { |
| paddy@0 | 93 c.Lock() |
| paddy@0 | 94 defer c.Unlock() |
| paddy@0 | 95 close(c.kill) |
| paddy@0 | 96 c.ticker.Stop() |
| paddy@0 | 97 } |
| paddy@0 | 98 |
| paddy@0 | 99 func (c *Cache) run() { |
| paddy@0 | 100 for { |
| paddy@0 | 101 select { |
| paddy@0 | 102 case <-c.ticker.C: |
| paddy@0 | 103 c.Sync() |
| paddy@0 | 104 case in := <-c.incoming: |
| paddy@0 | 105 c.increment(in.key, in.value) |
| paddy@0 | 106 case <-c.kill: |
| paddy@0 | 107 return |
| paddy@0 | 108 } |
| paddy@0 | 109 } |
| paddy@0 | 110 } |
| paddy@0 | 111 |
| paddy@0 | 112 func (c *Cache) increment(key string, value int64) { |
| paddy@0 | 113 c.Lock() |
| paddy@0 | 114 defer c.Unlock() |
| paddy@0 | 115 c.values[key] = c.values[key] + value |
| paddy@0 | 116 } |
| paddy@0 | 117 |
| paddy@0 | 118 // Increment updates the item that matches key in the Cache, adding value to it. |
| paddy@0 | 119 // It key does not exist in the Cache, it is created and set to value. |
| paddy@0 | 120 func (c *Cache) Increment(key string, value int64) { |
| paddy@0 | 121 c.incoming <- incrementReq{key: key, value: value} |
| paddy@0 | 122 } |
| paddy@0 | 123 |
| paddy@0 | 124 type incrementReq struct { |
| paddy@0 | 125 key string |
| paddy@0 | 126 value int64 |
| paddy@0 | 127 } |
| paddy@0 | 128 |
| paddy@0 | 129 // MemoryIncrementer is an implementation of the Incrementer interface that stores |
| paddy@0 | 130 // values in memory. There is obviously no reason to do this outside of tests and |
| paddy@0 | 131 // stubs, and those two reasons are why this type exists. Don't use it for other |
| paddy@0 | 132 // things. |
| paddy@0 | 133 type MemoryIncrementer struct { |
| paddy@0 | 134 sync.RWMutex |
| paddy@0 | 135 values map[string]int64 |
| paddy@0 | 136 } |
| paddy@0 | 137 |
| paddy@0 | 138 // Increment updates the items specified by the keys of the map by adding the |
| paddy@0 | 139 // value of the map to the item's stored value. |
| paddy@0 | 140 func (m *MemoryIncrementer) Increment(increments map[string]int64) { |
| paddy@0 | 141 m.Lock() |
| paddy@0 | 142 defer m.Unlock() |
| paddy@0 | 143 if m.values == nil { |
| paddy@0 | 144 m.values = map[string]int64{} |
| paddy@0 | 145 } |
| paddy@0 | 146 for k, v := range increments { |
| paddy@0 | 147 m.values[k] = m.values[k] + v |
| paddy@0 | 148 } |
| paddy@0 | 149 } |
| paddy@0 | 150 |
| paddy@0 | 151 // Get is a concurrency-safe helper to return the value for key. |
| paddy@0 | 152 func (m *MemoryIncrementer) Get(key string) int64 { |
| paddy@0 | 153 m.RLock() |
| paddy@0 | 154 defer m.RUnlock() |
| paddy@0 | 155 return m.values[key] |
| paddy@0 | 156 } |