gifs/api

Paddy 2014-10-17 Parent:08ec88016e2f

4:1bbbe113f599 Go to Latest

gifs/api/storage.go

Upload is no longer async, memstorage is parallel-safe. Upload no longer needs to be run async (it can be run inside a goroutine), so it now returns stuff instead of taking a channel as an argument. This will make it easier to implement, as all the async stuff is an abstraction above, and therefore doesn't need to be worried about for each reimplementation. The memstorage type is no longer exported and can now be safely used by multiple goroutines, thanks to the sync package.

History
     1.1 --- a/storage.go	Fri Oct 17 07:13:23 2014 -0400
     1.2 +++ b/storage.go	Fri Oct 17 07:17:29 2014 -0400
     1.3 @@ -4,58 +4,124 @@
     1.4  	"errors"
     1.5  	"io"
     1.6  	"io/ioutil"
     1.7 -	"log"
     1.8  	"net/http"
     1.9 +	"sync"
    1.10  
    1.11  	"code.google.com/p/google-api-go-client/googleapi"
    1.12 -	"code.google.com/p/google-api-go-client/storage/v1beta2"
    1.13 +	"code.google.com/p/google-api-go-client/storage/v1"
    1.14  )
    1.15  
    1.16  var (
    1.17 -	BucketNotFoundError = errors.New("bucket not found")
    1.18 -	BlobNotFoundError   = errors.New("blob not found")
    1.19 +	ErrBucketNotFound = errors.New("bucket not found")
    1.20 +	ErrBlobNotFound   = errors.New("blob not found")
    1.21  )
    1.22  
    1.23  type Storage interface {
    1.24 -	Upload(bucket, tmp string, r io.Reader, errs chan error, done chan struct{})
    1.25 +	Upload(bucket, tmp string, r io.Reader) error
    1.26  	Delete(bucket, tmp string) error
    1.27  	Move(srcBucket, src, dstBucket, dst string) error
    1.28  	Download(bucket, id string, w io.Writer) (int64, error)
    1.29  }
    1.30  
    1.31 +type memstorage struct {
    1.32 +	buckets map[string]membucket
    1.33 +	sync.RWMutex
    1.34 +}
    1.35 +
    1.36 +type membucket map[string][]byte
    1.37 +
    1.38 +func NewMemStorage() *memstorage {
    1.39 +	return &memstorage{
    1.40 +		buckets: map[string]membucket{},
    1.41 +	}
    1.42 +}
    1.43 +
    1.44 +func (m *memstorage) Upload(bucket, tmp string, r io.Reader) error {
    1.45 +	m.Lock()
    1.46 +	defer m.Unlock()
    1.47 +
    1.48 +	if _, ok := m.buckets[bucket]; !ok {
    1.49 +		m.buckets[bucket] = membucket{}
    1.50 +	}
    1.51 +	bytes, err := ioutil.ReadAll(r)
    1.52 +	if err != nil {
    1.53 +		return err
    1.54 +	}
    1.55 +	m.buckets[bucket][tmp] = bytes
    1.56 +	return nil
    1.57 +}
    1.58 +
    1.59 +func (m *memstorage) Delete(bucket, tmp string) error {
    1.60 +	m.Lock()
    1.61 +	defer m.Unlock()
    1.62 +
    1.63 +	delete(m.buckets[bucket], tmp)
    1.64 +	return nil
    1.65 +}
    1.66 +
    1.67 +func (m *memstorage) Move(srcBucket, src, dstBucket, dst string) error {
    1.68 +	m.Lock()
    1.69 +	defer m.Unlock()
    1.70 +
    1.71 +	if _, ok := m.buckets[srcBucket]; !ok {
    1.72 +		return ErrBucketNotFound
    1.73 +	}
    1.74 +	if _, ok := m.buckets[srcBucket][src]; !ok {
    1.75 +		return ErrBlobNotFound
    1.76 +	}
    1.77 +	if _, ok := m.buckets[dstBucket]; !ok {
    1.78 +		m.buckets[dstBucket] = membucket{}
    1.79 +	}
    1.80 +	m.buckets[dstBucket][dst] = m.buckets[srcBucket][src]
    1.81 +	return m.Delete(srcBucket, src)
    1.82 +}
    1.83 +
    1.84 +func (m *memstorage) Download(bucket, id string, w io.Writer) (int64, error) {
    1.85 +	m.RLock()
    1.86 +	defer m.RUnlock()
    1.87 +
    1.88 +	if _, ok := m.buckets[bucket]; !ok {
    1.89 +		return 0, ErrBucketNotFound
    1.90 +	}
    1.91 +	if _, ok := m.buckets[bucket][id]; !ok {
    1.92 +		return 0, ErrBlobNotFound
    1.93 +	}
    1.94 +	n, err := w.Write(m.buckets[bucket][id])
    1.95 +	return int64(n), err
    1.96 +}
    1.97 +
    1.98  type GoogleCloudStorage struct {
    1.99  	*storage.Service
   1.100  }
   1.101  
   1.102 -func (gcs *GoogleCloudStorage) Upload(bucket, tmp string, r io.Reader, errs chan error, done chan struct{}) {
   1.103 -	if errs != nil {
   1.104 -		defer close(errs)
   1.105 -	}
   1.106 -	if done != nil {
   1.107 -		defer close(done)
   1.108 -	}
   1.109 +func (gcs *GoogleCloudStorage) Upload(bucket, tmp string, r io.Reader) error {
   1.110  	object := &storage.Object{Name: tmp}
   1.111  	_, err := gcs.Objects.Insert(bucket, object).Media(r).Do()
   1.112 -	if err != nil && errs != nil {
   1.113 -		errs <- err
   1.114 +	if err != nil {
   1.115 +		// TODO: abstract out this error
   1.116 +		return err
   1.117  	}
   1.118 +	return nil
   1.119  }
   1.120  
   1.121  func (gcs *GoogleCloudStorage) Delete(bucket, tmp string) error {
   1.122 +	// TODO: abstract out this error
   1.123  	return gcs.Objects.Delete(bucket, tmp).Do()
   1.124  }
   1.125  
   1.126  func (gcs *GoogleCloudStorage) Move(srcBucket, src, dstBucket, dst string) error {
   1.127  	_, err := gcs.Objects.Get(dstBucket, dst).Do()
   1.128  	if err == nil {
   1.129 -		go gcs.del(srcBucket, src)
   1.130  		return nil
   1.131  	}
   1.132 +	// ignore 404 errors on the destination; we don't expect it to exist, after all
   1.133  	if e, ok := err.(*googleapi.Error); !ok || e.Code != 404 {
   1.134 +		// TODO: abstract out this error
   1.135  		return err
   1.136  	}
   1.137  	_, err = gcs.Objects.Copy(srcBucket, src, dstBucket, dst, nil).Do()
   1.138  	if err != nil {
   1.139 +		// TODO: abstract out this error
   1.140  		return err
   1.141  	}
   1.142  	objectAcl := &storage.ObjectAccessControl{
   1.143 @@ -63,80 +129,24 @@
   1.144  	}
   1.145  	_, err = gcs.ObjectAccessControls.Insert(dstBucket, dst, objectAcl).Do()
   1.146  	if err != nil {
   1.147 +		// TODO: abstract out this error
   1.148  		return err
   1.149  	}
   1.150 -	go gcs.del(srcBucket, src)
   1.151  	return nil
   1.152  }
   1.153  
   1.154 -func (gcs *GoogleCloudStorage) del(bucket, tmp string) {
   1.155 -	err := gcs.Delete(bucket, tmp)
   1.156 -	if err != nil {
   1.157 -		log.Printf("Error deleting temporary upload %s in %s: %s\n", tmp, bucket, err)
   1.158 -	}
   1.159 -}
   1.160 -
   1.161  func (gcs *GoogleCloudStorage) Download(bucket, id string, w io.Writer) (int64, error) {
   1.162  	res, err := gcs.Objects.Get(bucket, id).Do()
   1.163  	if err != nil {
   1.164 +		// TODO: abstract out this error
   1.165  		return 0, err
   1.166  	}
   1.167  	resp, err := http.Get(res.MediaLink)
   1.168  	if err != nil {
   1.169 +		// TODO: abstract out this error
   1.170  		return 0, err
   1.171  	}
   1.172  	defer resp.Body.Close()
   1.173 +	// TODO: abstract out this error
   1.174  	return io.Copy(w, resp.Body)
   1.175  }
   1.176 -
   1.177 -type Memstorage map[string]Bucket
   1.178 -
   1.179 -type Bucket map[string][]byte
   1.180 -
   1.181 -func (m Memstorage) Upload(bucket, tmp string, r io.Reader, errs chan error, done chan struct{}) {
   1.182 -	if errs != nil {
   1.183 -		defer close(errs)
   1.184 -	}
   1.185 -	if done != nil {
   1.186 -		defer close(done)
   1.187 -	}
   1.188 -	if _, ok := m[bucket]; !ok {
   1.189 -		m[bucket] = make(Bucket)
   1.190 -	}
   1.191 -	bytes, err := ioutil.ReadAll(r)
   1.192 -	if err != nil {
   1.193 -		errs <- err
   1.194 -		return
   1.195 -	}
   1.196 -	m[bucket][tmp] = bytes
   1.197 -}
   1.198 -
   1.199 -func (m Memstorage) Delete(bucket, tmp string) error {
   1.200 -	delete(m[bucket], tmp)
   1.201 -	return nil
   1.202 -}
   1.203 -
   1.204 -func (m Memstorage) Move(srcBucket, src, dstBucket, dst string) error {
   1.205 -	if _, ok := m[srcBucket]; !ok {
   1.206 -		return BucketNotFoundError
   1.207 -	}
   1.208 -	if _, ok := m[srcBucket][src]; !ok {
   1.209 -		return BlobNotFoundError
   1.210 -	}
   1.211 -	if _, ok := m[dstBucket]; !ok {
   1.212 -		m[dstBucket] = make(Bucket)
   1.213 -	}
   1.214 -	m[dstBucket][dst] = m[srcBucket][src]
   1.215 -	return m.Delete(srcBucket, src)
   1.216 -}
   1.217 -
   1.218 -func (m Memstorage) Download(bucket, id string, w io.Writer) (int64, error) {
   1.219 -	if _, ok := m[bucket]; !ok {
   1.220 -		return 0, BucketNotFoundError
   1.221 -	}
   1.222 -	if _, ok := m[bucket][id]; !ok {
   1.223 -		return 0, BlobNotFoundError
   1.224 -	}
   1.225 -	n, err := w.Write(m[bucket][id])
   1.226 -	return int64(n), err
   1.227 -}