diff --git a/src/croc/croc.go b/src/croc/croc.go index 06a074b..1cec896 100644 --- a/src/croc/croc.go +++ b/src/croc/croc.go @@ -70,9 +70,11 @@ type Client struct { FilesToTransferCurrentNum int // send / receive information of current file - CurrentFile *os.File - CurrentFileChunks []int64 - TotalSent int64 + CurrentFile *os.File + CurrentFileChunks []int64 + TotalSent int64 + TotalChunksTransfered int + chunkMap map[uint64]struct{} // tcp connections conn []*comm.Comm @@ -381,6 +383,11 @@ func (c *Client) processMessage(payload []byte) (done bool, err error) { } c.FilesToTransferCurrentNum = remoteFile.FilesToTransferCurrentNum c.CurrentFileChunks = remoteFile.CurrentFileChunks + log.Debugf("current file chunks: %+v", c.CurrentFileChunks) + c.chunkMap = make(map[uint64]struct{}) + for _, chunk := range c.CurrentFileChunks { + c.chunkMap[uint64(chunk)] = struct{}{} + } c.Step3RecipientRequestFile = true case "close-sender": log.Debug("close-sender received...") @@ -504,6 +511,13 @@ func (c *Client) updateState() (err error) { c.TotalSent = 0 // recipient requests the file and chunks (if empty, then should receive all chunks) + // TODO: determine the missing chunks + c.CurrentFileChunks = utils.MissingChunks( + pathToFile, + c.FilesToTransfer[c.FilesToTransferCurrentNum].Size, + models.TCP_BUFFER_SIZE/2, + ) + c.bar.Add(len(c.CurrentFileChunks) * models.TCP_BUFFER_SIZE / 2) bRequest, _ := json.Marshal(RemoteFileRequest{ CurrentFileChunks: c.CurrentFileChunks, FilesToTransferCurrentNum: c.FilesToTransferCurrentNum, @@ -533,6 +547,7 @@ func (c *Client) updateState() (err error) { progressbar.OptionSetWriter(os.Stderr), progressbar.OptionThrottle(100*time.Millisecond), ) + c.bar.Add(len(c.CurrentFileChunks) * models.TCP_BUFFER_SIZE / 2) c.TotalSent = 0 for i := 1; i < len(c.Options.RelayPorts); i++ { go c.sendData(i) @@ -573,8 +588,9 @@ func (c *Client) receiveData(i int) { } c.bar.Add(len(data[8:])) c.TotalSent += int64(len(data[8:])) + c.TotalChunksTransfered++ log.Debugf("block: %+v", positionInt64) - if c.TotalSent == c.FilesToTransfer[c.FilesToTransferCurrentNum].Size { + if c.TotalChunksTransfered == len(c.CurrentFileChunks) || c.TotalSent == c.FilesToTransfer[c.FilesToTransferCurrentNum].Size { log.Debug("finished receiving!") c.CurrentFile.Close() log.Debug("sending close-sender") @@ -619,25 +635,41 @@ func (c *Client) sendData(i int) { } if math.Mod(curi, float64(len(c.Options.RelayPorts)-1))+1 == float64(i) { - posByte := make([]byte, 8) - binary.LittleEndian.PutUint64(posByte, pos) - - dataToSend, err := c.Key.Encrypt( - compress.Compress( - append(posByte, data[:n]...), - ), - ) - if err != nil { - panic(err) + // check to see if this is a chunk that the recipient wants + usableChunk := true + c.mutex.Lock() + if len(c.chunkMap) != 0 { + if _, ok := c.chunkMap[pos]; !ok { + usableChunk = false + } else { + delete(c.chunkMap, pos) + } } + c.mutex.Unlock() + if usableChunk { + log.Debugf("sending chunk %d", pos) + posByte := make([]byte, 8) + binary.LittleEndian.PutUint64(posByte, pos) - err = c.conn[i].Send(dataToSend) - if err != nil { - panic(err) + dataToSend, err := c.Key.Encrypt( + compress.Compress( + append(posByte, data[:n]...), + ), + ) + if err != nil { + panic(err) + } + + err = c.conn[i].Send(dataToSend) + if err != nil { + panic(err) + } + c.bar.Add(n) + c.TotalSent += int64(n) + // time.Sleep(100 * time.Millisecond) + } else { + log.Debugf("skipping chunk %d", pos) } - c.bar.Add(n) - c.TotalSent += int64(n) - // time.Sleep(100 * time.Millisecond) } curi++ diff --git a/src/utils/utils.go b/src/utils/utils.go index 32f5989..36460c4 100644 --- a/src/utils/utils.go +++ b/src/utils/utils.go @@ -129,18 +129,19 @@ func MissingChunks(fname string, fsize int64, chunkSize int) (chunks []int64) { } defer f.Close() - buffer := make([]byte, chunkSize) emptyBuffer := make([]byte, chunkSize) chunkNum := 0 chunks = make([]int64, int64(math.Ceil(float64(fsize)/float64(chunkSize)))) var currentLocation int64 for { + buffer := make([]byte, chunkSize) bytesread, err := f.Read(buffer) if err != nil { break } if bytes.Equal(buffer[:bytesread], emptyBuffer[:bytesread]) { chunks[chunkNum] = currentLocation + chunkNum++ } currentLocation += int64(bytesread) } diff --git a/src/utils/utils_test.go b/src/utils/utils_test.go index 2b8ab8e..d8b084d 100644 --- a/src/utils/utils_test.go +++ b/src/utils/utils_test.go @@ -16,7 +16,7 @@ func TestExists(t *testing.T) { func TestHashFile(t *testing.T) { b, err := HashFile("utils.go") assert.Nil(t, err) - assert.Equal(t, "9a66e5c18b9759073666953da376c037", fmt.Sprintf("%x", b)) + assert.Equal(t, "6d39c2f3468e0d5869e0c9b349503175", fmt.Sprintf("%x", b)) } // SHA256 returns sha256 sum @@ -27,3 +27,8 @@ func TestSHA256(t *testing.T) { func TestByteCountDecimal(t *testing.T) { assert.Equal(t, "10.0 kB", ByteCountDecimal(10000)) } + +func TestMissingChunks(t *testing.T) { + chunks := MissingChunks("test",11346432,1024 * 32) + assert.Equal(t,202,len(chunks)) +}