gifs/api
2014-08-27
Child:b5d88d57d587
gifs/api/upload.go
Spike out functionality and tests. Create our interfaces around storing data and retrieving it. Create an in-memory implementation of our interfaces, for testing and rapid dev purposes. Begin sketching out what our unit tests look like. Create our Google Cloud Storage datastore implementation. Sketch out an idea for a usage collection process to keep track of which users are actually using stuff.
1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/upload.go Wed Aug 27 22:34:02 2014 -0400 1.3 @@ -0,0 +1,124 @@ 1.4 +package api 1.5 + 1.6 +import ( 1.7 + "crypto/sha1" 1.8 + "encoding/hex" 1.9 + "io" 1.10 + "log" 1.11 + 1.12 + "secondbit.org/uuid" 1.13 +) 1.14 + 1.15 +func Upload(collectionID uuid.ID, r io.Reader, c Context) (Item, int64, error) { 1.16 + hashReader, hashWriter := io.Pipe() 1.17 + uploadReader, uploadWriter := io.Pipe() 1.18 + m := io.MultiWriter(hashWriter, uploadWriter) 1.19 + 1.20 + hashChan := make(chan string) 1.21 + hashError := make(chan error) 1.22 + hashDone := make(chan struct{}) 1.23 + 1.24 + cpDone := make(chan struct{}) 1.25 + cpErr := make(chan error) 1.26 + 1.27 + uploadDone := make(chan struct{}) 1.28 + uploadErr := make(chan error) 1.29 + 1.30 + n := make(chan int64) 1.31 + 1.32 + var bytesWritten int64 1.33 + tmp := uuid.NewID().String() 1.34 + tmp = "tmp/" + tmp 1.35 + 1.36 + go hash(hashReader, hashChan, hashError, hashDone) 1.37 + go asyncCopy(m, r, n, cpErr, cpDone) 1.38 + if c.Storage != nil { 1.39 + go c.Storage.Upload(c.Bucket, tmp, uploadReader, uploadErr, uploadDone) 1.40 + } 1.41 + 1.42 + select { 1.43 + case err := <-hashError: 1.44 + if err != nil { 1.45 + hashWriter.CloseWithError(err) 1.46 + uploadWriter.CloseWithError(err) 1.47 + return Item{}, 0, err 1.48 + } else { 1.49 + hashWriter.Close() 1.50 + uploadWriter.Close() 1.51 + } 1.52 + case err := <-cpErr: 1.53 + if err != nil { 1.54 + hashWriter.CloseWithError(err) 1.55 + uploadWriter.CloseWithError(err) 1.56 + return Item{}, 0, err 1.57 + } else { 1.58 + hashWriter.Close() 1.59 + uploadWriter.Close() 1.60 + } 1.61 + case err := <-uploadErr: 1.62 + if err != nil { 1.63 + hashWriter.CloseWithError(err) 1.64 + uploadWriter.CloseWithError(err) 1.65 + return Item{}, 0, err 1.66 + } else { 1.67 + hashWriter.Close() 1.68 + uploadWriter.Close() 1.69 + } 1.70 + case <-cpDone: 1.71 + hashWriter.Close() 1.72 + uploadWriter.Close() 1.73 + } 1.74 + <-cpDone 1.75 + <-hashDone 1.76 + if c.Storage != nil { 1.77 + <-uploadDone 1.78 + } 1.79 + bytesWritten = <-n 1.80 + finalLocation := <-hashChan 1.81 + if c.Storage != nil { 1.82 + err := c.Storage.Move(c.Bucket, tmp, c.Bucket, finalLocation) 1.83 + if err != nil { 1.84 + return Item{}, 0, err 1.85 + } 1.86 + } 1.87 + return Item{ 1.88 + Blob: finalLocation, 1.89 + Bucket: c.Bucket, 1.90 + CollectionID: collectionID, 1.91 + }, bytesWritten, nil 1.92 +} 1.93 + 1.94 +func hash(r io.Reader, resp chan string, errs chan error, done chan struct{}) { 1.95 + if resp != nil { 1.96 + defer close(resp) 1.97 + } 1.98 + h := sha1.New() 1.99 + go asyncCopy(h, r, nil, errs, done) 1.100 + <-done 1.101 + resp <- hex.EncodeToString(h.Sum(nil)) 1.102 +} 1.103 + 1.104 +func del(bucket, tmp string, c Context) { 1.105 + err := c.Storage.Delete(bucket, tmp) 1.106 + if err != nil { 1.107 + log.Printf("Error deleting temporary upload %s in %s: %s\n", tmp, bucket, err) 1.108 + } 1.109 +} 1.110 + 1.111 +func asyncCopy(dst io.Writer, src io.Reader, n chan int64, errs chan error, done chan struct{}) { 1.112 + if errs != nil { 1.113 + defer close(errs) 1.114 + } 1.115 + if done != nil { 1.116 + defer close(done) 1.117 + } 1.118 + written, err := io.Copy(dst, src) 1.119 + if errs != nil && err != nil { 1.120 + errs <- err 1.121 + } 1.122 + if n != nil { 1.123 + go func(n chan int64, written int64) { 1.124 + n <- written 1.125 + }(n, written) 1.126 + } 1.127 +}