move reading to goroutine

This commit is contained in:
Zack Scholl 2018-09-25 09:55:35 -07:00
parent 75f7cdcf65
commit f62459e1a4
1 changed files with 52 additions and 33 deletions

View File

@ -49,6 +49,7 @@ func receive(forceSend int, serverAddress, serverTCP string, isLocal bool, c *we
var hash256 []byte var hash256 []byte
var otherIP string var otherIP string
var tcpConnection comm.Comm var tcpConnection comm.Comm
dataChan := make(chan []byte, 1024*1024)
useWebsockets := true useWebsockets := true
switch forceSend { switch forceSend {
@ -195,10 +196,48 @@ func receive(forceSend int, serverAddress, serverTCP string, isLocal bool, c *we
progressbar.OptionSetBytes(int(fstats.Size)), progressbar.OptionSetBytes(int(fstats.Size)),
progressbar.OptionSetWriter(os.Stderr), progressbar.OptionSetWriter(os.Stderr),
) )
finished := make(chan bool)
go func(finished chan bool, dataChan chan []byte) (err error) {
for {
message := <-dataChan
// do decryption
var enc crypt.Encryption
err = json.Unmarshal(message, &enc)
if err != nil {
// log.Errorf("%s: [%s] [%+v] (%d/%d) %+v", err.Error(), message, message, len(message), numBytes, bs)
log.Error(err)
return err
}
decrypted, err := enc.Decrypt(sessionKey, !fstats.IsEncrypted)
if err != nil {
log.Error(err)
return err
}
// do decompression
if fstats.IsCompressed && !fstats.IsDir {
decrypted = compress.Decompress(decrypted)
}
// write to file
n, err := f.Write(decrypted)
if err != nil {
return err
}
// update the bytes written
bytesWritten += n
// update the progress bar
bar.Add(n)
if int64(bytesWritten) == fstats.Size {
log.Debug("finished")
break
}
}
finished <- true
return
}(finished, dataChan)
c.WriteMessage(websocket.BinaryMessage, []byte("ready")) c.WriteMessage(websocket.BinaryMessage, []byte("ready"))
startTime := time.Now() startTime := time.Now()
var numBytes int
var bs []byte
for { for {
if useWebsockets { if useWebsockets {
var messageType int var messageType int
@ -209,7 +248,7 @@ func receive(forceSend int, serverAddress, serverTCP string, isLocal bool, c *we
} }
} else { } else {
// read from TCP connection // read from TCP connection
message, numBytes, bs, err = tcpConnection.Read() message, _, _, err = tcpConnection.Read()
// log.Debugf("message: %s", message) // log.Debugf("message: %s", message)
} }
if err != nil { if err != nil {
@ -220,39 +259,19 @@ func receive(forceSend int, serverAddress, serverTCP string, isLocal bool, c *we
log.Debug("got magic") log.Debug("got magic")
break break
} }
select {
// do decryption case dataChan <- message:
var enc crypt.Encryption continue
err = json.Unmarshal(message, &enc) default:
if err != nil { log.Debug("blocked")
log.Errorf("%s: [%s] [%+v] (%d/%d) %+v", err.Error(), message, message, len(message), numBytes, bs) // no message sent
return err // block
dataChan <- message
} }
decrypted, err := enc.Decrypt(sessionKey, !fstats.IsEncrypted)
if err != nil {
return err
}
// do decompression
if fstats.IsCompressed && !fstats.IsDir {
decrypted = compress.Decompress(decrypted)
}
// write to file
n, err := f.Write(decrypted)
if err != nil {
return err
}
// update the bytes written
bytesWritten += n
// update the progress bar
bar.Add(n)
// if int64(bytesWritten) == fstats.Size {
// break
// }
} }
_ = <-finished
c.WriteMessage(websocket.BinaryMessage, []byte("done")) c.WriteMessage(websocket.BinaryMessage, []byte("done"))
// we are finished // we are finished
transferTime = time.Since(startTime) transferTime = time.Since(startTime)