gifs/api
gifs/api/upload.go
Remove user functions and logins, fix UpdateCollection. UpdateCollection now takes a CollectionChange argument, rather than overwriting everything. Remove all our user-related code, as that's going to be superseded by code.secondbit.org/auth anyway.
| paddy@0 | 1 package api |
| paddy@0 | 2 |
| paddy@0 | 3 import ( |
| paddy@0 | 4 "crypto/sha1" |
| paddy@0 | 5 "encoding/hex" |
| paddy@0 | 6 "io" |
| paddy@0 | 7 "log" |
| paddy@0 | 8 |
| paddy@0 | 9 "secondbit.org/uuid" |
| paddy@0 | 10 ) |
| paddy@0 | 11 |
| paddy@0 | 12 func Upload(collectionID uuid.ID, r io.Reader, c Context) (Item, int64, error) { |
| paddy@0 | 13 hashReader, hashWriter := io.Pipe() |
| paddy@0 | 14 uploadReader, uploadWriter := io.Pipe() |
| paddy@0 | 15 m := io.MultiWriter(hashWriter, uploadWriter) |
| paddy@0 | 16 |
| paddy@0 | 17 hashChan := make(chan string) |
| paddy@0 | 18 hashError := make(chan error) |
| paddy@0 | 19 hashDone := make(chan struct{}) |
| paddy@0 | 20 |
| paddy@0 | 21 cpDone := make(chan struct{}) |
| paddy@0 | 22 cpErr := make(chan error) |
| paddy@0 | 23 |
| paddy@0 | 24 uploadDone := make(chan struct{}) |
| paddy@0 | 25 uploadErr := make(chan error) |
| paddy@0 | 26 |
| paddy@0 | 27 n := make(chan int64) |
| paddy@0 | 28 |
| paddy@0 | 29 var bytesWritten int64 |
| paddy@0 | 30 tmp := uuid.NewID().String() |
| paddy@0 | 31 tmp = "tmp/" + tmp |
| paddy@0 | 32 |
| paddy@0 | 33 go hash(hashReader, hashChan, hashError, hashDone) |
| paddy@0 | 34 go asyncCopy(m, r, n, cpErr, cpDone) |
| paddy@0 | 35 if c.Storage != nil { |
| paddy@0 | 36 go c.Storage.Upload(c.Bucket, tmp, uploadReader, uploadErr, uploadDone) |
| paddy@0 | 37 } |
| paddy@0 | 38 |
| paddy@0 | 39 select { |
| paddy@0 | 40 case err := <-hashError: |
| paddy@0 | 41 if err != nil { |
| paddy@0 | 42 hashWriter.CloseWithError(err) |
| paddy@0 | 43 uploadWriter.CloseWithError(err) |
| paddy@0 | 44 return Item{}, 0, err |
| paddy@0 | 45 } else { |
| paddy@0 | 46 hashWriter.Close() |
| paddy@0 | 47 uploadWriter.Close() |
| paddy@0 | 48 } |
| paddy@0 | 49 case err := <-cpErr: |
| paddy@0 | 50 if err != nil { |
| paddy@0 | 51 hashWriter.CloseWithError(err) |
| paddy@0 | 52 uploadWriter.CloseWithError(err) |
| paddy@0 | 53 return Item{}, 0, err |
| paddy@0 | 54 } else { |
| paddy@0 | 55 hashWriter.Close() |
| paddy@0 | 56 uploadWriter.Close() |
| paddy@0 | 57 } |
| paddy@0 | 58 case err := <-uploadErr: |
| paddy@0 | 59 if err != nil { |
| paddy@0 | 60 hashWriter.CloseWithError(err) |
| paddy@0 | 61 uploadWriter.CloseWithError(err) |
| paddy@0 | 62 return Item{}, 0, err |
| paddy@0 | 63 } else { |
| paddy@0 | 64 hashWriter.Close() |
| paddy@0 | 65 uploadWriter.Close() |
| paddy@0 | 66 } |
| paddy@0 | 67 case <-cpDone: |
| paddy@0 | 68 hashWriter.Close() |
| paddy@0 | 69 uploadWriter.Close() |
| paddy@0 | 70 } |
| paddy@0 | 71 <-cpDone |
| paddy@0 | 72 <-hashDone |
| paddy@0 | 73 if c.Storage != nil { |
| paddy@0 | 74 <-uploadDone |
| paddy@0 | 75 } |
| paddy@0 | 76 bytesWritten = <-n |
| paddy@0 | 77 finalLocation := <-hashChan |
| paddy@0 | 78 if c.Storage != nil { |
| paddy@0 | 79 err := c.Storage.Move(c.Bucket, tmp, c.Bucket, finalLocation) |
| paddy@0 | 80 if err != nil { |
| paddy@0 | 81 return Item{}, 0, err |
| paddy@0 | 82 } |
| paddy@0 | 83 } |
| paddy@0 | 84 return Item{ |
| paddy@0 | 85 Blob: finalLocation, |
| paddy@0 | 86 Bucket: c.Bucket, |
| paddy@0 | 87 CollectionID: collectionID, |
| paddy@0 | 88 }, bytesWritten, nil |
| paddy@0 | 89 } |
| paddy@0 | 90 |
| paddy@0 | 91 func hash(r io.Reader, resp chan string, errs chan error, done chan struct{}) { |
| paddy@0 | 92 if resp != nil { |
| paddy@0 | 93 defer close(resp) |
| paddy@0 | 94 } |
| paddy@0 | 95 h := sha1.New() |
| paddy@0 | 96 go asyncCopy(h, r, nil, errs, done) |
| paddy@0 | 97 <-done |
| paddy@0 | 98 resp <- hex.EncodeToString(h.Sum(nil)) |
| paddy@0 | 99 } |
| paddy@0 | 100 |
| paddy@0 | 101 func del(bucket, tmp string, c Context) { |
| paddy@0 | 102 err := c.Storage.Delete(bucket, tmp) |
| paddy@0 | 103 if err != nil { |
| paddy@0 | 104 log.Printf("Error deleting temporary upload %s in %s: %s\n", tmp, bucket, err) |
| paddy@0 | 105 } |
| paddy@0 | 106 } |
| paddy@0 | 107 |
// asyncCopy copies src into dst with io.Copy and reports the outcome over the
// supplied channels, each of which may be nil to opt out:
//
//   - errs receives the copy error, if there was one, and is closed on return;
//   - n receives the number of bytes copied, sent from a separate goroutine so
//     that asyncCopy itself never waits for a reader of n;
//   - done is closed on return, before errs is closed.
func asyncCopy(dst io.Writer, src io.Reader, n chan int64, errs chan error, done chan struct{}) {
	defer func() {
		if done != nil {
			close(done)
		}
		if errs != nil {
			close(errs)
		}
	}()

	count, copyErr := io.Copy(dst, src)
	if copyErr != nil && errs != nil {
		errs <- copyErr
	}
	if n == nil {
		return
	}
	go func() {
		n <- count
	}()
}