This commit is contained in:
Zack Scholl 2018-09-25 19:25:27 -07:00
parent f555dc8dbf
commit a59399630d
2 changed files with 17 additions and 7 deletions

View File

@ -10,6 +10,7 @@ import (
"os" "os"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
humanize "github.com/dustin/go-humanize" humanize "github.com/dustin/go-humanize"
@ -296,8 +297,11 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo
} else { } else {
log.Debugf("starting listening with tcp with %d connections", len(tcpConnections)) log.Debugf("starting listening with tcp with %d connections", len(tcpConnections))
// using TCP // using TCP
var wg sync.WaitGroup
wg.Add(len(tcpConnections))
for i := range tcpConnections { for i := range tcpConnections {
go func(tcpConnection comm.Comm) { go func(wg *sync.WaitGroup, tcpConnection comm.Comm) {
defer wg.Done()
for { for {
// read from TCP connection // read from TCP connection
message, _, _, err = tcpConnection.Read() message, _, _, err = tcpConnection.Read()
@ -319,8 +323,9 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo
dataChan <- message dataChan <- message
} }
} }
}(tcpConnections[i]) }(&wg, tcpConnections[i])
} }
wg.Wait()
} }
_ = <-finished _ = <-finished

View File

@ -180,6 +180,7 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool,
currentPostition := int64(0) currentPostition := int64(0)
for { for {
bytesread, err := f.Read(buffer) bytesread, err := f.Read(buffer)
log.Debug(bytesread, err)
if bytesread > 0 { if bytesread > 0 {
// do compression // do compression
var compressedBytes []byte var compressedBytes []byte
@ -232,16 +233,19 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool,
} }
} }
// finish // finish
log.Debug("sending magic")
dataChan <- DataChan{ dataChan <- DataChan{
b: []byte("magic"), b: []byte("magic"),
bytesRead: len([]byte("magic")), bytesRead: 0,
err: nil, err: nil,
} }
if !useWebsockets { if !useWebsockets {
log.Debug("sending extra magic")
for i := 0; i < len(tcpConnections)-1; i++ { for i := 0; i < len(tcpConnections)-1; i++ {
log.Debug("sending magic")
dataChan <- DataChan{ dataChan <- DataChan{
b: []byte("magic"), b: []byte("magic"),
bytesRead: len([]byte("magic")), bytesRead: 0,
err: nil, err: nil,
} }
} }
@ -343,9 +347,10 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool,
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(tcpConnections)) wg.Add(len(tcpConnections))
for i := range tcpConnections { for i := range tcpConnections {
go func(dataChan <-chan DataChan, tcpConnection comm.Comm) { go func(i int, wg *sync.WaitGroup, dataChan <-chan DataChan, tcpConnection comm.Comm) {
defer wg.Done() defer wg.Done()
for data := range dataChan { for data := range dataChan {
log.Debugf("%d sending", i)
if data.err != nil { if data.err != nil {
log.Error(data.err) log.Error(data.err)
return return
@ -359,11 +364,11 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool,
return return
} }
if bytes.Equal(data.b, []byte("magic")) { if bytes.Equal(data.b, []byte("magic")) {
log.Debugf("%d got magic", i)
return return
} }
} }
}(dataChan, tcpConnections[i]) }(i, &wg, dataChan, tcpConnections[i])
} }
wg.Wait() wg.Wait()
} }