writebehind
2015-03-31
writebehind/writebehind.go
First pass implementation. Create our first implementation of this. Use a map of string=>int64 to store our values. The idea here is that users will create one cache for each metric they want to collect--each column that needs updating, in MySQL land. Release it under the MIT license, because why not? Very proud that the first commit carries 100% test coverage, no golint errors, no go vet errors, and has a benchmark.
1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/writebehind.go Tue Mar 31 23:09:02 2015 -0400 1.3 @@ -0,0 +1,156 @@ 1.4 +package writebehind 1.5 + 1.6 +import ( 1.7 + "bytes" 1.8 + "fmt" 1.9 + "sync" 1.10 + "time" 1.11 +) 1.12 + 1.13 +// Incrementer defines an interface that allows items that can be identified 1.14 +// using a string to be incremented by a value that can be stored as an int64. 1.15 +// It only concerns itself with basic addition, as supported by the Go runtime. 1.16 +// math/big.Int and other such types are not supported. 1.17 +type Incrementer interface { 1.18 + Increment(values map[string]int64) 1.19 +} 1.20 + 1.21 +// Cache is a type that is used to store values in-memory, to be synced to a 1.22 +// more permanent store later. It offers no read functionality; the only things 1.23 +// you can do with it are increment the count for an item and sync it to the 1.24 +// more permanent store. The more permanent store is written to using the 1.25 +// Incrementer interface, and must implement that interface. 1.26 +// 1.27 +// Cache is a concurrency-safe type, and is safe for use in multiple goroutines 1.28 +// at once. 1.29 +type Cache struct { 1.30 + incrementer Incrementer 1.31 + duration time.Duration 1.32 + values map[string]int64 1.33 + ticker *time.Ticker 1.34 + incoming chan incrementReq 1.35 + kill chan struct{} 1.36 + sync.Mutex 1.37 +} 1.38 + 1.39 +func (c *Cache) String() string { 1.40 + var b bytes.Buffer 1.41 + fmt.Fprintf(&b, "{") 1.42 + first := true 1.43 + for k, v := range c.values { 1.44 + if !first { 1.45 + fmt.Fprintf(&b, ", ") 1.46 + } 1.47 + fmt.Fprintf(&b, "%q: %v", k, v) 1.48 + first = false 1.49 + } 1.50 + fmt.Fprintf(&b, "}") 1.51 + return b.String() 1.52 + 1.53 +} 1.54 + 1.55 +// NeedsSync returns true if the Cache has any values to sync to the Incrementer, 1.56 +// and false if the Cache is empty and is safe to dispose of. 1.57 +func (c *Cache) NeedsSync() bool { 1.58 + return len(c.values) > 0 1.59 +} 1.60 + 1.61 +// Sync calls through to the Incrementer associated with the Cache, flushing the 1.62 +// Cache's values to the Incrementer. It can be called manually, at any time, 1.63 +// safely, but will be called automatically on the Duration specified in the Cache. 1.64 +func (c *Cache) Sync() { 1.65 + c.Lock() 1.66 + defer c.Unlock() 1.67 + if !c.NeedsSync() { 1.68 + return 1.69 + } 1.70 + c.incrementer.Increment(c.values) 1.71 + c.values = map[string]int64{} 1.72 +} 1.73 + 1.74 +// NewCache creates, instantiates, and returns a cache using the specified values. It 1.75 +// is the only way to obtain a usable Cache instance. 1.76 +func NewCache(incrementer Incrementer, duration time.Duration) *Cache { 1.77 + c := &Cache{ 1.78 + incrementer: incrementer, 1.79 + duration: duration, 1.80 + values: map[string]int64{}, 1.81 + incoming: make(chan incrementReq), 1.82 + kill: make(chan struct{}), 1.83 + ticker: time.NewTicker(duration), 1.84 + } 1.85 + go c.run() 1.86 + return c 1.87 +} 1.88 + 1.89 +// Stop halts the syncing behaviour of the Cache and frees up the resources used 1.90 +// in that syncing behaviour. The Cache's store of values remains intact, however. 1.91 +// It's important to note that the Stop method _does not_ implicitly write to the 1.92 +// Incrementer, and you should manually call the Sync() method after calling the 1.93 +// Stop method, to store any lingering values that were written after the last 1.94 +// Sync. 1.95 +func (c *Cache) Stop() { 1.96 + c.Lock() 1.97 + defer c.Unlock() 1.98 + close(c.kill) 1.99 + c.ticker.Stop() 1.100 +} 1.101 + 1.102 +func (c *Cache) run() { 1.103 + for { 1.104 + select { 1.105 + case <-c.ticker.C: 1.106 + c.Sync() 1.107 + case in := <-c.incoming: 1.108 + c.increment(in.key, in.value) 1.109 + case <-c.kill: 1.110 + return 1.111 + } 1.112 + } 1.113 +} 1.114 + 1.115 +func (c *Cache) increment(key string, value int64) { 1.116 + c.Lock() 1.117 + defer c.Unlock() 1.118 + c.values[key] = c.values[key] + value 1.119 +} 1.120 + 1.121 +// Increment updates the item that matches key in the Cache, adding value to it. 1.122 +// It key does not exist in the Cache, it is created and set to value. 1.123 +func (c *Cache) Increment(key string, value int64) { 1.124 + c.incoming <- incrementReq{key: key, value: value} 1.125 +} 1.126 + 1.127 +type incrementReq struct { 1.128 + key string 1.129 + value int64 1.130 +} 1.131 + 1.132 +// MemoryIncrementer is an implementation of the Incrementer interface that stores 1.133 +// values in memory. There is obviously no reason to do this outside of tests and 1.134 +// stubs, and those two reasons are why this type exists. Don't use it for other 1.135 +// things. 1.136 +type MemoryIncrementer struct { 1.137 + sync.RWMutex 1.138 + values map[string]int64 1.139 +} 1.140 + 1.141 +// Increment updates the items specified by the keys of the map by adding the 1.142 +// value of the map to the item's stored value. 1.143 +func (m *MemoryIncrementer) Increment(increments map[string]int64) { 1.144 + m.Lock() 1.145 + defer m.Unlock() 1.146 + if m.values == nil { 1.147 + m.values = map[string]int64{} 1.148 + } 1.149 + for k, v := range increments { 1.150 + m.values[k] = m.values[k] + v 1.151 + } 1.152 +} 1.153 + 1.154 +// Get is a concurrency-safe helper to return the value for key. 1.155 +func (m *MemoryIncrementer) Get(key string) int64 { 1.156 + m.RLock() 1.157 + defer m.RUnlock() 1.158 + return m.values[key] 1.159 +}