gifs/api

Paddy 2014-10-17 Parent:08ec88016e2f Child:b5d88d57d587

2:18cb30e077ba Go to Latest

gifs/api/upload.go

Stub out the rest of our datastore tests. Seriously, just define the functions we eventually want to write.

History
1 package api
3 import (
4 "crypto/sha1"
5 "encoding/hex"
6 "io"
7 "log"
9 "secondbit.org/uuid"
10 )
12 func Upload(collectionID uuid.ID, r io.Reader, c Context) (Item, int64, error) {
13 hashReader, hashWriter := io.Pipe()
14 uploadReader, uploadWriter := io.Pipe()
15 m := io.MultiWriter(hashWriter, uploadWriter)
17 hashChan := make(chan string)
18 hashError := make(chan error)
19 hashDone := make(chan struct{})
21 cpDone := make(chan struct{})
22 cpErr := make(chan error)
24 uploadDone := make(chan struct{})
25 uploadErr := make(chan error)
27 n := make(chan int64)
29 var bytesWritten int64
30 tmp := uuid.NewID().String()
31 tmp = "tmp/" + tmp
33 go hash(hashReader, hashChan, hashError, hashDone)
34 go asyncCopy(m, r, n, cpErr, cpDone)
35 if c.Storage != nil {
36 go c.Storage.Upload(c.Bucket, tmp, uploadReader, uploadErr, uploadDone)
37 }
39 select {
40 case err := <-hashError:
41 if err != nil {
42 hashWriter.CloseWithError(err)
43 uploadWriter.CloseWithError(err)
44 return Item{}, 0, err
45 } else {
46 hashWriter.Close()
47 uploadWriter.Close()
48 }
49 case err := <-cpErr:
50 if err != nil {
51 hashWriter.CloseWithError(err)
52 uploadWriter.CloseWithError(err)
53 return Item{}, 0, err
54 } else {
55 hashWriter.Close()
56 uploadWriter.Close()
57 }
58 case err := <-uploadErr:
59 if err != nil {
60 hashWriter.CloseWithError(err)
61 uploadWriter.CloseWithError(err)
62 return Item{}, 0, err
63 } else {
64 hashWriter.Close()
65 uploadWriter.Close()
66 }
67 case <-cpDone:
68 hashWriter.Close()
69 uploadWriter.Close()
70 }
71 <-cpDone
72 <-hashDone
73 if c.Storage != nil {
74 <-uploadDone
75 }
76 bytesWritten = <-n
77 finalLocation := <-hashChan
78 if c.Storage != nil {
79 err := c.Storage.Move(c.Bucket, tmp, c.Bucket, finalLocation)
80 if err != nil {
81 return Item{}, 0, err
82 }
83 }
84 return Item{
85 Blob: finalLocation,
86 Bucket: c.Bucket,
87 CollectionID: collectionID,
88 }, bytesWritten, nil
89 }
91 func hash(r io.Reader, resp chan string, errs chan error, done chan struct{}) {
92 if resp != nil {
93 defer close(resp)
94 }
95 h := sha1.New()
96 go asyncCopy(h, r, nil, errs, done)
97 <-done
98 resp <- hex.EncodeToString(h.Sum(nil))
99 }
101 func del(bucket, tmp string, c Context) {
102 err := c.Storage.Delete(bucket, tmp)
103 if err != nil {
104 log.Printf("Error deleting temporary upload %s in %s: %s\n", tmp, bucket, err)
105 }
106 }
108 func asyncCopy(dst io.Writer, src io.Reader, n chan int64, errs chan error, done chan struct{}) {
109 if errs != nil {
110 defer close(errs)
111 }
112 if done != nil {
113 defer close(done)
114 }
115 written, err := io.Copy(dst, src)
116 if errs != nil && err != nil {
117 errs <- err
118 }
119 if n != nil {
120 go func(n chan int64, written int64) {
121 n <- written
122 }(n, written)
123 }
124 }