diff --git a/src/sender/sender.go b/src/sender/sender.go index 3adf687..8a76ef1 100644 --- a/src/sender/sender.go +++ b/src/sender/sender.go @@ -52,6 +52,14 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso var startTransfer time.Time var tcpConnection comm.Comm + type DataChan struct { + b []byte + bytesRead int + err error + } + dataChan := make(chan DataChan, 1024*256) + defer close(dataChan) + useWebsockets := true switch forceSend { case 0: @@ -191,6 +199,71 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso if err != nil { return err } + + // start streaming encryption/compression + go func(dataChan chan DataChan) { + var buffer []byte + if useWebsockets { + buffer = make([]byte, models.WEBSOCKET_BUFFER_SIZE/8) + } else { + buffer = make([]byte, models.TCP_BUFFER_SIZE/2) + } + for { + bytesread, err := f.Read(buffer) + if bytesread > 0 { + // do compression + var compressedBytes []byte + if useCompression && !fstats.IsDir { + compressedBytes = compress.Compress(buffer[:bytesread]) + } else { + compressedBytes = buffer[:bytesread] + } + + // do encryption + enc := crypt.Encrypt(compressedBytes, sessionKey, !useEncryption) + encBytes, err := json.Marshal(enc) + if err != nil { + dataChan <- DataChan{ + b: nil, + bytesRead: 0, + err: err, + } + return + } + + select { + case dataChan <- DataChan{ + b: encBytes, + bytesRead: bytesread, + err: nil, + }: + continue + default: + log.Debug("blocked") + // no message sent + // block + dataChan <- DataChan{ + b: encBytes, + bytesRead: bytesread, + err: nil, + } + } + } + if err != nil { + if err != io.EOF { + log.Error(err) + } + break + } + } + // finish + dataChan <- DataChan{ + b: nil, + bytesRead: 0, + err: nil, + } + }(dataChan) + // encrypt the file meta data enc := crypt.Encrypt(fstatsBytes, sessionKey) // send the file meta data @@ -203,7 +276,6 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso return errors.New("recipient refused file") } - buffer := make([]byte, models.WEBSOCKET_BUFFER_SIZE/8) if !useWebsockets { // connection to TCP tcpConnection, err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%x", sessionKey)), serverAddress+":"+serverTCP) @@ -212,7 +284,6 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso return } defer tcpConnection.Close() - buffer = make([]byte, models.TCP_BUFFER_SIZE/2) } fmt.Fprintf(os.Stderr, "\rSending (->%s)...\n", otherIP) @@ -226,43 +297,24 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso progressbar.OptionSetWriter(os.Stderr), ) for { - bytesread, err := f.Read(buffer) - bar.Add(bytesread) - if bytesread > 0 { - // do compression - var compressedBytes []byte - if useCompression && !fstats.IsDir { - compressedBytes = compress.Compress(buffer[:bytesread]) - } else { - compressedBytes = buffer[:bytesread] - } - - // do encryption - enc := crypt.Encrypt(compressedBytes, sessionKey, !useEncryption) - encBytes, err := json.Marshal(enc) - if err != nil { - return err - } - + data := <-dataChan + if data.err != nil { + return data.err + } + if data.bytesRead > 0 { + bar.Add(data.bytesRead) if !useWebsockets { // write data to tcp connection - _, err = tcpConnection.Write(encBytes) + _, err = tcpConnection.Write(data.b) } else { // write data to websockets - err = c.WriteMessage(websocket.BinaryMessage, encBytes) + err = c.WriteMessage(websocket.BinaryMessage, data.b) } if err != nil { err = errors.Wrap(err, "problem writing message") return err } - } - if err != nil { - if err != io.EOF { - log.Error(err) - } - // if !isLocal { - // tcpConnection.Write([]byte("end")) - // } + } else { break } }