diff --git a/src/recipient/recipient.go b/src/recipient/recipient.go index 6685f51..5d283b2 100644 --- a/src/recipient/recipient.go +++ b/src/recipient/recipient.go @@ -10,6 +10,7 @@ import ( "os" "strconv" "strings" + "sync" "time" humanize "github.com/dustin/go-humanize" @@ -296,8 +297,11 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo } else { log.Debugf("starting listening with tcp with %d connections", len(tcpConnections)) // using TCP + var wg sync.WaitGroup + wg.Add(len(tcpConnections)) for i := range tcpConnections { - go func(tcpConnection comm.Comm) { + go func(wg *sync.WaitGroup, tcpConnection comm.Comm) { + defer wg.Done() for { // read from TCP connection message, _, _, err = tcpConnection.Read() @@ -319,8 +323,9 @@ func receive(forceSend int, serverAddress string, tcpPorts []string, isLocal boo dataChan <- message } } - }(tcpConnections[i]) + }(&wg, tcpConnections[i]) } + wg.Wait() } _ = <-finished diff --git a/src/sender/sender.go b/src/sender/sender.go index 521edf9..cc5d6cb 100644 --- a/src/sender/sender.go +++ b/src/sender/sender.go @@ -180,6 +180,7 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, currentPostition := int64(0) for { bytesread, err := f.Read(buffer) + log.Debug(bytesread, err) if bytesread > 0 { // do compression var compressedBytes []byte @@ -232,16 +233,19 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, } } // finish + log.Debug("sending magic") dataChan <- DataChan{ b: []byte("magic"), - bytesRead: len([]byte("magic")), + bytesRead: 0, err: nil, } if !useWebsockets { + log.Debug("sending extra magic") for i := 0; i < len(tcpConnections)-1; i++ { + log.Debug("sending magic") dataChan <- DataChan{ b: []byte("magic"), - bytesRead: len([]byte("magic")), + bytesRead: 0, err: nil, } } @@ -343,9 +347,10 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, var wg sync.WaitGroup wg.Add(len(tcpConnections)) for i := range tcpConnections { - go func(dataChan <-chan DataChan, tcpConnection comm.Comm) { + go func(i int, wg *sync.WaitGroup, dataChan <-chan DataChan, tcpConnection comm.Comm) { defer wg.Done() for data := range dataChan { + log.Debugf("%d sending", i) if data.err != nil { log.Error(data.err) return @@ -359,11 +364,11 @@ func send(forceSend int, serverAddress string, tcpPorts []string, isLocal bool, return } if bytes.Equal(data.b, []byte("magic")) { + log.Debugf("%d got magic", i) return } - } - }(dataChan, tcpConnections[i]) + }(i, &wg, dataChan, tcpConnections[i]) } wg.Wait() }