writebehind
2015-03-31
0:85f9751b15ea tip Browse Files
First pass implementation. Create our first implementation of this. Use a map of string=>int64 to store our values. The idea here is that users will create one cache for each metric they want to collect--each column that needs updating, in MySQL land. Release it under the MIT license, because why not? Very proud that the first commit carries 100% test coverage, no golint errors, no go vet errors, and has a benchmark.
LICENSE README.md doc.go writebehind.go writebehind_test.go
1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/LICENSE Tue Mar 31 23:09:02 2015 -0400 1.3 @@ -0,0 +1,21 @@ 1.4 +The MIT License (MIT) 1.5 + 1.6 +Copyright (c) Second Bit, LLC 2015 1.7 + 1.8 +Permission is hereby granted, free of charge, to any person obtaining a copy 1.9 +of this software and associated documentation files (the "Software"), to deal 1.10 +in the Software without restriction, including without limitation the rights 1.11 +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 1.12 +copies of the Software, and to permit persons to whom the Software is 1.13 +furnished to do so, subject to the following conditions: 1.14 + 1.15 +The above copyright notice and this permission notice shall be included in 1.16 +all copies or substantial portions of the Software. 1.17 + 1.18 +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 1.19 +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 1.20 +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 1.21 +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 1.22 +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 1.23 +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 1.24 +THE SOFTWARE.
2.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 2.2 +++ b/README.md Tue Mar 31 23:09:02 2015 -0400 2.3 @@ -0,0 +1,16 @@ 2.4 +# writebehind 2.5 + 2.6 +writebehind provides a simple, in-memory cache for incrementing variables that change 2.7 +frequently. It is concurrency-safe, and syncs back to a more permanent store at a 2.8 +configurable interval. For more information, see the godoc. 2.9 + 2.10 +To report bugs and request features, please use the 2.11 +[writebehind Trello board](https://trello.com/b/aCm2vhw7/writebehind). 2.12 + 2.13 +Licensing information can be found in the LICENSE file. 2.14 + 2.15 +Patches are accepted. Provide a Mercurial repository with the changes you'd like merged, 2.16 +and we will be happy to pull in your changes. If you do not have a Mercurial host, you 2.17 +can get one free at [Bitbucket](https://www.bitbucket.org). 2.18 + 2.19 +If you have any questions, feel free to email [projects@secondbit.org](mailto:projects@secondbit.org).
3.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 3.2 +++ b/doc.go Tue Mar 31 23:09:02 2015 -0400 3.3 @@ -0,0 +1,14 @@ 3.4 +/* 3.5 +Package writebehind provides a cache that periodically syncs to a datastore. 3.6 + 3.7 +The Cache is really only useful for adding numbers that are expected to change 3.8 +too frequently for a full datastore write, e.g. view counts. 3.9 + 3.10 +Each Cache can only be used once; calling its Stop() method renders the Cache 3.11 +unusable, and a new Cache needs to be instantiated. Caches should, wherever 3.12 +possible, have their Stop() methods called, or they risk leaking goroutines. 3.13 + 3.14 +Cache implements the expvar.Var type, and instances can be passed to 3.15 +expvar.Publish to provide insight into the Cache while it's in operation. 3.16 +*/ 3.17 +package writebehind
4.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 4.2 +++ b/writebehind.go Tue Mar 31 23:09:02 2015 -0400 4.3 @@ -0,0 +1,156 @@ 4.4 +package writebehind 4.5 + 4.6 +import ( 4.7 + "bytes" 4.8 + "fmt" 4.9 + "sync" 4.10 + "time" 4.11 +) 4.12 + 4.13 +// Incrementer defines an interface that allows items that can be identified 4.14 +// using a string to be incremented by a value that can be stored as an int64. 4.15 +// It only concerns itself with basic addition, as supported by the Go runtime. 4.16 +// math/big.Int and other such types are not supported. 4.17 +type Incrementer interface { 4.18 + Increment(values map[string]int64) 4.19 +} 4.20 + 4.21 +// Cache is a type that is used to store values in-memory, to be synced to a 4.22 +// more permanent store later. It offers no read functionality; the only things 4.23 +// you can do with it are increment the count for an item and sync it to the 4.24 +// more permanent store. The more permanent store is written to using the 4.25 +// Incrementer interface, and must implement that interface. 4.26 +// 4.27 +// Cache is a concurrency-safe type, and is safe for use in multiple goroutines 4.28 +// at once. 4.29 +type Cache struct { 4.30 + incrementer Incrementer 4.31 + duration time.Duration 4.32 + values map[string]int64 4.33 + ticker *time.Ticker 4.34 + incoming chan incrementReq 4.35 + kill chan struct{} 4.36 + sync.Mutex 4.37 +} 4.38 + 4.39 +func (c *Cache) String() string { 4.40 + var b bytes.Buffer 4.41 + fmt.Fprintf(&b, "{") 4.42 + first := true 4.43 + for k, v := range c.values { 4.44 + if !first { 4.45 + fmt.Fprintf(&b, ", ") 4.46 + } 4.47 + fmt.Fprintf(&b, "%q: %v", k, v) 4.48 + first = false 4.49 + } 4.50 + fmt.Fprintf(&b, "}") 4.51 + return b.String() 4.52 + 4.53 +} 4.54 + 4.55 +// NeedsSync returns true if the Cache has any values to sync to the Incrementer, 4.56 +// and false if the Cache is empty and is safe to dispose of. 4.57 +func (c *Cache) NeedsSync() bool { 4.58 + return len(c.values) > 0 4.59 +} 4.60 + 4.61 +// Sync calls through to the Incrementer associated with the Cache, flushing the 4.62 +// Cache's values to the Incrementer. It can be called manually, at any time, 4.63 +// safely, but will be called automatically on the Duration specified in the Cache. 4.64 +func (c *Cache) Sync() { 4.65 + c.Lock() 4.66 + defer c.Unlock() 4.67 + if !c.NeedsSync() { 4.68 + return 4.69 + } 4.70 + c.incrementer.Increment(c.values) 4.71 + c.values = map[string]int64{} 4.72 +} 4.73 + 4.74 +// NewCache creates, instantiates, and returns a cache using the specified values. It 4.75 +// is the only way to obtain a usable Cache instance. 4.76 +func NewCache(incrementer Incrementer, duration time.Duration) *Cache { 4.77 + c := &Cache{ 4.78 + incrementer: incrementer, 4.79 + duration: duration, 4.80 + values: map[string]int64{}, 4.81 + incoming: make(chan incrementReq), 4.82 + kill: make(chan struct{}), 4.83 + ticker: time.NewTicker(duration), 4.84 + } 4.85 + go c.run() 4.86 + return c 4.87 +} 4.88 + 4.89 +// Stop halts the syncing behaviour of the Cache and frees up the resources used 4.90 +// in that syncing behaviour. The Cache's store of values remains intact, however. 4.91 +// It's important to note that the Stop method _does not_ implicitly write to the 4.92 +// Incrementer, and you should manually call the Sync() method after calling the 4.93 +// Stop method, to store any lingering values that were written after the last 4.94 +// Sync. 4.95 +func (c *Cache) Stop() { 4.96 + c.Lock() 4.97 + defer c.Unlock() 4.98 + close(c.kill) 4.99 + c.ticker.Stop() 4.100 +} 4.101 + 4.102 +func (c *Cache) run() { 4.103 + for { 4.104 + select { 4.105 + case <-c.ticker.C: 4.106 + c.Sync() 4.107 + case in := <-c.incoming: 4.108 + c.increment(in.key, in.value) 4.109 + case <-c.kill: 4.110 + return 4.111 + } 4.112 + } 4.113 +} 4.114 + 4.115 +func (c *Cache) increment(key string, value int64) { 4.116 + c.Lock() 4.117 + defer c.Unlock() 4.118 + c.values[key] = c.values[key] + value 4.119 +} 4.120 + 4.121 +// Increment updates the item that matches key in the Cache, adding value to it. 4.122 +// It key does not exist in the Cache, it is created and set to value. 4.123 +func (c *Cache) Increment(key string, value int64) { 4.124 + c.incoming <- incrementReq{key: key, value: value} 4.125 +} 4.126 + 4.127 +type incrementReq struct { 4.128 + key string 4.129 + value int64 4.130 +} 4.131 + 4.132 +// MemoryIncrementer is an implementation of the Incrementer interface that stores 4.133 +// values in memory. There is obviously no reason to do this outside of tests and 4.134 +// stubs, and those two reasons are why this type exists. Don't use it for other 4.135 +// things. 4.136 +type MemoryIncrementer struct { 4.137 + sync.RWMutex 4.138 + values map[string]int64 4.139 +} 4.140 + 4.141 +// Increment updates the items specified by the keys of the map by adding the 4.142 +// value of the map to the item's stored value. 4.143 +func (m *MemoryIncrementer) Increment(increments map[string]int64) { 4.144 + m.Lock() 4.145 + defer m.Unlock() 4.146 + if m.values == nil { 4.147 + m.values = map[string]int64{} 4.148 + } 4.149 + for k, v := range increments { 4.150 + m.values[k] = m.values[k] + v 4.151 + } 4.152 +} 4.153 + 4.154 +// Get is a concurrency-safe helper to return the value for key. 4.155 +func (m *MemoryIncrementer) Get(key string) int64 { 4.156 + m.RLock() 4.157 + defer m.RUnlock() 4.158 + return m.values[key] 4.159 +}
5.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 5.2 +++ b/writebehind_test.go Tue Mar 31 23:09:02 2015 -0400 5.3 @@ -0,0 +1,71 @@ 5.4 +package writebehind 5.5 + 5.6 +import ( 5.7 + "encoding/json" 5.8 + "testing" 5.9 + "time" 5.10 +) 5.11 + 5.12 +func TestSync(t *testing.T) { 5.13 + m := MemoryIncrementer{} 5.14 + c := NewCache(&m, time.Millisecond) 5.15 + defer c.Stop() 5.16 + c.Sync() // test that unnecessary sync is a no-op 5.17 + c.Increment("test1", 10) 5.18 + c.Increment("test2", 100) 5.19 + time.Sleep(time.Millisecond) 5.20 + if m.Get("test1") != 10 { 5.21 + t.Errorf("Expected test1 to be %d, got %d", 10, m.values["test1"]) 5.22 + } 5.23 + if m.Get("test2") != 100 { 5.24 + t.Errorf("Expected test2 to be %d, got %d", 100, m.values["test2"]) 5.25 + } 5.26 + c.Increment("test1", 10) 5.27 + c.Increment("test2", 100) 5.28 + time.Sleep(time.Millisecond) 5.29 + if m.Get("test1") != 20 { 5.30 + t.Errorf("Expected test1 to be %d, got %d", 20, m.values["test1"]) 5.31 + } 5.32 + if m.Get("test2") != 200 { 5.33 + t.Errorf("Expected test2 to be %d, got %d", 200, m.values["test2"]) 5.34 + } 5.35 +} 5.36 + 5.37 +func TestString(t *testing.T) { 5.38 + m := MemoryIncrementer{} 5.39 + c := NewCache(&m, time.Millisecond) 5.40 + defer c.Stop() 5.41 + c.Increment("test1", 10) 5.42 + c.Increment("test2", 100) 5.43 + c.Increment(`"test3"`, 1000) 5.44 + str := c.String() 5.45 + var result map[string]int64 5.46 + err := json.Unmarshal([]byte(str), &result) 5.47 + if err != nil { 5.48 + t.Errorf("Error unmarshalling returned JSON: %#+v", err) 5.49 + } 5.50 + if result["test1"] != 10 { 5.51 + t.Errorf("Expected test1 to be %d, got %d", 10, result["test1"]) 5.52 + } 5.53 + if result["test2"] != 100 { 5.54 + t.Errorf("Expected test2 to be %d, got %d", 100, result["test2"]) 5.55 + } 5.56 + if result["\"test3\""] != 1000 { 5.57 + t.Errorf("Expected \"test3\" to be %d, got %d", 1000, result["\"test3\""]) 5.58 + } 5.59 +} 5.60 + 5.61 +func BenchmarkSync(b *testing.B) { 5.62 + m := MemoryIncrementer{} 5.63 + c := NewCache(&m, time.Second*5) 5.64 + defer c.Stop() 5.65 + 5.66 + b.RunParallel(func(pb *testing.PB) { 5.67 + for pb.Next() { 5.68 + c.Increment("myvalue", 1) 5.69 + c.Increment("othervalue", 1) 5.70 + c.Increment("other other value", 1) 5.71 + c.Increment("weird value", 1) 5.72 + } 5.73 + }) 5.74 +}