gifs/api
5:b5d88d57d587 Browse Files
Simplify upload. Simplify the upload code by not running the hashing async, which requires fewer copy operations and less channel synchronization. Also, take advantage of the fact that PipeWriters and PipeReaders will return an error to the PipeReaders/PipeWriters (respectively) when read/write is called (respectively) to avoid passing back errors through channels.
1.1 --- a/upload.go Fri Oct 17 07:17:29 2014 -0400 1.2 +++ b/upload.go Fri Oct 17 07:20:06 2014 -0400 1.3 @@ -4,79 +4,48 @@ 1.4 "crypto/sha1" 1.5 "encoding/hex" 1.6 "io" 1.7 - "log" 1.8 1.9 - "secondbit.org/uuid" 1.10 + "code.secondbit.org/uuid.hg" 1.11 ) 1.12 1.13 +func upload(store Storage, bucket, name string, data *io.PipeReader, done chan struct{}) { 1.14 + err := store.Upload(bucket, name, data) 1.15 + if err != nil { 1.16 + data.CloseWithError(err) 1.17 + return 1.18 + } 1.19 + close(done) 1.20 +} 1.21 + 1.22 func Upload(collectionID uuid.ID, r io.Reader, c Context) (Item, int64, error) { 1.23 - hashReader, hashWriter := io.Pipe() 1.24 + h := sha1.New() 1.25 + var w io.Writer 1.26 uploadReader, uploadWriter := io.Pipe() 1.27 - m := io.MultiWriter(hashWriter, uploadWriter) 1.28 + uploadChan := make(chan struct{}) 1.29 + tmp := uuid.NewID().String() 1.30 1.31 - hashChan := make(chan string) 1.32 - hashError := make(chan error) 1.33 - hashDone := make(chan struct{}) 1.34 - 1.35 - cpDone := make(chan struct{}) 1.36 - cpErr := make(chan error) 1.37 - 1.38 - uploadDone := make(chan struct{}) 1.39 - uploadErr := make(chan error) 1.40 - 1.41 - n := make(chan int64) 1.42 - 1.43 - var bytesWritten int64 1.44 - tmp := uuid.NewID().String() 1.45 - tmp = "tmp/" + tmp 1.46 - 1.47 - go hash(hashReader, hashChan, hashError, hashDone) 1.48 - go asyncCopy(m, r, n, cpErr, cpDone) 1.49 if c.Storage != nil { 1.50 - go c.Storage.Upload(c.Bucket, tmp, uploadReader, uploadErr, uploadDone) 1.51 + w = io.MultiWriter(h, uploadWriter) 1.52 + go upload(c.Storage, c.TmpBucket, tmp, uploadReader, uploadChan) 1.53 + } else { 1.54 + w = h 1.55 + close(uploadChan) 1.56 } 1.57 1.58 - select { 1.59 - case err := <-hashError: 1.60 - if err != nil { 1.61 - hashWriter.CloseWithError(err) 1.62 - uploadWriter.CloseWithError(err) 1.63 - return Item{}, 0, err 1.64 - } else { 1.65 - hashWriter.Close() 1.66 + size, err := io.Copy(w, r) 1.67 + if err != nil { 1.68 + if c.Storage != nil { 1.69 uploadWriter.Close() 1.70 } 1.71 - case err := <-cpErr: 1.72 - if err != nil { 1.73 - hashWriter.CloseWithError(err) 1.74 - uploadWriter.CloseWithError(err) 1.75 - return Item{}, 0, err 1.76 - } else { 1.77 - hashWriter.Close() 1.78 - uploadWriter.Close() 1.79 - } 1.80 - case err := <-uploadErr: 1.81 - if err != nil { 1.82 - hashWriter.CloseWithError(err) 1.83 - uploadWriter.CloseWithError(err) 1.84 - return Item{}, 0, err 1.85 - } else { 1.86 - hashWriter.Close() 1.87 - uploadWriter.Close() 1.88 - } 1.89 - case <-cpDone: 1.90 - hashWriter.Close() 1.91 - uploadWriter.Close() 1.92 + return Item{}, 0, err 1.93 } 1.94 - <-cpDone 1.95 - <-hashDone 1.96 + 1.97 + finalLocation := hex.EncodeToString(h.Sum(nil)) 1.98 + uploadWriter.Close() 1.99 + <-uploadChan 1.100 + 1.101 if c.Storage != nil { 1.102 - <-uploadDone 1.103 - } 1.104 - bytesWritten = <-n 1.105 - finalLocation := <-hashChan 1.106 - if c.Storage != nil { 1.107 - err := c.Storage.Move(c.Bucket, tmp, c.Bucket, finalLocation) 1.108 + err = c.Storage.Move(c.TmpBucket, tmp, c.Bucket, finalLocation) 1.109 if err != nil { 1.110 return Item{}, 0, err 1.111 } 1.112 @@ -85,40 +54,5 @@ 1.113 Blob: finalLocation, 1.114 Bucket: c.Bucket, 1.115 CollectionID: collectionID, 1.116 - }, bytesWritten, nil 1.117 + }, size, nil 1.118 } 1.119 - 1.120 -func hash(r io.Reader, resp chan string, errs chan error, done chan struct{}) { 1.121 - if resp != nil { 1.122 - defer close(resp) 1.123 - } 1.124 - h := sha1.New() 1.125 - go asyncCopy(h, r, nil, errs, done) 1.126 - <-done 1.127 - resp <- hex.EncodeToString(h.Sum(nil)) 1.128 -} 1.129 - 1.130 -func del(bucket, tmp string, c Context) { 1.131 - err := c.Storage.Delete(bucket, tmp) 1.132 - if err != nil { 1.133 - log.Printf("Error deleting temporary upload %s in %s: %s\n", tmp, bucket, err) 1.134 - } 1.135 -} 1.136 - 1.137 -func asyncCopy(dst io.Writer, src io.Reader, n chan int64, errs chan error, done chan struct{}) { 1.138 - if errs != nil { 1.139 - defer close(errs) 1.140 - } 1.141 - if done != nil { 1.142 - defer close(done) 1.143 - } 1.144 - written, err := io.Copy(dst, src) 1.145 - if errs != nil && err != nil { 1.146 - errs <- err 1.147 - } 1.148 - if n != nil { 1.149 - go func(n chan int64, written int64) { 1.150 - n <- written 1.151 - }(n, written) 1.152 - } 1.153 -}