gifs/api

Paddy 2014-10-17 Parent:08ec88016e2f

5:b5d88d57d587 Go to Latest

gifs/api/upload.go

Simplify upload. Simplify the upload code by not running the hashing async, which requires fewer copy operations and less channel synchronization. Also, take advantage of the fact that PipeWriters and PipeReaders will return an error to the PipeReaders/PipeWriters (respectively) when read/write is called (respectively) to avoid passing back errors through channels.

History
     1.1 --- a/upload.go	Fri Oct 17 07:17:29 2014 -0400
     1.2 +++ b/upload.go	Fri Oct 17 07:20:06 2014 -0400
     1.3 @@ -4,79 +4,48 @@
     1.4  	"crypto/sha1"
     1.5  	"encoding/hex"
     1.6  	"io"
     1.7 -	"log"
     1.8  
     1.9 -	"secondbit.org/uuid"
    1.10 +	"code.secondbit.org/uuid.hg"
    1.11  )
    1.12  
    1.13 +func upload(store Storage, bucket, name string, data *io.PipeReader, done chan struct{}) {
    1.14 +	err := store.Upload(bucket, name, data)
    1.15 +	if err != nil {
    1.16 +		data.CloseWithError(err)
    1.17 +		return
    1.18 +	}
    1.19 +	close(done)
    1.20 +}
    1.21 +
    1.22  func Upload(collectionID uuid.ID, r io.Reader, c Context) (Item, int64, error) {
    1.23 -	hashReader, hashWriter := io.Pipe()
    1.24 +	h := sha1.New()
    1.25 +	var w io.Writer
    1.26  	uploadReader, uploadWriter := io.Pipe()
    1.27 -	m := io.MultiWriter(hashWriter, uploadWriter)
    1.28 +	uploadChan := make(chan struct{})
    1.29 +	tmp := uuid.NewID().String()
    1.30  
    1.31 -	hashChan := make(chan string)
    1.32 -	hashError := make(chan error)
    1.33 -	hashDone := make(chan struct{})
    1.34 -
    1.35 -	cpDone := make(chan struct{})
    1.36 -	cpErr := make(chan error)
    1.37 -
    1.38 -	uploadDone := make(chan struct{})
    1.39 -	uploadErr := make(chan error)
    1.40 -
    1.41 -	n := make(chan int64)
    1.42 -
    1.43 -	var bytesWritten int64
    1.44 -	tmp := uuid.NewID().String()
    1.45 -	tmp = "tmp/" + tmp
    1.46 -
    1.47 -	go hash(hashReader, hashChan, hashError, hashDone)
    1.48 -	go asyncCopy(m, r, n, cpErr, cpDone)
    1.49  	if c.Storage != nil {
    1.50 -		go c.Storage.Upload(c.Bucket, tmp, uploadReader, uploadErr, uploadDone)
    1.51 +		w = io.MultiWriter(h, uploadWriter)
    1.52 +		go upload(c.Storage, c.TmpBucket, tmp, uploadReader, uploadChan)
    1.53 +	} else {
    1.54 +		w = h
    1.55 +		close(uploadChan)
    1.56  	}
    1.57  
    1.58 -	select {
    1.59 -	case err := <-hashError:
    1.60 -		if err != nil {
    1.61 -			hashWriter.CloseWithError(err)
    1.62 -			uploadWriter.CloseWithError(err)
    1.63 -			return Item{}, 0, err
    1.64 -		} else {
    1.65 -			hashWriter.Close()
    1.66 +	size, err := io.Copy(w, r)
    1.67 +	if err != nil {
    1.68 +		if c.Storage != nil {
    1.69  			uploadWriter.Close()
    1.70  		}
    1.71 -	case err := <-cpErr:
    1.72 -		if err != nil {
    1.73 -			hashWriter.CloseWithError(err)
    1.74 -			uploadWriter.CloseWithError(err)
    1.75 -			return Item{}, 0, err
    1.76 -		} else {
    1.77 -			hashWriter.Close()
    1.78 -			uploadWriter.Close()
    1.79 -		}
    1.80 -	case err := <-uploadErr:
    1.81 -		if err != nil {
    1.82 -			hashWriter.CloseWithError(err)
    1.83 -			uploadWriter.CloseWithError(err)
    1.84 -			return Item{}, 0, err
    1.85 -		} else {
    1.86 -			hashWriter.Close()
    1.87 -			uploadWriter.Close()
    1.88 -		}
    1.89 -	case <-cpDone:
    1.90 -		hashWriter.Close()
    1.91 -		uploadWriter.Close()
    1.92 +		return Item{}, 0, err
    1.93  	}
    1.94 -	<-cpDone
    1.95 -	<-hashDone
    1.96 +
    1.97 +	finalLocation := hex.EncodeToString(h.Sum(nil))
    1.98 +	uploadWriter.Close()
    1.99 +	<-uploadChan
   1.100 +
   1.101  	if c.Storage != nil {
   1.102 -		<-uploadDone
   1.103 -	}
   1.104 -	bytesWritten = <-n
   1.105 -	finalLocation := <-hashChan
   1.106 -	if c.Storage != nil {
   1.107 -		err := c.Storage.Move(c.Bucket, tmp, c.Bucket, finalLocation)
   1.108 +		err = c.Storage.Move(c.TmpBucket, tmp, c.Bucket, finalLocation)
   1.109  		if err != nil {
   1.110  			return Item{}, 0, err
   1.111  		}
   1.112 @@ -85,40 +54,5 @@
   1.113  		Blob:         finalLocation,
   1.114  		Bucket:       c.Bucket,
   1.115  		CollectionID: collectionID,
   1.116 -	}, bytesWritten, nil
   1.117 +	}, size, nil
   1.118  }
   1.119 -
   1.120 -func hash(r io.Reader, resp chan string, errs chan error, done chan struct{}) {
   1.121 -	if resp != nil {
   1.122 -		defer close(resp)
   1.123 -	}
   1.124 -	h := sha1.New()
   1.125 -	go asyncCopy(h, r, nil, errs, done)
   1.126 -	<-done
   1.127 -	resp <- hex.EncodeToString(h.Sum(nil))
   1.128 -}
   1.129 -
   1.130 -func del(bucket, tmp string, c Context) {
   1.131 -	err := c.Storage.Delete(bucket, tmp)
   1.132 -	if err != nil {
   1.133 -		log.Printf("Error deleting temporary upload %s in %s: %s\n", tmp, bucket, err)
   1.134 -	}
   1.135 -}
   1.136 -
   1.137 -func asyncCopy(dst io.Writer, src io.Reader, n chan int64, errs chan error, done chan struct{}) {
   1.138 -	if errs != nil {
   1.139 -		defer close(errs)
   1.140 -	}
   1.141 -	if done != nil {
   1.142 -		defer close(done)
   1.143 -	}
   1.144 -	written, err := io.Copy(dst, src)
   1.145 -	if errs != nil && err != nil {
   1.146 -		errs <- err
   1.147 -	}
   1.148 -	if n != nil {
   1.149 -		go func(n chan int64, written int64) {
   1.150 -			n <- written
   1.151 -		}(n, written)
   1.152 -	}
   1.153 -}