gifs/api
2014-08-27
Child:d3ba1115bfd0
0:08ec88016e2f Browse Files
Spike out functionality and tests. Create our interfaces around storing data and retrieving it. Create an in-memory implementation of our interfaces, for testing and rapid dev purposes. Begin sketching out what our unit tests look like. Create our Google Cloud Storage datastore implementation. Sketch out an idea for a usage collection process to keep track of which users are actually using stuff.
.hgignore LICENSE context.go datastore.go datastore_test.go memstore.go storage.go upload.go usage.go
1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/.hgignore Wed Aug 27 22:34:02 2014 -0400 1.3 @@ -0,0 +1,3 @@ 1.4 +.swp 1.5 +.DS_Store 1.6 +gifsd/gifsd
2.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 2.2 +++ b/LICENSE Wed Aug 27 22:34:02 2014 -0400 2.3 @@ -0,0 +1,20 @@ 2.4 +The MIT License (MIT) 2.5 + 2.6 +Copyright (c) 2014 Second Bit, LLC 2.7 + 2.8 +Permission is hereby granted, free of charge, to any person obtaining a copy of 2.9 +this software and associated documentation files (the "Software"), to deal in 2.10 +the Software without restriction, including without limitation the rights to 2.11 +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of 2.12 +the Software, and to permit persons to whom the Software is furnished to do so, 2.13 +subject to the following conditions: 2.14 + 2.15 +The above copyright notice and this permission notice shall be included in all 2.16 +copies or substantial portions of the Software. 2.17 + 2.18 +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 2.19 +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS 2.20 +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR 2.21 +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER 2.22 +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN 2.23 +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
3.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 3.2 +++ b/context.go Wed Aug 27 22:34:02 2014 -0400 3.3 @@ -0,0 +1,43 @@ 3.4 +package api 3.5 + 3.6 +import ( 3.7 + "code.google.com/p/goauth2/oauth/jwt" 3.8 + "code.google.com/p/google-api-go-client/storage/v1beta2" 3.9 + _ "github.com/go-sql-driver/mysql" 3.10 +) 3.11 + 3.12 +type Context struct { 3.13 + Storage Storage 3.14 + Datastore Datastore 3.15 + UsageTracker *UsageTracker 3.16 + Bucket string 3.17 + RootDomain string 3.18 +} 3.19 + 3.20 +func NewMemStorage() Storage { 3.21 + return Memstorage{} 3.22 +} 3.23 + 3.24 +func NewMemDatastore() Datastore { 3.25 + return &Memstore{ 3.26 + collections: map[string]Collection{}, 3.27 + domains: map[string]Domain{}, 3.28 + items: map[string]Item{}, 3.29 + users: map[string]User{}, 3.30 + } 3.31 +} 3.32 + 3.33 +func NewGCSStorage(gcsClientEmail, gcsTokenURI string, gcsPemBytes []byte) (Storage, error) { 3.34 + t := jwt.NewToken(gcsClientEmail, storage.DevstorageFull_controlScope, gcsPemBytes) 3.35 + t.ClaimSet.Aud = gcsTokenURI 3.36 + transport, err := jwt.NewTransport(t) 3.37 + if err != nil { 3.38 + return nil, err 3.39 + } 3.40 + client := transport.Client() 3.41 + gcsService, err := storage.New(client) 3.42 + if err != nil { 3.43 + return nil, err 3.44 + } 3.45 + return &GoogleCloudStorage{gcsService}, nil 3.46 +}
4.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 4.2 +++ b/datastore.go Wed Aug 27 22:34:02 2014 -0400 4.3 @@ -0,0 +1,103 @@ 4.4 +package api 4.5 + 4.6 +import ( 4.7 + "errors" 4.8 + "time" 4.9 + "unicode" 4.10 + 4.11 + "code.google.com/p/go.text/unicode/norm" 4.12 + "secondbit.org/uuid" 4.13 +) 4.14 + 4.15 +var ( 4.16 + CollectionNotFoundError = errors.New("collection not found") 4.17 + DomainNotFoundError = errors.New("domain not found") 4.18 + DomainAlreadyExistsError = errors.New("domain already attached to a collection") 4.19 + ItemNotFoundError = errors.New("item not found") 4.20 + ItemAlreadyExistsError = errors.New("item already exists") 4.21 + UserNotFoundError = errors.New("user not found") 4.22 + UserAlreadyExistsError = errors.New("user already exists") 4.23 + LoginNotFoundError = errors.New("login not found") 4.24 + LoginAlreadyExistsError = errors.New("login already exists") 4.25 +) 4.26 + 4.27 +type Datastore interface { 4.28 + CreateCollection(c Collection) error 4.29 + UpdateCollection(c Collection) error 4.30 + GetCollectionByDomain(domain string) (Collection, error) 4.31 + GetCollectionByID(id uuid.ID) (Collection, error) 4.32 + GetCollectionsByUser(id uuid.ID) ([]Collection, error) 4.33 + AddDomainToCollection(id uuid.ID, domain string) error 4.34 + RemoveDomainFromCollection(id uuid.ID, domain string) error 4.35 + GetDomainsByCollection(id uuid.ID) ([]Domain, error) 4.36 + DeleteCollection(c Collection) error 4.37 + 4.38 + GetItemsByCollectionDomain(domain string, num, offset int) ([]Item, error) 4.39 + GetItemsByCollectionID(id uuid.ID, num, offset int) ([]Item, error) 4.40 + AddItemToCollection(id uuid.ID, item Item) error 4.41 + GetItemByName(collectionID uuid.ID, name string) (Item, error) 4.42 + DeleteItem(item Item) error 4.43 + 4.44 + GetUserByID(id uuid.ID) (User, error) 4.45 + GetUserByLogin(loginType, value, passphrase string) (User, error) 4.46 + AddLoginToUser(login Login, user uuid.ID) error 4.47 + RemoveLoginFromUser(loginType, value string, user uuid.ID) error 4.48 + GetLoginsByUser(user uuid.ID) ([]Login, error) 4.49 + CreateUser(u User) error 4.50 + UpdateUser(u User) error 4.51 + DeleteUser(u User) error 4.52 +} 4.53 + 4.54 +func slugify(in string) string { 4.55 + buf := make([]rune, 0, len(in)) 4.56 + needsDash := false 4.57 + for _, r := range norm.NFKD.String(in) { 4.58 + if unicode.IsSpace(r) || unicode.IsPunct(r) { 4.59 + if needsDash { 4.60 + buf = append(buf, '-') 4.61 + needsDash = false 4.62 + } 4.63 + } else { 4.64 + buf = append(buf, r) 4.65 + needsDash = true 4.66 + } 4.67 + } 4.68 + return string(buf) 4.69 +} 4.70 + 4.71 +type Collection struct { 4.72 + ID uuid.ID 4.73 + Name string 4.74 + Owner uuid.ID 4.75 + Created time.Time 4.76 +} 4.77 + 4.78 +type Domain struct { 4.79 + Domain string 4.80 + CollectionID uuid.ID 4.81 + Created time.Time 4.82 +} 4.83 + 4.84 +type Item struct { 4.85 + Blob string 4.86 + Bucket string 4.87 + CollectionID uuid.ID 4.88 + Name string 4.89 +} 4.90 + 4.91 +type User struct { 4.92 + ID uuid.ID 4.93 + Name string 4.94 + Passphrase string 4.95 + Email string 4.96 + Created time.Time 4.97 + LastSeen time.Time 4.98 +} 4.99 + 4.100 +type Login struct { 4.101 + Type string 4.102 + Value string 4.103 + UserID uuid.ID 4.104 + Created time.Time 4.105 + LastUsed time.Time 4.106 +}
5.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 5.2 +++ b/datastore_test.go Wed Aug 27 22:34:02 2014 -0400 5.3 @@ -0,0 +1,49 @@ 5.4 +package api 5.5 + 5.6 +import ( 5.7 + "testing" 5.8 + "time" 5.9 + 5.10 + "secondbit.org/uuid" 5.11 +) 5.12 + 5.13 +var datastores = []Datastore{NewMemstore()} 5.14 + 5.15 +func compareCollections(expectation, result Collection) (field string, expected, got interface{}) { 5.16 + if !expectation.ID.Equal(result.ID) { 5.17 + return "ID", expectation.ID, result.ID 5.18 + } 5.19 + if expectation.Name != result.Name { 5.20 + return "name", expectation.Name, result.Name 5.21 + } 5.22 + if !expectation.Owner.Equal(result.Owner) { 5.23 + return "owner", expectation.Owner, result.Owner 5.24 + } 5.25 + if !expectation.Created.Equal(result.Created) { 5.26 + return "created", expectation.Created, result.Created 5.27 + } 5.28 + return "", nil, nil 5.29 +} 5.30 + 5.31 +func TestCreateCollection(t *testing.T) { 5.32 + for pos, datastore := range datastores { 5.33 + expectation := Collection{ 5.34 + ID: uuid.NewID(), 5.35 + Name: "Test Collection", 5.36 + Owner: uuid.NewID(), 5.37 + Created: time.Now(), 5.38 + } 5.39 + err := datastore.CreateCollection(expectation) 5.40 + if err != nil { 5.41 + t.Errorf("Error creating collection in datastore %d: %s\n", pos, err) 5.42 + } 5.43 + result, err := datastore.GetCollectionByID(expectation.ID) 5.44 + if err != nil { 5.45 + t.Errorf("Error retrieving collection in datastore %d: %s\n", pos, err) 5.46 + } 5.47 + field, exp, got := compareCollections(expectation, result) 5.48 + if field != "" { 5.49 + t.Errorf("Collection did not meet expectations for datastore %d. Expected %v for %s, got %v.", pos, exp, field, got) 5.50 + } 5.51 + } 5.52 +}
6.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 6.2 +++ b/memstore.go Wed Aug 27 22:34:02 2014 -0400 6.3 @@ -0,0 +1,228 @@ 6.4 +package api 6.5 + 6.6 +import ( 6.7 + "time" 6.8 + 6.9 + "secondbit.org/uuid" 6.10 +) 6.11 + 6.12 +type Memstore struct { 6.13 + collections map[string]Collection 6.14 + domains map[string]Domain 6.15 + items map[string]Item 6.16 + users map[string]User 6.17 + logins map[string]Login 6.18 +} 6.19 + 6.20 +func NewMemstore() Memstore { 6.21 + return Memstore{ 6.22 + collections: map[string]Collection{}, 6.23 + domains: map[string]Domain{}, 6.24 + items: map[string]Item{}, 6.25 + users: map[string]User{}, 6.26 + logins: map[string]Login{}, 6.27 + } 6.28 +} 6.29 + 6.30 +func (m Memstore) CreateCollection(c Collection) error { 6.31 + m.collections[c.ID.String()] = c 6.32 + return nil 6.33 +} 6.34 + 6.35 +func (m Memstore) UpdateCollection(c Collection) error { 6.36 + if _, ok := m.collections[c.ID.String()]; !ok { 6.37 + return CollectionNotFoundError 6.38 + } 6.39 + m.collections[c.ID.String()] = c 6.40 + return nil 6.41 +} 6.42 + 6.43 +func (m Memstore) GetCollectionByDomain(domain string) (Collection, error) { 6.44 + d, ok := m.domains[domain] 6.45 + if !ok { 6.46 + return Collection{}, CollectionNotFoundError 6.47 + } 6.48 + return m.GetCollectionByID(d.CollectionID) 6.49 +} 6.50 + 6.51 +func (m Memstore) GetCollectionByID(id uuid.ID) (Collection, error) { 6.52 + if c, ok := m.collections[id.String()]; ok { 6.53 + return c, nil 6.54 + } 6.55 + return Collection{}, CollectionNotFoundError 6.56 +} 6.57 + 6.58 +func (m Memstore) GetCollectionsByUser(id uuid.ID) ([]Collection, error) { 6.59 + collections := []Collection{} 6.60 + for _, c := range m.collections { 6.61 + if c.Owner.Equal(id) { 6.62 + collections = append(collections, c) 6.63 + } 6.64 + } 6.65 + return collections, nil 6.66 +} 6.67 + 6.68 +func (m Memstore) AddDomainToCollection(id uuid.ID, domain string) error { 6.69 + if _, ok := m.domains[domain]; ok { 6.70 + return DomainAlreadyExistsError 6.71 + } 6.72 + m.domains[domain] = Domain{ 6.73 + Domain: domain, 6.74 + CollectionID: id, 6.75 + Created: time.Now(), 6.76 + } 6.77 + return nil 6.78 +} 6.79 + 6.80 +func (m Memstore) RemoveDomainFromCollection(id uuid.ID, domain string) error { 6.81 + if _, ok := m.domains[domain]; !ok { 6.82 + return DomainNotFoundError 6.83 + } 6.84 + delete(m.domains, domain) 6.85 + return nil 6.86 +} 6.87 + 6.88 +func (m Memstore) GetDomainsByCollection(id uuid.ID) ([]Domain, error) { 6.89 + domains := []Domain{} 6.90 + for _, d := range m.domains { 6.91 + if d.CollectionID.Equal(id) { 6.92 + domains = append(domains, d) 6.93 + } 6.94 + } 6.95 + return domains, nil 6.96 +} 6.97 + 6.98 +func (m Memstore) DeleteCollection(c Collection) error { 6.99 + if _, ok := m.collections[c.ID.String()]; !ok { 6.100 + return CollectionNotFoundError 6.101 + } 6.102 + delete(m.collections, c.ID.String()) 6.103 + return nil 6.104 +} 6.105 + 6.106 +func (m Memstore) GetItemsByCollectionDomain(domain string, num, offset int) ([]Item, error) { 6.107 + collection, err := m.GetCollectionByDomain(domain) 6.108 + if err != nil { 6.109 + return []Item{}, err 6.110 + } 6.111 + return m.GetItemsByCollectionID(collection.ID, num, offset) 6.112 +} 6.113 + 6.114 +func (m Memstore) GetItemsByCollectionID(id uuid.ID, num, offset int) ([]Item, error) { 6.115 + if _, ok := m.collections[id.String()]; !ok { 6.116 + return []Item{}, CollectionNotFoundError 6.117 + } 6.118 + items := []Item{} 6.119 + for _, item := range m.items { 6.120 + if item.CollectionID.Equal(id) { 6.121 + items = append(items, item) 6.122 + } 6.123 + } 6.124 + if len(items) < offset { 6.125 + return []Item{}, nil 6.126 + } 6.127 + end := offset + num 6.128 + if len(items) < end { 6.129 + end = len(items) 6.130 + } 6.131 + return items[offset:end], nil 6.132 +} 6.133 + 6.134 +func (m Memstore) AddItemToCollection(id uuid.ID, item Item) error { 6.135 + if _, ok := m.collections[id.String()]; !ok { 6.136 + return CollectionNotFoundError 6.137 + } 6.138 + if _, ok := m.items[id.String()+"/"+item.Name]; ok { 6.139 + return ItemAlreadyExistsError 6.140 + } 6.141 + item.CollectionID = id 6.142 + m.items[id.String()+"/"+item.Name] = item 6.143 + return nil 6.144 +} 6.145 + 6.146 +func (m Memstore) GetItemByName(collectionID uuid.ID, name string) (Item, error) { 6.147 + if _, ok := m.collections[collectionID.String()]; !ok { 6.148 + return Item{}, CollectionNotFoundError 6.149 + } 6.150 + if item, ok := m.items[collectionID.String()+"/"+name]; ok { 6.151 + return item, nil 6.152 + } 6.153 + return Item{}, ItemNotFoundError 6.154 +} 6.155 + 6.156 +func (m Memstore) DeleteItem(item Item) error { 6.157 + if _, ok := m.items[item.CollectionID.String()+"/"+item.Name]; !ok { 6.158 + return ItemNotFoundError 6.159 + } 6.160 + delete(m.items, item.CollectionID.String()+"/"+item.Name) 6.161 + return nil 6.162 +} 6.163 + 6.164 +func (m Memstore) GetUserByID(id uuid.ID) (User, error) { 6.165 + if u, ok := m.users[id.String()]; ok { 6.166 + return u, nil 6.167 + } 6.168 + return User{}, UserNotFoundError 6.169 +} 6.170 + 6.171 +func (m Memstore) GetUserByLogin(loginType, value, passphrase string) (User, error) { 6.172 + login, ok := m.logins[loginType+":"+value] 6.173 + if !ok { 6.174 + return User{}, UserNotFoundError 6.175 + } 6.176 + user, err := m.GetUserByID(login.UserID) 6.177 + if err != nil { 6.178 + return user, err 6.179 + } 6.180 + if user.Passphrase != passphrase { 6.181 + return User{}, UserNotFoundError 6.182 + } 6.183 + return user, nil 6.184 +} 6.185 + 6.186 +func (m Memstore) CreateUser(u User) error { 6.187 + m.users[u.ID.String()] = u 6.188 + return nil 6.189 +} 6.190 + 6.191 +func (m Memstore) AddLoginToUser(login Login, user uuid.ID) error { 6.192 + if _, ok := m.logins[login.Type+":"+login.Value]; ok { 6.193 + return LoginAlreadyExistsError 6.194 + } 6.195 + m.logins[login.Type+":"+login.Value] = login 6.196 + return nil 6.197 +} 6.198 + 6.199 +func (m Memstore) RemoveLoginFromUser(loginType, value string, user uuid.ID) error { 6.200 + if _, ok := m.logins[loginType+":"+value]; !ok { 6.201 + return LoginNotFoundError 6.202 + } 6.203 + delete(m.logins, loginType+":"+value) 6.204 + return nil 6.205 +} 6.206 + 6.207 +func (m Memstore) GetLoginsByUser(user uuid.ID) ([]Login, error) { 6.208 + logins := []Login{} 6.209 + for _, login := range m.logins { 6.210 + if login.UserID.Equal(user) { 6.211 + logins = append(logins, login) 6.212 + } 6.213 + } 6.214 + return logins, nil 6.215 +} 6.216 + 6.217 +func (m Memstore) UpdateUser(u User) error { 6.218 + if _, ok := m.users[u.ID.String()]; !ok { 6.219 + return UserNotFoundError 6.220 + } 6.221 + m.users[u.ID.String()] = u 6.222 + return nil 6.223 +} 6.224 + 6.225 +func (m Memstore) DeleteUser(u User) error { 6.226 + if _, ok := m.users[u.ID.String()]; !ok { 6.227 + return UserNotFoundError 6.228 + } 6.229 + delete(m.users, u.ID.String()) 6.230 + return nil 6.231 +}
7.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 7.2 +++ b/storage.go Wed Aug 27 22:34:02 2014 -0400 7.3 @@ -0,0 +1,142 @@ 7.4 +package api 7.5 + 7.6 +import ( 7.7 + "errors" 7.8 + "io" 7.9 + "io/ioutil" 7.10 + "log" 7.11 + "net/http" 7.12 + 7.13 + "code.google.com/p/google-api-go-client/googleapi" 7.14 + "code.google.com/p/google-api-go-client/storage/v1beta2" 7.15 +) 7.16 + 7.17 +var ( 7.18 + BucketNotFoundError = errors.New("bucket not found") 7.19 + BlobNotFoundError = errors.New("blob not found") 7.20 +) 7.21 + 7.22 +type Storage interface { 7.23 + Upload(bucket, tmp string, r io.Reader, errs chan error, done chan struct{}) 7.24 + Delete(bucket, tmp string) error 7.25 + Move(srcBucket, src, dstBucket, dst string) error 7.26 + Download(bucket, id string, w io.Writer) (int64, error) 7.27 +} 7.28 + 7.29 +type GoogleCloudStorage struct { 7.30 + *storage.Service 7.31 +} 7.32 + 7.33 +func (gcs *GoogleCloudStorage) Upload(bucket, tmp string, r io.Reader, errs chan error, done chan struct{}) { 7.34 + if errs != nil { 7.35 + defer close(errs) 7.36 + } 7.37 + if done != nil { 7.38 + defer close(done) 7.39 + } 7.40 + object := &storage.Object{Name: tmp} 7.41 + _, err := gcs.Objects.Insert(bucket, object).Media(r).Do() 7.42 + if err != nil && errs != nil { 7.43 + errs <- err 7.44 + } 7.45 +} 7.46 + 7.47 +func (gcs *GoogleCloudStorage) Delete(bucket, tmp string) error { 7.48 + return gcs.Objects.Delete(bucket, tmp).Do() 7.49 +} 7.50 + 7.51 +func (gcs *GoogleCloudStorage) Move(srcBucket, src, dstBucket, dst string) error { 7.52 + _, err := gcs.Objects.Get(dstBucket, dst).Do() 7.53 + if err == nil { 7.54 + go gcs.del(srcBucket, src) 7.55 + return nil 7.56 + } 7.57 + if e, ok := err.(*googleapi.Error); !ok || e.Code != 404 { 7.58 + return err 7.59 + } 7.60 + _, err = gcs.Objects.Copy(srcBucket, src, dstBucket, dst, nil).Do() 7.61 + if err != nil { 7.62 + return err 7.63 + } 7.64 + objectAcl := &storage.ObjectAccessControl{ 7.65 + Bucket: dstBucket, Entity: "allUsers", Object: dst, Role: "READER", 7.66 + } 7.67 + _, err = gcs.ObjectAccessControls.Insert(dstBucket, dst, objectAcl).Do() 7.68 + if err != nil { 7.69 + return err 7.70 + } 7.71 + go gcs.del(srcBucket, src) 7.72 + return nil 7.73 +} 7.74 + 7.75 +func (gcs *GoogleCloudStorage) del(bucket, tmp string) { 7.76 + err := gcs.Delete(bucket, tmp) 7.77 + if err != nil { 7.78 + log.Printf("Error deleting temporary upload %s in %s: %s\n", tmp, bucket, err) 7.79 + } 7.80 +} 7.81 + 7.82 +func (gcs *GoogleCloudStorage) Download(bucket, id string, w io.Writer) (int64, error) { 7.83 + res, err := gcs.Objects.Get(bucket, id).Do() 7.84 + if err != nil { 7.85 + return 0, err 7.86 + } 7.87 + resp, err := http.Get(res.MediaLink) 7.88 + if err != nil { 7.89 + return 0, err 7.90 + } 7.91 + defer resp.Body.Close() 7.92 + return io.Copy(w, resp.Body) 7.93 +} 7.94 + 7.95 +type Memstorage map[string]Bucket 7.96 + 7.97 +type Bucket map[string][]byte 7.98 + 7.99 +func (m Memstorage) Upload(bucket, tmp string, r io.Reader, errs chan error, done chan struct{}) { 7.100 + if errs != nil { 7.101 + defer close(errs) 7.102 + } 7.103 + if done != nil { 7.104 + defer close(done) 7.105 + } 7.106 + if _, ok := m[bucket]; !ok { 7.107 + m[bucket] = make(Bucket) 7.108 + } 7.109 + bytes, err := ioutil.ReadAll(r) 7.110 + if err != nil { 7.111 + errs <- err 7.112 + return 7.113 + } 7.114 + m[bucket][tmp] = bytes 7.115 +} 7.116 + 7.117 +func (m Memstorage) Delete(bucket, tmp string) error { 7.118 + delete(m[bucket], tmp) 7.119 + return nil 7.120 +} 7.121 + 7.122 +func (m Memstorage) Move(srcBucket, src, dstBucket, dst string) error { 7.123 + if _, ok := m[srcBucket]; !ok { 7.124 + return BucketNotFoundError 7.125 + } 7.126 + if _, ok := m[srcBucket][src]; !ok { 7.127 + return BlobNotFoundError 7.128 + } 7.129 + if _, ok := m[dstBucket]; !ok { 7.130 + m[dstBucket] = make(Bucket) 7.131 + } 7.132 + m[dstBucket][dst] = m[srcBucket][src] 7.133 + return m.Delete(srcBucket, src) 7.134 +} 7.135 + 7.136 +func (m Memstorage) Download(bucket, id string, w io.Writer) (int64, error) { 7.137 + if _, ok := m[bucket]; !ok { 7.138 + return 0, BucketNotFoundError 7.139 + } 7.140 + if _, ok := m[bucket][id]; !ok { 7.141 + return 0, BlobNotFoundError 7.142 + } 7.143 + n, err := w.Write(m[bucket][id]) 7.144 + return int64(n), err 7.145 +}
8.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 8.2 +++ b/upload.go Wed Aug 27 22:34:02 2014 -0400 8.3 @@ -0,0 +1,124 @@ 8.4 +package api 8.5 + 8.6 +import ( 8.7 + "crypto/sha1" 8.8 + "encoding/hex" 8.9 + "io" 8.10 + "log" 8.11 + 8.12 + "secondbit.org/uuid" 8.13 +) 8.14 + 8.15 +func Upload(collectionID uuid.ID, r io.Reader, c Context) (Item, int64, error) { 8.16 + hashReader, hashWriter := io.Pipe() 8.17 + uploadReader, uploadWriter := io.Pipe() 8.18 + m := io.MultiWriter(hashWriter, uploadWriter) 8.19 + 8.20 + hashChan := make(chan string) 8.21 + hashError := make(chan error) 8.22 + hashDone := make(chan struct{}) 8.23 + 8.24 + cpDone := make(chan struct{}) 8.25 + cpErr := make(chan error) 8.26 + 8.27 + uploadDone := make(chan struct{}) 8.28 + uploadErr := make(chan error) 8.29 + 8.30 + n := make(chan int64) 8.31 + 8.32 + var bytesWritten int64 8.33 + tmp := uuid.NewID().String() 8.34 + tmp = "tmp/" + tmp 8.35 + 8.36 + go hash(hashReader, hashChan, hashError, hashDone) 8.37 + go asyncCopy(m, r, n, cpErr, cpDone) 8.38 + if c.Storage != nil { 8.39 + go c.Storage.Upload(c.Bucket, tmp, uploadReader, uploadErr, uploadDone) 8.40 + } 8.41 + 8.42 + select { 8.43 + case err := <-hashError: 8.44 + if err != nil { 8.45 + hashWriter.CloseWithError(err) 8.46 + uploadWriter.CloseWithError(err) 8.47 + return Item{}, 0, err 8.48 + } else { 8.49 + hashWriter.Close() 8.50 + uploadWriter.Close() 8.51 + } 8.52 + case err := <-cpErr: 8.53 + if err != nil { 8.54 + hashWriter.CloseWithError(err) 8.55 + uploadWriter.CloseWithError(err) 8.56 + return Item{}, 0, err 8.57 + } else { 8.58 + hashWriter.Close() 8.59 + uploadWriter.Close() 8.60 + } 8.61 + case err := <-uploadErr: 8.62 + if err != nil { 8.63 + hashWriter.CloseWithError(err) 8.64 + uploadWriter.CloseWithError(err) 8.65 + return Item{}, 0, err 8.66 + } else { 8.67 + hashWriter.Close() 8.68 + uploadWriter.Close() 8.69 + } 8.70 + case <-cpDone: 8.71 + hashWriter.Close() 8.72 + uploadWriter.Close() 8.73 + } 8.74 + <-cpDone 8.75 + <-hashDone 8.76 + if c.Storage != nil { 8.77 + <-uploadDone 8.78 + } 8.79 + bytesWritten = <-n 8.80 + finalLocation := <-hashChan 8.81 + if c.Storage != nil { 8.82 + err := c.Storage.Move(c.Bucket, tmp, c.Bucket, finalLocation) 8.83 + if err != nil { 8.84 + return Item{}, 0, err 8.85 + } 8.86 + } 8.87 + return Item{ 8.88 + Blob: finalLocation, 8.89 + Bucket: c.Bucket, 8.90 + CollectionID: collectionID, 8.91 + }, bytesWritten, nil 8.92 +} 8.93 + 8.94 +func hash(r io.Reader, resp chan string, errs chan error, done chan struct{}) { 8.95 + if resp != nil { 8.96 + defer close(resp) 8.97 + } 8.98 + h := sha1.New() 8.99 + go asyncCopy(h, r, nil, errs, done) 8.100 + <-done 8.101 + resp <- hex.EncodeToString(h.Sum(nil)) 8.102 +} 8.103 + 8.104 +func del(bucket, tmp string, c Context) { 8.105 + err := c.Storage.Delete(bucket, tmp) 8.106 + if err != nil { 8.107 + log.Printf("Error deleting temporary upload %s in %s: %s\n", tmp, bucket, err) 8.108 + } 8.109 +} 8.110 + 8.111 +func asyncCopy(dst io.Writer, src io.Reader, n chan int64, errs chan error, done chan struct{}) { 8.112 + if errs != nil { 8.113 + defer close(errs) 8.114 + } 8.115 + if done != nil { 8.116 + defer close(done) 8.117 + } 8.118 + written, err := io.Copy(dst, src) 8.119 + if errs != nil && err != nil { 8.120 + errs <- err 8.121 + } 8.122 + if n != nil { 8.123 + go func(n chan int64, written int64) { 8.124 + n <- written 8.125 + }(n, written) 8.126 + } 8.127 +}
9.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 9.2 +++ b/usage.go Wed Aug 27 22:34:02 2014 -0400 9.3 @@ -0,0 +1,87 @@ 9.4 +package api 9.5 + 9.6 +import ( 9.7 + "sync" 9.8 +) 9.9 + 9.10 +func NewUsageTracker() *UsageTracker { 9.11 + return &UsageTracker{ 9.12 + usages: make(map[string]*Usage), 9.13 + } 9.14 +} 9.15 + 9.16 +type UsageTracker struct { 9.17 + usages map[string]*Usage 9.18 + sync.Mutex 9.19 +} 9.20 + 9.21 +func (u *UsageTracker) TrackUploads(id string) (bytes, requests chan int64) { 9.22 + u.Lock() 9.23 + defer u.Unlock() 9.24 + if _, ok := u.usages[id]; !ok { 9.25 + u.usages[id] = &Usage{ 9.26 + UploadBytesChan: make(chan int64), 9.27 + DownloadBytesChan: make(chan int64), 9.28 + UploadRequestsChan: make(chan int64), 9.29 + DownloadRequestsChan: make(chan int64), 9.30 + } 9.31 + go u.usages[id].collect() 9.32 + } 9.33 + return u.usages[id].UploadBytesChan, u.usages[id].UploadRequestsChan 9.34 +} 9.35 + 9.36 +func (u *UsageTracker) TrackDownloads(id string) (bytes, requests chan int64) { 9.37 + u.Lock() 9.38 + defer u.Unlock() 9.39 + if _, ok := u.usages[id]; !ok { 9.40 + u.usages[id] = &Usage{ 9.41 + UploadBytesChan: make(chan int64), 9.42 + DownloadBytesChan: make(chan int64), 9.43 + UploadRequestsChan: make(chan int64), 9.44 + DownloadRequestsChan: make(chan int64), 9.45 + } 9.46 + go u.usages[id].collect() 9.47 + } 9.48 + return u.usages[id].DownloadBytesChan, u.usages[id].DownloadRequestsChan 9.49 +} 9.50 + 9.51 +type Usage struct { 9.52 + UploadedBytes int64 9.53 + UploadBytesChan chan int64 9.54 + DownloadedBytes int64 9.55 + DownloadBytesChan chan int64 9.56 + UploadRequests int64 9.57 + UploadRequestsChan chan int64 9.58 + DownloadRequests int64 9.59 + DownloadRequestsChan chan int64 9.60 +} 9.61 + 9.62 +func (u *Usage) collect() { 9.63 + for { 9.64 + select { 9.65 + case b, ok := <-u.UploadBytesChan: 9.66 + if !ok { 9.67 + u.UploadBytesChan = nil 9.68 + } 9.69 + u.UploadedBytes += b 9.70 + case r, ok := <-u.UploadRequestsChan: 9.71 + if !ok { 9.72 + u.UploadRequestsChan = nil 9.73 + } 9.74 + u.UploadRequests += r 9.75 + case b, ok := <-u.DownloadBytesChan: 9.76 + if !ok { 9.77 + u.DownloadBytesChan = nil 9.78 + } 9.79 + u.DownloadedBytes += b 9.80 + case r, ok := <-u.DownloadRequestsChan: 9.81 + if !ok { 9.82 + u.DownloadRequestsChan = nil 9.83 + } 9.84 + u.DownloadRequests += r 9.85 + } 9.86 + if u.UploadBytesChan == nil && u.UploadRequestsChan == nil && u.DownloadBytesChan == nil && u.DownloadRequestsChan == nil { 9.87 + break 9.88 + } 9.89 + } 9.90 +}