allow resume

This commit is contained in:
Zack Scholl 2019-04-30 07:29:02 -07:00
parent 4f20f3ce43
commit 6da93ae8da
3 changed files with 60 additions and 22 deletions

View File

@ -70,9 +70,11 @@ type Client struct {
FilesToTransferCurrentNum int
// send / receive information of current file
CurrentFile *os.File
CurrentFileChunks []int64
TotalSent int64
CurrentFile *os.File
CurrentFileChunks []int64
TotalSent int64
TotalChunksTransfered int
chunkMap map[uint64]struct{}
// tcp connections
conn []*comm.Comm
@ -381,6 +383,11 @@ func (c *Client) processMessage(payload []byte) (done bool, err error) {
}
c.FilesToTransferCurrentNum = remoteFile.FilesToTransferCurrentNum
c.CurrentFileChunks = remoteFile.CurrentFileChunks
log.Debugf("current file chunks: %+v", c.CurrentFileChunks)
c.chunkMap = make(map[uint64]struct{})
for _, chunk := range c.CurrentFileChunks {
c.chunkMap[uint64(chunk)] = struct{}{}
}
c.Step3RecipientRequestFile = true
case "close-sender":
log.Debug("close-sender received...")
@ -504,6 +511,13 @@ func (c *Client) updateState() (err error) {
c.TotalSent = 0
// recipient requests the file and chunks (if empty, then should receive all chunks)
// TODO: determine the missing chunks
c.CurrentFileChunks = utils.MissingChunks(
pathToFile,
c.FilesToTransfer[c.FilesToTransferCurrentNum].Size,
models.TCP_BUFFER_SIZE/2,
)
c.bar.Add(len(c.CurrentFileChunks) * models.TCP_BUFFER_SIZE / 2)
bRequest, _ := json.Marshal(RemoteFileRequest{
CurrentFileChunks: c.CurrentFileChunks,
FilesToTransferCurrentNum: c.FilesToTransferCurrentNum,
@ -533,6 +547,7 @@ func (c *Client) updateState() (err error) {
progressbar.OptionSetWriter(os.Stderr),
progressbar.OptionThrottle(100*time.Millisecond),
)
c.bar.Add(len(c.CurrentFileChunks) * models.TCP_BUFFER_SIZE / 2)
c.TotalSent = 0
for i := 1; i < len(c.Options.RelayPorts); i++ {
go c.sendData(i)
@ -573,8 +588,9 @@ func (c *Client) receiveData(i int) {
}
c.bar.Add(len(data[8:]))
c.TotalSent += int64(len(data[8:]))
c.TotalChunksTransfered++
log.Debugf("block: %+v", positionInt64)
if c.TotalSent == c.FilesToTransfer[c.FilesToTransferCurrentNum].Size {
if c.TotalChunksTransfered == len(c.CurrentFileChunks) || c.TotalSent == c.FilesToTransfer[c.FilesToTransferCurrentNum].Size {
log.Debug("finished receiving!")
c.CurrentFile.Close()
log.Debug("sending close-sender")
@ -619,25 +635,41 @@ func (c *Client) sendData(i int) {
}
if math.Mod(curi, float64(len(c.Options.RelayPorts)-1))+1 == float64(i) {
posByte := make([]byte, 8)
binary.LittleEndian.PutUint64(posByte, pos)
dataToSend, err := c.Key.Encrypt(
compress.Compress(
append(posByte, data[:n]...),
),
)
if err != nil {
panic(err)
// check to see if this is a chunk that the recipient wants
usableChunk := true
c.mutex.Lock()
if len(c.chunkMap) != 0 {
if _, ok := c.chunkMap[pos]; !ok {
usableChunk = false
} else {
delete(c.chunkMap, pos)
}
}
c.mutex.Unlock()
if usableChunk {
log.Debugf("sending chunk %d", pos)
posByte := make([]byte, 8)
binary.LittleEndian.PutUint64(posByte, pos)
err = c.conn[i].Send(dataToSend)
if err != nil {
panic(err)
dataToSend, err := c.Key.Encrypt(
compress.Compress(
append(posByte, data[:n]...),
),
)
if err != nil {
panic(err)
}
err = c.conn[i].Send(dataToSend)
if err != nil {
panic(err)
}
c.bar.Add(n)
c.TotalSent += int64(n)
// time.Sleep(100 * time.Millisecond)
} else {
log.Debugf("skipping chunk %d", pos)
}
c.bar.Add(n)
c.TotalSent += int64(n)
// time.Sleep(100 * time.Millisecond)
}
curi++

View File

@ -129,18 +129,19 @@ func MissingChunks(fname string, fsize int64, chunkSize int) (chunks []int64) {
}
defer f.Close()
buffer := make([]byte, chunkSize)
emptyBuffer := make([]byte, chunkSize)
chunkNum := 0
chunks = make([]int64, int64(math.Ceil(float64(fsize)/float64(chunkSize))))
var currentLocation int64
for {
buffer := make([]byte, chunkSize)
bytesread, err := f.Read(buffer)
if err != nil {
break
}
if bytes.Equal(buffer[:bytesread], emptyBuffer[:bytesread]) {
chunks[chunkNum] = currentLocation
chunkNum++
}
currentLocation += int64(bytesread)
}

View File

@ -16,7 +16,7 @@ func TestExists(t *testing.T) {
func TestHashFile(t *testing.T) {
b, err := HashFile("utils.go")
assert.Nil(t, err)
assert.Equal(t, "9a66e5c18b9759073666953da376c037", fmt.Sprintf("%x", b))
assert.Equal(t, "6d39c2f3468e0d5869e0c9b349503175", fmt.Sprintf("%x", b))
}
// SHA256 returns sha256 sum
@ -27,3 +27,8 @@ func TestSHA256(t *testing.T) {
func TestByteCountDecimal(t *testing.T) {
assert.Equal(t, "10.0 kB", ByteCountDecimal(10000))
}
func TestMissingChunks(t *testing.T) {
chunks := MissingChunks("test",11346432,1024 * 32)
assert.Equal(t,202,len(chunks))
}