writebehind

Paddy 2015-03-31

0:85f9751b15ea Go to Latest

writebehind/writebehind.go

First pass implementation. Create our first implementation of this. Use a map of string=>int64 to store our values. The idea here is that users will create one cache for each metric they want to collect--each column that needs updating, in MySQL land. Release it under the MIT license, because why not? Very proud that the first commit carries 100% test coverage, no golint errors, no go vet errors, and has a benchmark.

History
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/writebehind.go	Tue Mar 31 23:09:02 2015 -0400
     1.3 @@ -0,0 +1,156 @@
     1.4 +package writebehind
     1.5 +
     1.6 +import (
     1.7 +	"bytes"
     1.8 +	"fmt"
     1.9 +	"sync"
    1.10 +	"time"
    1.11 +)
    1.12 +
    1.13 +// Incrementer defines an interface that allows items that can be identified
    1.14 +// using a string to be incremented by a value that can be stored as an int64.
    1.15 +// It only concerns itself with basic addition, as supported by the Go runtime.
    1.16 +// math/big.Int and other such types are not supported.
    1.17 +type Incrementer interface {
    1.18 +	Increment(values map[string]int64)
    1.19 +}
    1.20 +
    1.21 +// Cache is a type that is used to store values in-memory, to be synced to a
    1.22 +// more permanent store later. It offers no read functionality; the only things
    1.23 +// you can do with it are increment the count for an item and sync it to the
    1.24 +// more permanent store. The more permanent store is written to using the
    1.25 +// Incrementer interface, and must implement that interface.
    1.26 +//
    1.27 +// Cache is a concurrency-safe type, and is safe for use in multiple goroutines
    1.28 +// at once.
    1.29 +type Cache struct {
    1.30 +	incrementer Incrementer
    1.31 +	duration    time.Duration
    1.32 +	values      map[string]int64
    1.33 +	ticker      *time.Ticker
    1.34 +	incoming    chan incrementReq
    1.35 +	kill        chan struct{}
    1.36 +	sync.Mutex
    1.37 +}
    1.38 +
    1.39 +func (c *Cache) String() string {
    1.40 +	var b bytes.Buffer
    1.41 +	fmt.Fprintf(&b, "{")
    1.42 +	first := true
    1.43 +	for k, v := range c.values {
    1.44 +		if !first {
    1.45 +			fmt.Fprintf(&b, ", ")
    1.46 +		}
    1.47 +		fmt.Fprintf(&b, "%q: %v", k, v)
    1.48 +		first = false
    1.49 +	}
    1.50 +	fmt.Fprintf(&b, "}")
    1.51 +	return b.String()
    1.52 +
    1.53 +}
    1.54 +
    1.55 +// NeedsSync returns true if the Cache has any values to sync to the Incrementer,
    1.56 +// and false if the Cache is empty and is safe to dispose of.
    1.57 +func (c *Cache) NeedsSync() bool {
    1.58 +	return len(c.values) > 0
    1.59 +}
    1.60 +
    1.61 +// Sync calls through to the Incrementer associated with the Cache, flushing the
    1.62 +// Cache's values to the Incrementer. It can be called manually, at any time,
    1.63 +// safely, but will be called automatically on the Duration specified in the Cache.
    1.64 +func (c *Cache) Sync() {
    1.65 +	c.Lock()
    1.66 +	defer c.Unlock()
    1.67 +	if !c.NeedsSync() {
    1.68 +		return
    1.69 +	}
    1.70 +	c.incrementer.Increment(c.values)
    1.71 +	c.values = map[string]int64{}
    1.72 +}
    1.73 +
    1.74 +// NewCache creates, instantiates, and returns a cache using the specified values. It
    1.75 +// is the only way to obtain a usable Cache instance.
    1.76 +func NewCache(incrementer Incrementer, duration time.Duration) *Cache {
    1.77 +	c := &Cache{
    1.78 +		incrementer: incrementer,
    1.79 +		duration:    duration,
    1.80 +		values:      map[string]int64{},
    1.81 +		incoming:    make(chan incrementReq),
    1.82 +		kill:        make(chan struct{}),
    1.83 +		ticker:      time.NewTicker(duration),
    1.84 +	}
    1.85 +	go c.run()
    1.86 +	return c
    1.87 +}
    1.88 +
    1.89 +// Stop halts the syncing behaviour of the Cache and frees up the resources used
    1.90 +// in that syncing behaviour. The Cache's store of values remains intact, however.
    1.91 +// It's important to note that the Stop method _does not_ implicitly write to the
    1.92 +// Incrementer, and you should manually call the Sync() method after calling the
    1.93 +// Stop method, to store any lingering values that were written after the last
    1.94 +// Sync.
    1.95 +func (c *Cache) Stop() {
    1.96 +	c.Lock()
    1.97 +	defer c.Unlock()
    1.98 +	close(c.kill)
    1.99 +	c.ticker.Stop()
   1.100 +}
   1.101 +
   1.102 +func (c *Cache) run() {
   1.103 +	for {
   1.104 +		select {
   1.105 +		case <-c.ticker.C:
   1.106 +			c.Sync()
   1.107 +		case in := <-c.incoming:
   1.108 +			c.increment(in.key, in.value)
   1.109 +		case <-c.kill:
   1.110 +			return
   1.111 +		}
   1.112 +	}
   1.113 +}
   1.114 +
   1.115 +func (c *Cache) increment(key string, value int64) {
   1.116 +	c.Lock()
   1.117 +	defer c.Unlock()
   1.118 +	c.values[key] = c.values[key] + value
   1.119 +}
   1.120 +
   1.121 +// Increment updates the item that matches key in the Cache, adding value to it.
   1.122 +// It key does not exist in the Cache, it is created and set to value.
   1.123 +func (c *Cache) Increment(key string, value int64) {
   1.124 +	c.incoming <- incrementReq{key: key, value: value}
   1.125 +}
   1.126 +
   1.127 +type incrementReq struct {
   1.128 +	key   string
   1.129 +	value int64
   1.130 +}
   1.131 +
   1.132 +// MemoryIncrementer is an implementation of the Incrementer interface that stores
   1.133 +// values in memory. There is obviously no reason to do this outside of tests and
   1.134 +// stubs, and those two reasons are why this type exists. Don't use it for other
   1.135 +// things.
   1.136 +type MemoryIncrementer struct {
   1.137 +	sync.RWMutex
   1.138 +	values map[string]int64
   1.139 +}
   1.140 +
   1.141 +// Increment updates the items specified by the keys of the map by adding the
   1.142 +// value of the map to the item's stored value.
   1.143 +func (m *MemoryIncrementer) Increment(increments map[string]int64) {
   1.144 +	m.Lock()
   1.145 +	defer m.Unlock()
   1.146 +	if m.values == nil {
   1.147 +		m.values = map[string]int64{}
   1.148 +	}
   1.149 +	for k, v := range increments {
   1.150 +		m.values[k] = m.values[k] + v
   1.151 +	}
   1.152 +}
   1.153 +
   1.154 +// Get is a concurrency-safe helper to return the value for key.
   1.155 +func (m *MemoryIncrementer) Get(key string) int64 {
   1.156 +	m.RLock()
   1.157 +	defer m.RUnlock()
   1.158 +	return m.values[key]
   1.159 +}