diff --git a/src/croc/croc_test.go b/src/croc/croc_test.go index 65b07c6..ca08c3c 100644 --- a/src/croc/croc_test.go +++ b/src/croc/croc_test.go @@ -67,9 +67,9 @@ func TestSendReceiveLocalWebsockets(t *testing.T) { sendAndReceive(t, 1, true) } -// func TestSendReceiveLocalTCP(t *testing.T) { -// sendAndReceive(t, 2, true) -// } +func TestSendReceiveLocalTCP(t *testing.T) { + sendAndReceive(t, 2, true) +} func generateRandomFile(megabytes int) (fname string) { // generate a random file diff --git a/src/croc/recipient.go b/src/croc/recipient.go index d88afef..63bdf68 100644 --- a/src/croc/recipient.go +++ b/src/croc/recipient.go @@ -82,9 +82,26 @@ func (cr *Croc) receive(forceSend int, serverAddress string, tcpPorts []string, // both parties should have a weak key pw := []byte(codephrase) + // start the reader + websocketMessages := make(chan WebSocketMessage, 1024) + go func() { + defer func() { + if r := recover(); r != nil { + log.Debugf("recovered from %s", r) + } + }() + for { + messageType, message, err := c.ReadMessage() + websocketMessages <- WebSocketMessage{messageType, message, err} + } + }() + step := 0 for { - messageType, message, err := c.ReadMessage() + websocketMessage := <-websocketMessages + messageType := websocketMessage.messageType + message := websocketMessage.message + err := websocketMessage.err if err != nil { return err } @@ -151,6 +168,7 @@ func (cr *Croc) receive(forceSend int, serverAddress string, tcpPorts []string, // initialize TCP connections if using (possible, but unlikely, race condition) go func() { + log.Debug("initializing TCP connections") if !useWebsockets { log.Debugf("connecting to server") tcpConnections = make([]comm.Comm, len(tcpPorts)) @@ -406,29 +424,20 @@ func (cr *Croc) receive(forceSend int, serverAddress string, tcpPorts []string, startTime := time.Now() if useWebsockets { for { - var messageType int // read from websockets - messageType, message, err = c.ReadMessage() - if messageType != websocket.BinaryMessage { + websocketMessageData := <-websocketMessages + if websocketMessageData.messageType != websocket.BinaryMessage { continue } if err != nil { log.Error(err) return err } - if bytes.Equal(message, []byte("magic")) { + if bytes.Equal(websocketMessageData.message, []byte("magic")) { log.Debug("got magic") break } - dataChan <- message - // select { - // case dataChan <- message: - // default: - // log.Debug("blocked") - // // no message sent - // // block - // dataChan <- message - // } + dataChan <- websocketMessageData.message } } else { log.Debugf("starting listening with tcp with %d connections", len(tcpConnections)) diff --git a/src/croc/sender.go b/src/croc/sender.go index bea4a44..1ae0bee 100644 --- a/src/croc/sender.go +++ b/src/croc/sender.go @@ -102,9 +102,26 @@ func (cr *Croc) send(forceSend int, serverAddress string, tcpPorts []string, isL return } + // start the reader + websocketMessages := make(chan WebSocketMessage, 1024) + go func() { + defer func() { + if r := recover(); r != nil { + log.Debugf("recovered from %s", r) + } + }() + for { + messageType, message, err := c.ReadMessage() + websocketMessages <- WebSocketMessage{messageType, message, err} + } + }() + step := 0 for { - messageType, message, errRead := c.ReadMessage() + websocketMessage := <-websocketMessages + messageType := websocketMessage.messageType + message := websocketMessage.message + errRead := websocketMessage.err if errRead != nil { return errRead }