do compression in thread

This commit is contained in:
Zack Scholl 2018-09-25 07:29:55 -07:00
parent f169b12109
commit 1a22d37c2d
1 changed files with 82 additions and 30 deletions

View File

@ -52,6 +52,14 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso
var startTransfer time.Time
var tcpConnection comm.Comm
type DataChan struct {
b []byte
bytesRead int
err error
}
dataChan := make(chan DataChan, 1024*256)
defer close(dataChan)
useWebsockets := true
switch forceSend {
case 0:
@ -191,6 +199,71 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso
if err != nil {
return err
}
// start streaming encryption/compression
go func(dataChan chan DataChan) {
var buffer []byte
if useWebsockets {
buffer = make([]byte, models.WEBSOCKET_BUFFER_SIZE/8)
} else {
buffer = make([]byte, models.TCP_BUFFER_SIZE/2)
}
for {
bytesread, err := f.Read(buffer)
if bytesread > 0 {
// do compression
var compressedBytes []byte
if useCompression && !fstats.IsDir {
compressedBytes = compress.Compress(buffer[:bytesread])
} else {
compressedBytes = buffer[:bytesread]
}
// do encryption
enc := crypt.Encrypt(compressedBytes, sessionKey, !useEncryption)
encBytes, err := json.Marshal(enc)
if err != nil {
dataChan <- DataChan{
b: nil,
bytesRead: 0,
err: err,
}
return
}
select {
case dataChan <- DataChan{
b: encBytes,
bytesRead: bytesread,
err: nil,
}:
continue
default:
log.Debug("blocked")
// no message sent
// block
dataChan <- DataChan{
b: encBytes,
bytesRead: bytesread,
err: nil,
}
}
}
if err != nil {
if err != io.EOF {
log.Error(err)
}
break
}
}
// finish
dataChan <- DataChan{
b: nil,
bytesRead: 0,
err: nil,
}
}(dataChan)
// encrypt the file meta data
enc := crypt.Encrypt(fstatsBytes, sessionKey)
// send the file meta data
@ -203,7 +276,6 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso
return errors.New("recipient refused file")
}
buffer := make([]byte, models.WEBSOCKET_BUFFER_SIZE/8)
if !useWebsockets {
// connection to TCP
tcpConnection, err = connectToTCPServer(utils.SHA256(fmt.Sprintf("%x", sessionKey)), serverAddress+":"+serverTCP)
@ -212,7 +284,6 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso
return
}
defer tcpConnection.Close()
buffer = make([]byte, models.TCP_BUFFER_SIZE/2)
}
fmt.Fprintf(os.Stderr, "\rSending (->%s)...\n", otherIP)
@ -226,43 +297,24 @@ func send(forceSend int, serverAddress, serverTCP string, isLocal bool, c *webso
progressbar.OptionSetWriter(os.Stderr),
)
for {
bytesread, err := f.Read(buffer)
bar.Add(bytesread)
if bytesread > 0 {
// do compression
var compressedBytes []byte
if useCompression && !fstats.IsDir {
compressedBytes = compress.Compress(buffer[:bytesread])
} else {
compressedBytes = buffer[:bytesread]
}
// do encryption
enc := crypt.Encrypt(compressedBytes, sessionKey, !useEncryption)
encBytes, err := json.Marshal(enc)
if err != nil {
return err
}
data := <-dataChan
if data.err != nil {
return data.err
}
if data.bytesRead > 0 {
bar.Add(data.bytesRead)
if !useWebsockets {
// write data to tcp connection
_, err = tcpConnection.Write(encBytes)
_, err = tcpConnection.Write(data.b)
} else {
// write data to websockets
err = c.WriteMessage(websocket.BinaryMessage, encBytes)
err = c.WriteMessage(websocket.BinaryMessage, data.b)
}
if err != nil {
err = errors.Wrap(err, "problem writing message")
return err
}
}
if err != nil {
if err != io.EOF {
log.Error(err)
}
// if !isLocal {
// tcpConnection.Write([]byte("end"))
// }
} else {
break
}
}