writebehind

Paddy 2015-03-31

0:85f9751b15ea Go to Latest

writebehind/writebehind.go

First pass implementation. Create our first implementation of this. Use a map of string=>int64 to store our values. The idea here is that users will create one cache for each metric they want to collect--each column that needs updating, in MySQL land. Release it under the MIT license, because why not? Very proud that the first commit carries 100% test coverage, no golint errors, no go vet errors, and has a benchmark.

History
paddy@0 1 package writebehind
paddy@0 2
paddy@0 3 import (
paddy@0 4 "bytes"
paddy@0 5 "fmt"
paddy@0 6 "sync"
paddy@0 7 "time"
paddy@0 8 )
paddy@0 9
paddy@0 10 // Incrementer defines an interface that allows items that can be identified
paddy@0 11 // using a string to be incremented by a value that can be stored as an int64.
paddy@0 12 // It only concerns itself with basic addition, as supported by the Go runtime.
paddy@0 13 // math/big.Int and other such types are not supported.
paddy@0 14 type Incrementer interface {
paddy@0 15 Increment(values map[string]int64)
paddy@0 16 }
paddy@0 17
paddy@0 18 // Cache is a type that is used to store values in-memory, to be synced to a
paddy@0 19 // more permanent store later. It offers no read functionality; the only things
paddy@0 20 // you can do with it are increment the count for an item and sync it to the
paddy@0 21 // more permanent store. The more permanent store is written to using the
paddy@0 22 // Incrementer interface, and must implement that interface.
paddy@0 23 //
paddy@0 24 // Cache is a concurrency-safe type, and is safe for use in multiple goroutines
paddy@0 25 // at once.
paddy@0 26 type Cache struct {
paddy@0 27 incrementer Incrementer
paddy@0 28 duration time.Duration
paddy@0 29 values map[string]int64
paddy@0 30 ticker *time.Ticker
paddy@0 31 incoming chan incrementReq
paddy@0 32 kill chan struct{}
paddy@0 33 sync.Mutex
paddy@0 34 }
paddy@0 35
paddy@0 36 func (c *Cache) String() string {
paddy@0 37 var b bytes.Buffer
paddy@0 38 fmt.Fprintf(&b, "{")
paddy@0 39 first := true
paddy@0 40 for k, v := range c.values {
paddy@0 41 if !first {
paddy@0 42 fmt.Fprintf(&b, ", ")
paddy@0 43 }
paddy@0 44 fmt.Fprintf(&b, "%q: %v", k, v)
paddy@0 45 first = false
paddy@0 46 }
paddy@0 47 fmt.Fprintf(&b, "}")
paddy@0 48 return b.String()
paddy@0 49
paddy@0 50 }
paddy@0 51
paddy@0 52 // NeedsSync returns true if the Cache has any values to sync to the Incrementer,
paddy@0 53 // and false if the Cache is empty and is safe to dispose of.
paddy@0 54 func (c *Cache) NeedsSync() bool {
paddy@0 55 return len(c.values) > 0
paddy@0 56 }
paddy@0 57
paddy@0 58 // Sync calls through to the Incrementer associated with the Cache, flushing the
paddy@0 59 // Cache's values to the Incrementer. It can be called manually, at any time,
paddy@0 60 // safely, but will be called automatically on the Duration specified in the Cache.
paddy@0 61 func (c *Cache) Sync() {
paddy@0 62 c.Lock()
paddy@0 63 defer c.Unlock()
paddy@0 64 if !c.NeedsSync() {
paddy@0 65 return
paddy@0 66 }
paddy@0 67 c.incrementer.Increment(c.values)
paddy@0 68 c.values = map[string]int64{}
paddy@0 69 }
paddy@0 70
paddy@0 71 // NewCache creates, instantiates, and returns a cache using the specified values. It
paddy@0 72 // is the only way to obtain a usable Cache instance.
paddy@0 73 func NewCache(incrementer Incrementer, duration time.Duration) *Cache {
paddy@0 74 c := &Cache{
paddy@0 75 incrementer: incrementer,
paddy@0 76 duration: duration,
paddy@0 77 values: map[string]int64{},
paddy@0 78 incoming: make(chan incrementReq),
paddy@0 79 kill: make(chan struct{}),
paddy@0 80 ticker: time.NewTicker(duration),
paddy@0 81 }
paddy@0 82 go c.run()
paddy@0 83 return c
paddy@0 84 }
paddy@0 85
paddy@0 86 // Stop halts the syncing behaviour of the Cache and frees up the resources used
paddy@0 87 // in that syncing behaviour. The Cache's store of values remains intact, however.
paddy@0 88 // It's important to note that the Stop method _does not_ implicitly write to the
paddy@0 89 // Incrementer, and you should manually call the Sync() method after calling the
paddy@0 90 // Stop method, to store any lingering values that were written after the last
paddy@0 91 // Sync.
paddy@0 92 func (c *Cache) Stop() {
paddy@0 93 c.Lock()
paddy@0 94 defer c.Unlock()
paddy@0 95 close(c.kill)
paddy@0 96 c.ticker.Stop()
paddy@0 97 }
paddy@0 98
paddy@0 99 func (c *Cache) run() {
paddy@0 100 for {
paddy@0 101 select {
paddy@0 102 case <-c.ticker.C:
paddy@0 103 c.Sync()
paddy@0 104 case in := <-c.incoming:
paddy@0 105 c.increment(in.key, in.value)
paddy@0 106 case <-c.kill:
paddy@0 107 return
paddy@0 108 }
paddy@0 109 }
paddy@0 110 }
paddy@0 111
paddy@0 112 func (c *Cache) increment(key string, value int64) {
paddy@0 113 c.Lock()
paddy@0 114 defer c.Unlock()
paddy@0 115 c.values[key] = c.values[key] + value
paddy@0 116 }
paddy@0 117
paddy@0 118 // Increment updates the item that matches key in the Cache, adding value to it.
paddy@0 119 // It key does not exist in the Cache, it is created and set to value.
paddy@0 120 func (c *Cache) Increment(key string, value int64) {
paddy@0 121 c.incoming <- incrementReq{key: key, value: value}
paddy@0 122 }
paddy@0 123
paddy@0 124 type incrementReq struct {
paddy@0 125 key string
paddy@0 126 value int64
paddy@0 127 }
paddy@0 128
paddy@0 129 // MemoryIncrementer is an implementation of the Incrementer interface that stores
paddy@0 130 // values in memory. There is obviously no reason to do this outside of tests and
paddy@0 131 // stubs, and those two reasons are why this type exists. Don't use it for other
paddy@0 132 // things.
paddy@0 133 type MemoryIncrementer struct {
paddy@0 134 sync.RWMutex
paddy@0 135 values map[string]int64
paddy@0 136 }
paddy@0 137
paddy@0 138 // Increment updates the items specified by the keys of the map by adding the
paddy@0 139 // value of the map to the item's stored value.
paddy@0 140 func (m *MemoryIncrementer) Increment(increments map[string]int64) {
paddy@0 141 m.Lock()
paddy@0 142 defer m.Unlock()
paddy@0 143 if m.values == nil {
paddy@0 144 m.values = map[string]int64{}
paddy@0 145 }
paddy@0 146 for k, v := range increments {
paddy@0 147 m.values[k] = m.values[k] + v
paddy@0 148 }
paddy@0 149 }
paddy@0 150
paddy@0 151 // Get is a concurrency-safe helper to return the value for key.
paddy@0 152 func (m *MemoryIncrementer) Get(key string) int64 {
paddy@0 153 m.RLock()
paddy@0 154 defer m.RUnlock()
paddy@0 155 return m.values[key]
paddy@0 156 }