diff --git a/src/recipient/recipient.go b/src/recipient/recipient.go index 3860fd0..1df0a93 100644 --- a/src/recipient/recipient.go +++ b/src/recipient/recipient.go @@ -49,6 +49,7 @@ func receive(forceSend int, serverAddress, serverTCP string, isLocal bool, c *we var hash256 []byte var otherIP string var tcpConnection comm.Comm + dataChan := make(chan []byte, 1024*1024) useWebsockets := true switch forceSend { @@ -195,10 +196,48 @@ func receive(forceSend int, serverAddress, serverTCP string, isLocal bool, c *we progressbar.OptionSetBytes(int(fstats.Size)), progressbar.OptionSetWriter(os.Stderr), ) + finished := make(chan bool) + go func(finished chan bool, dataChan chan []byte) (err error) { + for { + message := <-dataChan + // do decryption + var enc crypt.Encryption + err = json.Unmarshal(message, &enc) + if err != nil { + // log.Errorf("%s: [%s] [%+v] (%d/%d) %+v", err.Error(), message, message, len(message), numBytes, bs) + log.Error(err) + return err + } + decrypted, err := enc.Decrypt(sessionKey, !fstats.IsEncrypted) + if err != nil { + log.Error(err) + return err + } + + // do decompression + if fstats.IsCompressed && !fstats.IsDir { + decrypted = compress.Decompress(decrypted) + } + + // write to file + n, err := f.Write(decrypted) + if err != nil { + return err + } + // update the bytes written + bytesWritten += n + // update the progress bar + bar.Add(n) + if int64(bytesWritten) == fstats.Size { + log.Debug("finished") + break + } + } + finished <- true + return + }(finished, dataChan) c.WriteMessage(websocket.BinaryMessage, []byte("ready")) startTime := time.Now() - var numBytes int - var bs []byte for { if useWebsockets { var messageType int @@ -209,7 +248,7 @@ func receive(forceSend int, serverAddress, serverTCP string, isLocal bool, c *we } } else { // read from TCP connection - message, numBytes, bs, err = tcpConnection.Read() + message, _, _, err = tcpConnection.Read() // log.Debugf("message: %s", message) } if err != nil { @@ -220,39 +259,19 @@ func receive(forceSend int, serverAddress, serverTCP string, isLocal bool, c *we log.Debug("got magic") break } - - // do decryption - var enc crypt.Encryption - err = json.Unmarshal(message, &enc) - if err != nil { - log.Errorf("%s: [%s] [%+v] (%d/%d) %+v", err.Error(), message, message, len(message), numBytes, bs) - return err + select { + case dataChan <- message: + continue + default: + log.Debug("blocked") + // no message sent + // block + dataChan <- message } - decrypted, err := enc.Decrypt(sessionKey, !fstats.IsEncrypted) - if err != nil { - return err - } - - // do decompression - if fstats.IsCompressed && !fstats.IsDir { - decrypted = compress.Decompress(decrypted) - } - - // write to file - n, err := f.Write(decrypted) - if err != nil { - return err - } - // update the bytes written - bytesWritten += n - // update the progress bar - bar.Add(n) - - // if int64(bytesWritten) == fstats.Size { - // break - // } } + _ = <-finished + c.WriteMessage(websocket.BinaryMessage, []byte("done")) // we are finished transferTime = time.Since(startTime)