From 61d57ad0af46cf805596ac5bd9c2963386b37a67 Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Tue, 9 Oct 2018 06:01:21 -0700 Subject: [PATCH 1/4] sender and recipient share blocks --- src/recipient/recipient.go | 60 +++++++++--- src/sender/sender.go | 184 ++++++++++++++++++++----------------- 2 files changed, 151 insertions(+), 93 deletions(-) diff --git a/src/recipient/recipient.go b/src/recipient/recipient.go index 5d283b2..1e181d7 100644 --- a/src/recipient/recipient.go +++ b/src/recipient/recipient.go @@ -1,6 +1,7 @@ package recipient import ( + "bufio" "bytes" "encoding/json" "errors" @@ -52,6 +53,7 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo var otherIP string var tcpConnections []comm.Comm dataChan := make(chan []byte, 1024*1024) + blocks := []string{} useWebsockets := true switch forceSend { @@ -129,7 +131,18 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo return err } log.Debugf("%x\n", sessionKey) - c.WriteMessage(websocket.BinaryMessage, []byte("ready")) + + // append the previous blocks if there was progress previously + file, errCrocProgress := os.Open("croc-progress") + if errCrocProgress == nil { + scanner := bufio.NewScanner(file) + for scanner.Scan() { + blocks = append(blocks, strings.TrimSpace(scanner.Text())) + } + file.Close() + } + blocksBytes, _ := json.Marshal(blocks) + c.WriteMessage(websocket.BinaryMessage, append([]byte("ready"), blocksBytes...)) case 3: spin.Stop() @@ -189,14 +202,23 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo } // await file - f, err := os.Create(fstats.SentName) - if err != nil { - log.Error(err) - return err - } - if err = f.Truncate(fstats.Size); err != nil { - log.Error(err) - return err + var f *os.File + if utils.Exists(fstats.SentName) { + f, err = os.OpenFile(fstats.SentName, os.O_WRONLY, 0644) + if err != nil { + log.Error(err) + return err + } + } else { + f, err = os.Create(fstats.SentName) + if err != nil { + log.Error(err) + return err + } + if err = f.Truncate(fstats.Size); err != nil { + log.Error(err) + return err + } } bytesWritten := 0 fmt.Fprintf(os.Stderr, "\nReceiving (<-%s)...\n", otherIP) @@ -209,6 +231,19 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo finished := make(chan bool) go func(finished chan bool, dataChan chan []byte) (err error) { + os.Remove("croc-progress2") + fProgress, errCreate := os.Create("croc-progress2") + if errCreate != nil { + panic(errCreate) + } + defer fProgress.Close() + blocksWritten := 0.0 + blocksToWrite := float64(fstats.Size) + if useWebsockets { + blocksToWrite = blocksToWrite / float64(models.WEBSOCKET_BUFFER_SIZE/8) + } else { + blocksToWrite = blocksToWrite/float64(models.TCP_BUFFER_SIZE/2) - float64(len(blocks)) + } for { message := <-dataChan // do decryption @@ -231,8 +266,9 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo pieces := bytes.SplitN(decrypted, []byte("-"), 2) decrypted = pieces[1] locationToWrite, _ = strconv.Atoi(string(pieces[0])) + log.Debugf("writing to location %d (%2.0f/%2.0f)", locationToWrite, blocksWritten, blocksToWrite) + fProgress.WriteString(fmt.Sprintf("%d\n", locationToWrite)) } - // do decompression if fstats.IsCompressed && !fstats.IsDir { decrypted = compress.Decompress(decrypted) @@ -251,13 +287,15 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo } if err != nil { + log.Error(err) return err } // update the bytes written bytesWritten += n + blocksWritten += 1.0 // update the progress bar bar.Add(n) - if int64(bytesWritten) == fstats.Size { + if int64(bytesWritten) == fstats.Size || blocksWritten >= blocksToWrite { log.Debug("finished") break } diff --git a/src/sender/sender.go b/src/sender/sender.go index caed83d..335557c 100644 --- a/src/sender/sender.go +++ b/src/sender/sender.go @@ -8,6 +8,7 @@ import ( "net" "os" "path/filepath" + "strconv" "strings" "sync" "time" @@ -52,6 +53,7 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, var otherIP string var startTransfer time.Time var tcpConnections []comm.Comm + blocksToSkip := make(map[int64]struct{}) type DataChan struct { b []byte @@ -169,87 +171,6 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, } fileReady <- nil - // start streaming encryption/compression - go func(dataChan chan DataChan) { - var buffer []byte - if useWebsockets { - buffer = make([]byte, models.WEBSOCKET_BUFFER_SIZE/8) - } else { - buffer = make([]byte, models.TCP_BUFFER_SIZE/2) - } - currentPostition := int64(0) - for { - bytesread, err := f.Read(buffer) - if bytesread > 0 { - // do compression - var compressedBytes []byte - if useCompression && !fstats.IsDir { - compressedBytes = compress.Compress(buffer[:bytesread]) - } else { - compressedBytes = buffer[:bytesread] - } - - // if using TCP, prepend the location to write the data to in the resulting file - if !useWebsockets { - compressedBytes = append([]byte(fmt.Sprintf("%d-", currentPostition)), compressedBytes...) - } - - // do encryption - enc := crypt.Encrypt(compressedBytes, sessionKey, !useEncryption) - encBytes, err := json.Marshal(enc) - if err != nil { - dataChan <- DataChan{ - b: nil, - bytesRead: 0, - err: err, - } - return - } - - select { - case dataChan <- DataChan{ - b: encBytes, - bytesRead: bytesread, - err: nil, - }: - default: - log.Debug("blocked") - // no message sent - // block - dataChan <- DataChan{ - b: encBytes, - bytesRead: bytesread, - err: nil, - } - } - currentPostition += int64(bytesread) - } - if err != nil { - if err != io.EOF { - log.Error(err) - } - break - } - } - // finish - log.Debug("sending magic") - dataChan <- DataChan{ - b: []byte("magic"), - bytesRead: 0, - err: nil, - } - if !useWebsockets { - log.Debug("sending extra magic to %d others", len(tcpPorts)-1) - for i := 0; i < len(tcpPorts)-1; i++ { - log.Debug("sending magic") - dataChan <- DataChan{ - b: []byte("magic"), - bytesRead: 0, - err: nil, - } - } - } - }(dataChan) }() // send pake data @@ -275,9 +196,108 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, spin.Start() case 3: log.Debugf("[%d] recipient declares readiness for file info", step) - if !bytes.Equal(message, []byte("ready")) { + if !bytes.HasPrefix(message, []byte("ready")) { return errors.New("recipient refused file") } + // determine if any blocks were sent to skip + var blocks []string + errBlocks := json.Unmarshal(message[5:], &blocks) + if errBlocks == nil { + log.Debugf("found blocks: %+v", blocks) + for _, block := range blocks { + blockInt64, _ := strconv.Atoi(block) + blocksToSkip[int64(blockInt64)] = struct{}{} + } + } + + // start streaming encryption/compression + go func(dataChan chan DataChan) { + var buffer []byte + if useWebsockets { + buffer = make([]byte, models.WEBSOCKET_BUFFER_SIZE/8) + } else { + buffer = make([]byte, models.TCP_BUFFER_SIZE/2) + } + currentPostition := int64(0) + for { + bytesread, err := f.Read(buffer) + if bytesread > 0 { + if _, ok := blocksToSkip[currentPostition]; ok { + log.Debugf("skipping the sending of block %d", currentPostition) + currentPostition += int64(bytesread) + continue + } + + // do compression + var compressedBytes []byte + if useCompression && !fstats.IsDir { + compressedBytes = compress.Compress(buffer[:bytesread]) + } else { + compressedBytes = buffer[:bytesread] + } + + // if using TCP, prepend the location to write the data to in the resulting file + if !useWebsockets { + compressedBytes = append([]byte(fmt.Sprintf("%d-", currentPostition)), compressedBytes...) + } + + // do encryption + enc := crypt.Encrypt(compressedBytes, sessionKey, !useEncryption) + encBytes, err := json.Marshal(enc) + if err != nil { + dataChan <- DataChan{ + b: nil, + bytesRead: 0, + err: err, + } + return + } + + select { + case dataChan <- DataChan{ + b: encBytes, + bytesRead: bytesread, + err: nil, + }: + default: + log.Debug("blocked") + // no message sent + // block + dataChan <- DataChan{ + b: encBytes, + bytesRead: bytesread, + err: nil, + } + } + currentPostition += int64(bytesread) + } + if err != nil { + if err != io.EOF { + log.Error(err) + } + break + } + } + // finish + log.Debug("sending magic") + dataChan <- DataChan{ + b: []byte("magic"), + bytesRead: 0, + err: nil, + } + if !useWebsockets { + log.Debug("sending extra magic to %d others", len(tcpPorts)-1) + for i := 0; i < len(tcpPorts)-1; i++ { + log.Debug("sending magic") + dataChan <- DataChan{ + b: []byte("magic"), + bytesRead: 0, + err: nil, + } + } + } + }(dataChan) + err = <-fileReady // block until file is ready if err != nil { return err From 9940856317943793a9a665a7e0b922823286f69a Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Tue, 9 Oct 2018 06:32:49 -0700 Subject: [PATCH 2/4] rearrange block communication --- src/recipient/recipient.go | 63 +++++++++++++++++++++++++++----------- src/sender/sender.go | 49 +++++++++++++++-------------- src/zipper/zip.go | 3 +- 3 files changed, 72 insertions(+), 43 deletions(-) diff --git a/src/recipient/recipient.go b/src/recipient/recipient.go index 1e181d7..0f7840b 100644 --- a/src/recipient/recipient.go +++ b/src/recipient/recipient.go @@ -51,6 +51,8 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo var transferTime time.Duration var hash256 []byte var otherIP string + var progressFile string + var resumeFile bool var tcpConnections []comm.Comm dataChan := make(chan []byte, 1024*1024) blocks := []string{} @@ -132,17 +134,7 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo } log.Debugf("%x\n", sessionKey) - // append the previous blocks if there was progress previously - file, errCrocProgress := os.Open("croc-progress") - if errCrocProgress == nil { - scanner := bufio.NewScanner(file) - for scanner.Scan() { - blocks = append(blocks, strings.TrimSpace(scanner.Text())) - } - file.Close() - } - blocksBytes, _ := json.Marshal(blocks) - c.WriteMessage(websocket.BinaryMessage, append([]byte("ready"), blocksBytes...)) + c.WriteMessage(websocket.BinaryMessage, []byte("ready")) case 3: spin.Stop() @@ -164,9 +156,14 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo log.Debugf("got file stats: %+v", fstats) // prompt user if its okay to receive file + progressFile = fmt.Sprintf("%s.progress", fstats.SentName) overwritingOrReceiving := "Receiving" - if utils.Exists(fstats.Name) { + if utils.Exists(fstats.Name) || utils.Exists(fstats.SentName) { overwritingOrReceiving = "Overwriting" + if utils.Exists(progressFile) && !useWebsockets { + overwritingOrReceiving = "Resume receiving" + resumeFile = true + } } fileOrFolder := "file" if fstats.IsDir { @@ -220,6 +217,26 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo return err } } + + // append the previous blocks if there was progress previously + if resumeFile { + file, _ := os.Open(progressFile) + scanner := bufio.NewScanner(file) + for scanner.Scan() { + blocks = append(blocks, strings.TrimSpace(scanner.Text())) + } + file.Close() + } + blocksBytes, _ := json.Marshal(blocks) + + blockSize := 0 + if useWebsockets { + blockSize = models.WEBSOCKET_BUFFER_SIZE / 8 + } else { + blockSize = models.TCP_BUFFER_SIZE / 2 + } + + // start the ui for pgoress bytesWritten := 0 fmt.Fprintf(os.Stderr, "\nReceiving (<-%s)...\n", otherIP) bar := progressbar.NewOptions( @@ -228,15 +245,24 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo progressbar.OptionSetBytes(int(fstats.Size)), progressbar.OptionSetWriter(os.Stderr), ) + bar.Add((len(blocks) * blockSize)) finished := make(chan bool) go func(finished chan bool, dataChan chan []byte) (err error) { - os.Remove("croc-progress2") - fProgress, errCreate := os.Create("croc-progress2") - if errCreate != nil { - panic(errCreate) + // remove previous progress + var fProgress *os.File + var progressErr error + if resumeFile { + fProgress, progressErr = os.OpenFile(progressFile, os.O_APPEND, 0644) + } else { + os.Remove(progressFile) + fProgress, progressErr = os.Create(progressFile) + } + if progressErr != nil { + panic(progressErr) } defer fProgress.Close() + blocksWritten := 0.0 blocksToWrite := float64(fstats.Size) if useWebsockets { @@ -305,7 +331,8 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo }(finished, dataChan) log.Debug("telling sender i'm ready") - c.WriteMessage(websocket.BinaryMessage, []byte("ready")) + c.WriteMessage(websocket.BinaryMessage, append([]byte("ready"), blocksBytes...)) + startTime := time.Now() if useWebsockets { for { @@ -426,6 +453,7 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo fstats.Name = "stdout" } fmt.Fprintf(os.Stderr, "\nReceived %s written to %s (%2.1f %s)\n", folderOrFile, fstats.Name, transferRate, transferType) + os.Remove(progressFile) } return err } else { @@ -435,7 +463,6 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo } return errors.New("file corrupted") } - default: return fmt.Errorf("unknown step") } diff --git a/src/sender/sender.go b/src/sender/sender.go index 335557c..03ce145 100644 --- a/src/sender/sender.go +++ b/src/sender/sender.go @@ -199,14 +199,38 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, if !bytes.HasPrefix(message, []byte("ready")) { return errors.New("recipient refused file") } + + err = <-fileReady // block until file is ready + if err != nil { + return err + } + fstatsBytes, err := json.Marshal(fstats) + if err != nil { + return err + } + + // encrypt the file meta data + enc := crypt.Encrypt(fstatsBytes, sessionKey) + // send the file meta data + c.WriteMessage(websocket.BinaryMessage, enc.Bytes()) + case 4: + spin.Stop() + + log.Debugf("[%d] recipient declares readiness for file data", step) + if !bytes.HasPrefix(message, []byte("ready")) { + return errors.New("recipient refused file") + } + // determine if any blocks were sent to skip var blocks []string errBlocks := json.Unmarshal(message[5:], &blocks) if errBlocks == nil { log.Debugf("found blocks: %+v", blocks) for _, block := range blocks { - blockInt64, _ := strconv.Atoi(block) - blocksToSkip[int64(blockInt64)] = struct{}{} + blockInt64, errBlock := strconv.Atoi(block) + if errBlock == nil { + blocksToSkip[int64(blockInt64)] = struct{}{} + } } } @@ -298,27 +322,6 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, } }(dataChan) - err = <-fileReady // block until file is ready - if err != nil { - return err - } - fstatsBytes, err := json.Marshal(fstats) - if err != nil { - return err - } - - // encrypt the file meta data - enc := crypt.Encrypt(fstatsBytes, sessionKey) - // send the file meta data - c.WriteMessage(websocket.BinaryMessage, enc.Bytes()) - case 4: - spin.Stop() - - log.Debugf("[%d] recipient declares readiness for file data", step) - if !bytes.Equal(message, []byte("ready")) { - return errors.New("recipient refused file") - } - // connect to TCP to receive file if !useWebsockets { log.Debugf("connecting to server") diff --git a/src/zipper/zip.go b/src/zipper/zip.go index faffbd7..1d1fa06 100644 --- a/src/zipper/zip.go +++ b/src/zipper/zip.go @@ -4,7 +4,6 @@ import ( "archive/zip" "compress/flate" "io" - "io/ioutil" "os" "path/filepath" "strings" @@ -97,7 +96,7 @@ func ZipFile(fname string, compress bool) (writtenFilename string, err error) { return } log.Debugf("current directory: %s", curdir) - newfile, err := ioutil.TempFile(curdir, filename+".") + newfile, err := os.Create(fname + ".zip") if err != nil { log.Error(err) return From e18938fb7077e8be28cb2579fee1a7d90eb51b0d Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Tue, 9 Oct 2018 06:33:25 -0700 Subject: [PATCH 3/4] change name of croc zip --- src/zipper/zip.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zipper/zip.go b/src/zipper/zip.go index 1d1fa06..bfea9df 100644 --- a/src/zipper/zip.go +++ b/src/zipper/zip.go @@ -96,7 +96,7 @@ func ZipFile(fname string, compress bool) (writtenFilename string, err error) { return } log.Debugf("current directory: %s", curdir) - newfile, err := os.Create(fname + ".zip") + newfile, err := os.Create(fname + ".croc.zip") if err != nil { log.Error(err) return From c600b51888716fbbd7b4f89cbec593371158c48c Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Tue, 9 Oct 2018 06:56:38 -0700 Subject: [PATCH 4/4] resume works with websockets or tcp --- src/recipient/recipient.go | 28 +++++++++++++++++++--------- src/sender/sender.go | 9 ++++++++- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/src/recipient/recipient.go b/src/recipient/recipient.go index 0f7840b..12e618c 100644 --- a/src/recipient/recipient.go +++ b/src/recipient/recipient.go @@ -160,7 +160,7 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo overwritingOrReceiving := "Receiving" if utils.Exists(fstats.Name) || utils.Exists(fstats.SentName) { overwritingOrReceiving = "Overwriting" - if utils.Exists(progressFile) && !useWebsockets { + if utils.Exists(progressFile) { overwritingOrReceiving = "Resume receiving" resumeFile = true } @@ -200,8 +200,12 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo // await file var f *os.File - if utils.Exists(fstats.SentName) { - f, err = os.OpenFile(fstats.SentName, os.O_WRONLY, 0644) + if utils.Exists(fstats.SentName) && resumeFile { + if !useWebsockets { + f, err = os.OpenFile(fstats.SentName, os.O_WRONLY, 0644) + } else { + f, err = os.OpenFile(fstats.SentName, os.O_APPEND, 0644) + } if err != nil { log.Error(err) return err @@ -212,9 +216,11 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo log.Error(err) return err } - if err = f.Truncate(fstats.Size); err != nil { - log.Error(err) - return err + if !useWebsockets { + if err = f.Truncate(fstats.Size); err != nil { + log.Error(err) + return err + } } } @@ -254,6 +260,7 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo var progressErr error if resumeFile { fProgress, progressErr = os.OpenFile(progressFile, os.O_APPEND, 0644) + bytesWritten = len(blocks) * blockSize } else { os.Remove(progressFile) fProgress, progressErr = os.Create(progressFile) @@ -266,7 +273,7 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo blocksWritten := 0.0 blocksToWrite := float64(fstats.Size) if useWebsockets { - blocksToWrite = blocksToWrite / float64(models.WEBSOCKET_BUFFER_SIZE/8) + blocksToWrite = blocksToWrite/float64(models.WEBSOCKET_BUFFER_SIZE/8) - float64(len(blocks)) } else { blocksToWrite = blocksToWrite/float64(models.TCP_BUFFER_SIZE/2) - float64(len(blocks)) } @@ -292,9 +299,8 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo pieces := bytes.SplitN(decrypted, []byte("-"), 2) decrypted = pieces[1] locationToWrite, _ = strconv.Atoi(string(pieces[0])) - log.Debugf("writing to location %d (%2.0f/%2.0f)", locationToWrite, blocksWritten, blocksToWrite) - fProgress.WriteString(fmt.Sprintf("%d\n", locationToWrite)) } + // do decompression if fstats.IsCompressed && !fstats.IsDir { decrypted = compress.Decompress(decrypted) @@ -307,9 +313,13 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo return err } n, err = f.WriteAt(decrypted, int64(locationToWrite)) + fProgress.WriteString(fmt.Sprintf("%d\n", locationToWrite)) + log.Debugf("writing to location %d (%2.0f/%2.0f)", locationToWrite, blocksWritten, blocksToWrite) } else { // write to file n, err = f.Write(decrypted) + log.Debugf("writing to location %d (%2.0f/%2.0f)", bytesWritten, blocksWritten, blocksToWrite) + fProgress.WriteString(fmt.Sprintf("%d\n", bytesWritten)) } if err != nil { diff --git a/src/sender/sender.go b/src/sender/sender.go index 03ce145..f386c41 100644 --- a/src/sender/sender.go +++ b/src/sender/sender.go @@ -225,7 +225,6 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, var blocks []string errBlocks := json.Unmarshal(message[5:], &blocks) if errBlocks == nil { - log.Debugf("found blocks: %+v", blocks) for _, block := range blocks { blockInt64, errBlock := strconv.Atoi(block) if errBlock == nil { @@ -233,6 +232,7 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, } } } + log.Debugf("found blocks: %+v", blocksToSkip) // start streaming encryption/compression go func(dataChan chan DataChan) { @@ -341,12 +341,19 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, // send file, compure hash simultaneously startTransfer = time.Now() + blockSize := 0 + if useWebsockets { + blockSize = models.WEBSOCKET_BUFFER_SIZE / 8 + } else { + blockSize = models.TCP_BUFFER_SIZE / 2 + } bar := progressbar.NewOptions( int(fstats.Size), progressbar.OptionSetRenderBlankState(true), progressbar.OptionSetBytes(int(fstats.Size)), progressbar.OptionSetWriter(os.Stderr), ) + bar.Add(blockSize * len(blocksToSkip)) if useWebsockets { for {