gifs/api
4:1bbbe113f599 Browse Files
Upload is no longer async, memstorage is parallel-safe. Upload no longer needs to be run async (it can be run inside a goroutine), so it now returns stuff instead of taking a channel as an argument. This will make it easier to implement, as all the async stuff is an abstraction above, and therefore doesn't need to be worried about for each reimplementation. The memstorage type is no longer exported and can now be safely used by multiple goroutines, thanks to the sync package.
1.1 --- a/storage.go Fri Oct 17 07:13:23 2014 -0400 1.2 +++ b/storage.go Fri Oct 17 07:17:29 2014 -0400 1.3 @@ -4,58 +4,124 @@ 1.4 "errors" 1.5 "io" 1.6 "io/ioutil" 1.7 - "log" 1.8 "net/http" 1.9 + "sync" 1.10 1.11 "code.google.com/p/google-api-go-client/googleapi" 1.12 - "code.google.com/p/google-api-go-client/storage/v1beta2" 1.13 + "code.google.com/p/google-api-go-client/storage/v1" 1.14 ) 1.15 1.16 var ( 1.17 - BucketNotFoundError = errors.New("bucket not found") 1.18 - BlobNotFoundError = errors.New("blob not found") 1.19 + ErrBucketNotFound = errors.New("bucket not found") 1.20 + ErrBlobNotFound = errors.New("blob not found") 1.21 ) 1.22 1.23 type Storage interface { 1.24 - Upload(bucket, tmp string, r io.Reader, errs chan error, done chan struct{}) 1.25 + Upload(bucket, tmp string, r io.Reader) error 1.26 Delete(bucket, tmp string) error 1.27 Move(srcBucket, src, dstBucket, dst string) error 1.28 Download(bucket, id string, w io.Writer) (int64, error) 1.29 } 1.30 1.31 +type memstorage struct { 1.32 + buckets map[string]membucket 1.33 + sync.RWMutex 1.34 +} 1.35 + 1.36 +type membucket map[string][]byte 1.37 + 1.38 +func NewMemStorage() *memstorage { 1.39 + return &memstorage{ 1.40 + buckets: map[string]membucket{}, 1.41 + } 1.42 +} 1.43 + 1.44 +func (m *memstorage) Upload(bucket, tmp string, r io.Reader) error { 1.45 + m.Lock() 1.46 + defer m.Unlock() 1.47 + 1.48 + if _, ok := m.buckets[bucket]; !ok { 1.49 + m.buckets[bucket] = membucket{} 1.50 + } 1.51 + bytes, err := ioutil.ReadAll(r) 1.52 + if err != nil { 1.53 + return err 1.54 + } 1.55 + m.buckets[bucket][tmp] = bytes 1.56 + return nil 1.57 +} 1.58 + 1.59 +func (m *memstorage) Delete(bucket, tmp string) error { 1.60 + m.Lock() 1.61 + defer m.Unlock() 1.62 + 1.63 + delete(m.buckets[bucket], tmp) 1.64 + return nil 1.65 +} 1.66 + 1.67 +func (m *memstorage) Move(srcBucket, src, dstBucket, dst string) error { 1.68 + m.Lock() 1.69 + defer m.Unlock() 1.70 + 1.71 + if _, ok := m.buckets[srcBucket]; !ok { 1.72 + return ErrBucketNotFound 1.73 + } 1.74 + if _, ok := m.buckets[srcBucket][src]; !ok { 1.75 + return ErrBlobNotFound 1.76 + } 1.77 + if _, ok := m.buckets[dstBucket]; !ok { 1.78 + m.buckets[dstBucket] = membucket{} 1.79 + } 1.80 + m.buckets[dstBucket][dst] = m.buckets[srcBucket][src] 1.81 + return m.Delete(srcBucket, src) 1.82 +} 1.83 + 1.84 +func (m *memstorage) Download(bucket, id string, w io.Writer) (int64, error) { 1.85 + m.RLock() 1.86 + defer m.RUnlock() 1.87 + 1.88 + if _, ok := m.buckets[bucket]; !ok { 1.89 + return 0, ErrBucketNotFound 1.90 + } 1.91 + if _, ok := m.buckets[bucket][id]; !ok { 1.92 + return 0, ErrBlobNotFound 1.93 + } 1.94 + n, err := w.Write(m.buckets[bucket][id]) 1.95 + return int64(n), err 1.96 +} 1.97 + 1.98 type GoogleCloudStorage struct { 1.99 *storage.Service 1.100 } 1.101 1.102 -func (gcs *GoogleCloudStorage) Upload(bucket, tmp string, r io.Reader, errs chan error, done chan struct{}) { 1.103 - if errs != nil { 1.104 - defer close(errs) 1.105 - } 1.106 - if done != nil { 1.107 - defer close(done) 1.108 - } 1.109 +func (gcs *GoogleCloudStorage) Upload(bucket, tmp string, r io.Reader) error { 1.110 object := &storage.Object{Name: tmp} 1.111 _, err := gcs.Objects.Insert(bucket, object).Media(r).Do() 1.112 - if err != nil && errs != nil { 1.113 - errs <- err 1.114 + if err != nil { 1.115 + // TODO: abstract out this error 1.116 + return err 1.117 } 1.118 + return nil 1.119 } 1.120 1.121 func (gcs *GoogleCloudStorage) Delete(bucket, tmp string) error { 1.122 + // TODO: abstract out this error 1.123 return gcs.Objects.Delete(bucket, tmp).Do() 1.124 } 1.125 1.126 func (gcs *GoogleCloudStorage) Move(srcBucket, src, dstBucket, dst string) error { 1.127 _, err := gcs.Objects.Get(dstBucket, dst).Do() 1.128 if err == nil { 1.129 - go gcs.del(srcBucket, src) 1.130 return nil 1.131 } 1.132 + // ignore 404 errors on the destination; we don't expect it to exist, after all 1.133 if e, ok := err.(*googleapi.Error); !ok || e.Code != 404 { 1.134 + // TODO: abstract out this error 1.135 return err 1.136 } 1.137 _, err = gcs.Objects.Copy(srcBucket, src, dstBucket, dst, nil).Do() 1.138 if err != nil { 1.139 + // TODO: abstract out this error 1.140 return err 1.141 } 1.142 objectAcl := &storage.ObjectAccessControl{ 1.143 @@ -63,80 +129,24 @@ 1.144 } 1.145 _, err = gcs.ObjectAccessControls.Insert(dstBucket, dst, objectAcl).Do() 1.146 if err != nil { 1.147 + // TODO: abstract out this error 1.148 return err 1.149 } 1.150 - go gcs.del(srcBucket, src) 1.151 return nil 1.152 } 1.153 1.154 -func (gcs *GoogleCloudStorage) del(bucket, tmp string) { 1.155 - err := gcs.Delete(bucket, tmp) 1.156 - if err != nil { 1.157 - log.Printf("Error deleting temporary upload %s in %s: %s\n", tmp, bucket, err) 1.158 - } 1.159 -} 1.160 - 1.161 func (gcs *GoogleCloudStorage) Download(bucket, id string, w io.Writer) (int64, error) { 1.162 res, err := gcs.Objects.Get(bucket, id).Do() 1.163 if err != nil { 1.164 + // TODO: abstract out this error 1.165 return 0, err 1.166 } 1.167 resp, err := http.Get(res.MediaLink) 1.168 if err != nil { 1.169 + // TODO: abstract out this error 1.170 return 0, err 1.171 } 1.172 defer resp.Body.Close() 1.173 + // TODO: abstract out this error 1.174 return io.Copy(w, resp.Body) 1.175 } 1.176 - 1.177 -type Memstorage map[string]Bucket 1.178 - 1.179 -type Bucket map[string][]byte 1.180 - 1.181 -func (m Memstorage) Upload(bucket, tmp string, r io.Reader, errs chan error, done chan struct{}) { 1.182 - if errs != nil { 1.183 - defer close(errs) 1.184 - } 1.185 - if done != nil { 1.186 - defer close(done) 1.187 - } 1.188 - if _, ok := m[bucket]; !ok { 1.189 - m[bucket] = make(Bucket) 1.190 - } 1.191 - bytes, err := ioutil.ReadAll(r) 1.192 - if err != nil { 1.193 - errs <- err 1.194 - return 1.195 - } 1.196 - m[bucket][tmp] = bytes 1.197 -} 1.198 - 1.199 -func (m Memstorage) Delete(bucket, tmp string) error { 1.200 - delete(m[bucket], tmp) 1.201 - return nil 1.202 -} 1.203 - 1.204 -func (m Memstorage) Move(srcBucket, src, dstBucket, dst string) error { 1.205 - if _, ok := m[srcBucket]; !ok { 1.206 - return BucketNotFoundError 1.207 - } 1.208 - if _, ok := m[srcBucket][src]; !ok { 1.209 - return BlobNotFoundError 1.210 - } 1.211 - if _, ok := m[dstBucket]; !ok { 1.212 - m[dstBucket] = make(Bucket) 1.213 - } 1.214 - m[dstBucket][dst] = m[srcBucket][src] 1.215 - return m.Delete(srcBucket, src) 1.216 -} 1.217 - 1.218 -func (m Memstorage) Download(bucket, id string, w io.Writer) (int64, error) { 1.219 - if _, ok := m[bucket]; !ok { 1.220 - return 0, BucketNotFoundError 1.221 - } 1.222 - if _, ok := m[bucket][id]; !ok { 1.223 - return 0, BlobNotFoundError 1.224 - } 1.225 - n, err := w.Write(m[bucket][id]) 1.226 - return int64(n), err 1.227 -}