writebehind

Paddy 2015-03-31

0:85f9751b15ea tip Browse Files

First pass implementation. Create our first implementation of this. Use a map of string=>int64 to store our values. The idea here is that users will create one cache for each metric they want to collect--each column that needs updating, in MySQL land. Release it under the MIT license, because why not? Very proud that the first commit carries 100% test coverage, no golint errors, no go vet errors, and has a benchmark.

LICENSE README.md doc.go writebehind.go writebehind_test.go

     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/LICENSE	Tue Mar 31 23:09:02 2015 -0400
     1.3 @@ -0,0 +1,21 @@
     1.4 +The MIT License (MIT)
     1.5 +
     1.6 +Copyright (c) Second Bit, LLC 2015
     1.7 +
     1.8 +Permission is hereby granted, free of charge, to any person obtaining a copy
     1.9 +of this software and associated documentation files (the "Software"), to deal
    1.10 +in the Software without restriction, including without limitation the rights
    1.11 +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    1.12 +copies of the Software, and to permit persons to whom the Software is
    1.13 +furnished to do so, subject to the following conditions:
    1.14 +
    1.15 +The above copyright notice and this permission notice shall be included in
    1.16 +all copies or substantial portions of the Software.
    1.17 +
    1.18 +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    1.19 +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    1.20 +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    1.21 +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    1.22 +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    1.23 +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
    1.24 +THE SOFTWARE.
     2.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     2.2 +++ b/README.md	Tue Mar 31 23:09:02 2015 -0400
     2.3 @@ -0,0 +1,16 @@
     2.4 +# writebehind
     2.5 +
     2.6 +writebehind provides a simple, in-memory cache for incrementing variables that change
     2.7 +frequently. It is concurrency-safe, and syncs back to a more permanent store at a
     2.8 +configurable interval. For more information, see the godoc.
     2.9 +
    2.10 +To report bugs and request features, please use the
    2.11 +[writebehind Trello board](https://trello.com/b/aCm2vhw7/writebehind).
    2.12 +
    2.13 +Licensing information can be found in the LICENSE file.
    2.14 +
    2.15 +Patches are accepted. Provide a Mercurial repository with the changes you'd like merged,
    2.16 +and we will be happy to pull in your changes. If you do not have a Mercurial host, you
    2.17 +can get one free at [Bitbucket](https://www.bitbucket.org).
    2.18 +
    2.19 +If you have any questions, feel free to email [projects@secondbit.org](mailto:projects@secondbit.org).
     3.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     3.2 +++ b/doc.go	Tue Mar 31 23:09:02 2015 -0400
     3.3 @@ -0,0 +1,14 @@
     3.4 +/*
     3.5 +Package writebehind provides a cache that periodically syncs to a datastore.
     3.6 +
     3.7 +The Cache is really only useful for adding numbers that are expected to change
     3.8 +too frequently for a full datastore write, e.g. view counts.
     3.9 +
    3.10 +Each Cache can only be used once; calling its Stop() method renders the Cache
    3.11 +unusable, and a new Cache needs to be instantiated. Caches should, wherever
    3.12 +possible, have their Stop() methods called, or they risk leaking goroutines.
    3.13 +
    3.14 +Cache implements the expvar.Var type, and instances can be passed to
    3.15 +expvar.Publish to provide insight into the Cache while it's in operation.
    3.16 +*/
    3.17 +package writebehind
     4.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     4.2 +++ b/writebehind.go	Tue Mar 31 23:09:02 2015 -0400
     4.3 @@ -0,0 +1,156 @@
     4.4 +package writebehind
     4.5 +
     4.6 +import (
     4.7 +	"bytes"
     4.8 +	"fmt"
     4.9 +	"sync"
    4.10 +	"time"
    4.11 +)
    4.12 +
    4.13 +// Incrementer defines an interface that allows items that can be identified
    4.14 +// using a string to be incremented by a value that can be stored as an int64.
    4.15 +// It only concerns itself with basic addition, as supported by the Go runtime.
    4.16 +// math/big.Int and other such types are not supported.
    4.17 +type Incrementer interface {
    4.18 +	Increment(values map[string]int64)
    4.19 +}
    4.20 +
    4.21 +// Cache is a type that is used to store values in-memory, to be synced to a
    4.22 +// more permanent store later. It offers no read functionality; the only things
    4.23 +// you can do with it are increment the count for an item and sync it to the
    4.24 +// more permanent store. The more permanent store is written to using the
    4.25 +// Incrementer interface, and must implement that interface.
    4.26 +//
    4.27 +// Cache is a concurrency-safe type, and is safe for use in multiple goroutines
    4.28 +// at once.
    4.29 +type Cache struct {
    4.30 +	incrementer Incrementer
    4.31 +	duration    time.Duration
    4.32 +	values      map[string]int64
    4.33 +	ticker      *time.Ticker
    4.34 +	incoming    chan incrementReq
    4.35 +	kill        chan struct{}
    4.36 +	sync.Mutex
    4.37 +}
    4.38 +
    4.39 +func (c *Cache) String() string {
    4.40 +	var b bytes.Buffer
    4.41 +	fmt.Fprintf(&b, "{")
    4.42 +	first := true
    4.43 +	for k, v := range c.values {
    4.44 +		if !first {
    4.45 +			fmt.Fprintf(&b, ", ")
    4.46 +		}
    4.47 +		fmt.Fprintf(&b, "%q: %v", k, v)
    4.48 +		first = false
    4.49 +	}
    4.50 +	fmt.Fprintf(&b, "}")
    4.51 +	return b.String()
    4.52 +
    4.53 +}
    4.54 +
    4.55 +// NeedsSync returns true if the Cache has any values to sync to the Incrementer,
    4.56 +// and false if the Cache is empty and is safe to dispose of.
    4.57 +func (c *Cache) NeedsSync() bool {
    4.58 +	return len(c.values) > 0
    4.59 +}
    4.60 +
    4.61 +// Sync calls through to the Incrementer associated with the Cache, flushing the
    4.62 +// Cache's values to the Incrementer. It can be called manually, at any time,
    4.63 +// safely, but will be called automatically on the Duration specified in the Cache.
    4.64 +func (c *Cache) Sync() {
    4.65 +	c.Lock()
    4.66 +	defer c.Unlock()
    4.67 +	if !c.NeedsSync() {
    4.68 +		return
    4.69 +	}
    4.70 +	c.incrementer.Increment(c.values)
    4.71 +	c.values = map[string]int64{}
    4.72 +}
    4.73 +
    4.74 +// NewCache creates, instantiates, and returns a cache using the specified values. It
    4.75 +// is the only way to obtain a usable Cache instance.
    4.76 +func NewCache(incrementer Incrementer, duration time.Duration) *Cache {
    4.77 +	c := &Cache{
    4.78 +		incrementer: incrementer,
    4.79 +		duration:    duration,
    4.80 +		values:      map[string]int64{},
    4.81 +		incoming:    make(chan incrementReq),
    4.82 +		kill:        make(chan struct{}),
    4.83 +		ticker:      time.NewTicker(duration),
    4.84 +	}
    4.85 +	go c.run()
    4.86 +	return c
    4.87 +}
    4.88 +
    4.89 +// Stop halts the syncing behaviour of the Cache and frees up the resources used
    4.90 +// in that syncing behaviour. The Cache's store of values remains intact, however.
    4.91 +// It's important to note that the Stop method _does not_ implicitly write to the
    4.92 +// Incrementer, and you should manually call the Sync() method after calling the
    4.93 +// Stop method, to store any lingering values that were written after the last
    4.94 +// Sync.
    4.95 +func (c *Cache) Stop() {
    4.96 +	c.Lock()
    4.97 +	defer c.Unlock()
    4.98 +	close(c.kill)
    4.99 +	c.ticker.Stop()
   4.100 +}
   4.101 +
   4.102 +func (c *Cache) run() {
   4.103 +	for {
   4.104 +		select {
   4.105 +		case <-c.ticker.C:
   4.106 +			c.Sync()
   4.107 +		case in := <-c.incoming:
   4.108 +			c.increment(in.key, in.value)
   4.109 +		case <-c.kill:
   4.110 +			return
   4.111 +		}
   4.112 +	}
   4.113 +}
   4.114 +
   4.115 +func (c *Cache) increment(key string, value int64) {
   4.116 +	c.Lock()
   4.117 +	defer c.Unlock()
   4.118 +	c.values[key] = c.values[key] + value
   4.119 +}
   4.120 +
   4.121 +// Increment updates the item that matches key in the Cache, adding value to it.
   4.122 +// It key does not exist in the Cache, it is created and set to value.
   4.123 +func (c *Cache) Increment(key string, value int64) {
   4.124 +	c.incoming <- incrementReq{key: key, value: value}
   4.125 +}
   4.126 +
   4.127 +type incrementReq struct {
   4.128 +	key   string
   4.129 +	value int64
   4.130 +}
   4.131 +
   4.132 +// MemoryIncrementer is an implementation of the Incrementer interface that stores
   4.133 +// values in memory. There is obviously no reason to do this outside of tests and
   4.134 +// stubs, and those two reasons are why this type exists. Don't use it for other
   4.135 +// things.
   4.136 +type MemoryIncrementer struct {
   4.137 +	sync.RWMutex
   4.138 +	values map[string]int64
   4.139 +}
   4.140 +
   4.141 +// Increment updates the items specified by the keys of the map by adding the
   4.142 +// value of the map to the item's stored value.
   4.143 +func (m *MemoryIncrementer) Increment(increments map[string]int64) {
   4.144 +	m.Lock()
   4.145 +	defer m.Unlock()
   4.146 +	if m.values == nil {
   4.147 +		m.values = map[string]int64{}
   4.148 +	}
   4.149 +	for k, v := range increments {
   4.150 +		m.values[k] = m.values[k] + v
   4.151 +	}
   4.152 +}
   4.153 +
   4.154 +// Get is a concurrency-safe helper to return the value for key.
   4.155 +func (m *MemoryIncrementer) Get(key string) int64 {
   4.156 +	m.RLock()
   4.157 +	defer m.RUnlock()
   4.158 +	return m.values[key]
   4.159 +}
     5.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     5.2 +++ b/writebehind_test.go	Tue Mar 31 23:09:02 2015 -0400
     5.3 @@ -0,0 +1,71 @@
     5.4 +package writebehind
     5.5 +
     5.6 +import (
     5.7 +	"encoding/json"
     5.8 +	"testing"
     5.9 +	"time"
    5.10 +)
    5.11 +
    5.12 +func TestSync(t *testing.T) {
    5.13 +	m := MemoryIncrementer{}
    5.14 +	c := NewCache(&m, time.Millisecond)
    5.15 +	defer c.Stop()
    5.16 +	c.Sync() // test that unnecessary sync is a no-op
    5.17 +	c.Increment("test1", 10)
    5.18 +	c.Increment("test2", 100)
    5.19 +	time.Sleep(time.Millisecond)
    5.20 +	if m.Get("test1") != 10 {
    5.21 +		t.Errorf("Expected test1 to be %d, got %d", 10, m.values["test1"])
    5.22 +	}
    5.23 +	if m.Get("test2") != 100 {
    5.24 +		t.Errorf("Expected test2 to be %d, got %d", 100, m.values["test2"])
    5.25 +	}
    5.26 +	c.Increment("test1", 10)
    5.27 +	c.Increment("test2", 100)
    5.28 +	time.Sleep(time.Millisecond)
    5.29 +	if m.Get("test1") != 20 {
    5.30 +		t.Errorf("Expected test1 to be %d, got %d", 20, m.values["test1"])
    5.31 +	}
    5.32 +	if m.Get("test2") != 200 {
    5.33 +		t.Errorf("Expected test2 to be %d, got %d", 200, m.values["test2"])
    5.34 +	}
    5.35 +}
    5.36 +
    5.37 +func TestString(t *testing.T) {
    5.38 +	m := MemoryIncrementer{}
    5.39 +	c := NewCache(&m, time.Millisecond)
    5.40 +	defer c.Stop()
    5.41 +	c.Increment("test1", 10)
    5.42 +	c.Increment("test2", 100)
    5.43 +	c.Increment(`"test3"`, 1000)
    5.44 +	str := c.String()
    5.45 +	var result map[string]int64
    5.46 +	err := json.Unmarshal([]byte(str), &result)
    5.47 +	if err != nil {
    5.48 +		t.Errorf("Error unmarshalling returned JSON: %#+v", err)
    5.49 +	}
    5.50 +	if result["test1"] != 10 {
    5.51 +		t.Errorf("Expected test1 to be %d, got %d", 10, result["test1"])
    5.52 +	}
    5.53 +	if result["test2"] != 100 {
    5.54 +		t.Errorf("Expected test2 to be %d, got %d", 100, result["test2"])
    5.55 +	}
    5.56 +	if result["\"test3\""] != 1000 {
    5.57 +		t.Errorf("Expected \"test3\" to be %d, got %d", 1000, result["\"test3\""])
    5.58 +	}
    5.59 +}
    5.60 +
    5.61 +func BenchmarkSync(b *testing.B) {
    5.62 +	m := MemoryIncrementer{}
    5.63 +	c := NewCache(&m, time.Second*5)
    5.64 +	defer c.Stop()
    5.65 +
    5.66 +	b.RunParallel(func(pb *testing.PB) {
    5.67 +		for pb.Next() {
    5.68 +			c.Increment("myvalue", 1)
    5.69 +			c.Increment("othervalue", 1)
    5.70 +			c.Increment("other other value", 1)
    5.71 +			c.Increment("weird value", 1)
    5.72 +		}
    5.73 +	})
    5.74 +}