gifs/api
gifs/api/upload.go
Upload is no longer async, memstorage is parallel-safe. Upload no longer needs to be run async (it can be run inside a goroutine), so it now returns stuff instead of taking a channel as an argument. This will make it easier to implement, as all the async stuff is an abstraction above, and therefore doesn't need to be worried about for each reimplementation. The memstorage type is no longer exported and can now be safely used by multiple goroutines, thanks to the sync package.
| paddy@0 | 1 package api |
| paddy@0 | 2 |
| paddy@0 | 3 import ( |
| paddy@0 | 4 "crypto/sha1" |
| paddy@0 | 5 "encoding/hex" |
| paddy@0 | 6 "io" |
| paddy@0 | 7 "log" |
| paddy@0 | 8 |
| paddy@0 | 9 "secondbit.org/uuid" |
| paddy@0 | 10 ) |
| paddy@0 | 11 |
| paddy@0 | 12 func Upload(collectionID uuid.ID, r io.Reader, c Context) (Item, int64, error) { |
| paddy@0 | 13 hashReader, hashWriter := io.Pipe() |
| paddy@0 | 14 uploadReader, uploadWriter := io.Pipe() |
| paddy@0 | 15 m := io.MultiWriter(hashWriter, uploadWriter) |
| paddy@0 | 16 |
| paddy@0 | 17 hashChan := make(chan string) |
| paddy@0 | 18 hashError := make(chan error) |
| paddy@0 | 19 hashDone := make(chan struct{}) |
| paddy@0 | 20 |
| paddy@0 | 21 cpDone := make(chan struct{}) |
| paddy@0 | 22 cpErr := make(chan error) |
| paddy@0 | 23 |
| paddy@0 | 24 uploadDone := make(chan struct{}) |
| paddy@0 | 25 uploadErr := make(chan error) |
| paddy@0 | 26 |
| paddy@0 | 27 n := make(chan int64) |
| paddy@0 | 28 |
| paddy@0 | 29 var bytesWritten int64 |
| paddy@0 | 30 tmp := uuid.NewID().String() |
| paddy@0 | 31 tmp = "tmp/" + tmp |
| paddy@0 | 32 |
| paddy@0 | 33 go hash(hashReader, hashChan, hashError, hashDone) |
| paddy@0 | 34 go asyncCopy(m, r, n, cpErr, cpDone) |
| paddy@0 | 35 if c.Storage != nil { |
| paddy@0 | 36 go c.Storage.Upload(c.Bucket, tmp, uploadReader, uploadErr, uploadDone) |
| paddy@0 | 37 } |
| paddy@0 | 38 |
| paddy@0 | 39 select { |
| paddy@0 | 40 case err := <-hashError: |
| paddy@0 | 41 if err != nil { |
| paddy@0 | 42 hashWriter.CloseWithError(err) |
| paddy@0 | 43 uploadWriter.CloseWithError(err) |
| paddy@0 | 44 return Item{}, 0, err |
| paddy@0 | 45 } else { |
| paddy@0 | 46 hashWriter.Close() |
| paddy@0 | 47 uploadWriter.Close() |
| paddy@0 | 48 } |
| paddy@0 | 49 case err := <-cpErr: |
| paddy@0 | 50 if err != nil { |
| paddy@0 | 51 hashWriter.CloseWithError(err) |
| paddy@0 | 52 uploadWriter.CloseWithError(err) |
| paddy@0 | 53 return Item{}, 0, err |
| paddy@0 | 54 } else { |
| paddy@0 | 55 hashWriter.Close() |
| paddy@0 | 56 uploadWriter.Close() |
| paddy@0 | 57 } |
| paddy@0 | 58 case err := <-uploadErr: |
| paddy@0 | 59 if err != nil { |
| paddy@0 | 60 hashWriter.CloseWithError(err) |
| paddy@0 | 61 uploadWriter.CloseWithError(err) |
| paddy@0 | 62 return Item{}, 0, err |
| paddy@0 | 63 } else { |
| paddy@0 | 64 hashWriter.Close() |
| paddy@0 | 65 uploadWriter.Close() |
| paddy@0 | 66 } |
| paddy@0 | 67 case <-cpDone: |
| paddy@0 | 68 hashWriter.Close() |
| paddy@0 | 69 uploadWriter.Close() |
| paddy@0 | 70 } |
| paddy@0 | 71 <-cpDone |
| paddy@0 | 72 <-hashDone |
| paddy@0 | 73 if c.Storage != nil { |
| paddy@0 | 74 <-uploadDone |
| paddy@0 | 75 } |
| paddy@0 | 76 bytesWritten = <-n |
| paddy@0 | 77 finalLocation := <-hashChan |
| paddy@0 | 78 if c.Storage != nil { |
| paddy@0 | 79 err := c.Storage.Move(c.Bucket, tmp, c.Bucket, finalLocation) |
| paddy@0 | 80 if err != nil { |
| paddy@0 | 81 return Item{}, 0, err |
| paddy@0 | 82 } |
| paddy@0 | 83 } |
| paddy@0 | 84 return Item{ |
| paddy@0 | 85 Blob: finalLocation, |
| paddy@0 | 86 Bucket: c.Bucket, |
| paddy@0 | 87 CollectionID: collectionID, |
| paddy@0 | 88 }, bytesWritten, nil |
| paddy@0 | 89 } |
| paddy@0 | 90 |
| paddy@0 | 91 func hash(r io.Reader, resp chan string, errs chan error, done chan struct{}) { |
| paddy@0 | 92 if resp != nil { |
| paddy@0 | 93 defer close(resp) |
| paddy@0 | 94 } |
| paddy@0 | 95 h := sha1.New() |
| paddy@0 | 96 go asyncCopy(h, r, nil, errs, done) |
| paddy@0 | 97 <-done |
| paddy@0 | 98 resp <- hex.EncodeToString(h.Sum(nil)) |
| paddy@0 | 99 } |
| paddy@0 | 100 |
| paddy@0 | 101 func del(bucket, tmp string, c Context) { |
| paddy@0 | 102 err := c.Storage.Delete(bucket, tmp) |
| paddy@0 | 103 if err != nil { |
| paddy@0 | 104 log.Printf("Error deleting temporary upload %s in %s: %s\n", tmp, bucket, err) |
| paddy@0 | 105 } |
| paddy@0 | 106 } |
| paddy@0 | 107 |
| paddy@0 | 108 func asyncCopy(dst io.Writer, src io.Reader, n chan int64, errs chan error, done chan struct{}) { |
| paddy@0 | 109 if errs != nil { |
| paddy@0 | 110 defer close(errs) |
| paddy@0 | 111 } |
| paddy@0 | 112 if done != nil { |
| paddy@0 | 113 defer close(done) |
| paddy@0 | 114 } |
| paddy@0 | 115 written, err := io.Copy(dst, src) |
| paddy@0 | 116 if errs != nil && err != nil { |
| paddy@0 | 117 errs <- err |
| paddy@0 | 118 } |
| paddy@0 | 119 if n != nil { |
| paddy@0 | 120 go func(n chan int64, written int64) { |
| paddy@0 | 121 n <- written |
| paddy@0 | 122 }(n, written) |
| paddy@0 | 123 } |
| paddy@0 | 124 } |