diff --git a/test/client.go b/test/client.go deleted file mode 100644 index e239fc1..0000000 --- a/test/client.go +++ /dev/null @@ -1,53 +0,0 @@ -package main - -import ( - "fmt" - "net" - "strconv" - "sync" - "time" -) - -// runClient spawns threads for parallel uplink/downlink via TCP -func runClient(connectionType string, codePhrase string) { - var wg sync.WaitGroup - wg.Add(numberConnections) - for id := 0; id < numberConnections; id++ { - go func(id int) { - defer wg.Done() - port := strconv.Itoa(27001 + id) - connection, err := net.Dial("tcp", "localhost:"+port) - if err != nil { - panic(err) - } - defer connection.Close() - - message := receiveMessage(connection) - fmt.Println(message) - sendMessage(connectionType+"."+codePhrase, connection) - if connectionType == "s" { - message = receiveMessage(connection) - fmt.Println(message) - // Send file name - sendMessage("filename", connection) - // Send file size - time.Sleep(3 * time.Second) - sendMessage("filesize", connection) - // TODO: Write data from file - - // TODO: Release from connection pool - // POST /release - } else { - fileName := receiveMessage(connection) - fileSize := receiveMessage(connection) - fmt.Println(fileName, fileSize) - // TODO: Pull data and write to file - - // TODO: Release from connection pool - // POST /release - } - - }(id) - } - wg.Wait() -} diff --git a/test/connect.go b/test/connect.go new file mode 100644 index 0000000..34e9c61 --- /dev/null +++ b/test/connect.go @@ -0,0 +1,127 @@ +package main + +import ( + "fmt" + "io" + "math" + "net" + "os" + "strconv" + "sync" + "time" + + log "github.com/sirupsen/logrus" +) + +// runClient spawns threads for parallel uplink/downlink via TCP +func runClient(connectionType string, codePhrase string) { + logger := log.WithFields(log.Fields{ + "codePhrase": codePhrase, + "connection": connectionType, + }) + var wg sync.WaitGroup + wg.Add(numberConnections) + for id := 0; id < numberConnections; id++ { + go func(id int) { + defer wg.Done() + port := strconv.Itoa(27001 + id) + connection, err := net.Dial("tcp", "localhost:"+port) + if err != nil { + panic(err) + } + defer connection.Close() + + message := receiveMessage(connection) + logger.Infof("message: %s", message) + sendMessage(connectionType+"."+codePhrase, connection) + if connectionType == "s" { + logger.Info("waiting for ok from relay") + message = receiveMessage(connection) + logger.Info("got ok from relay") + // wait for pipe to be made + time.Sleep(1 * time.Second) + // Send file name + logger.Info("sending filename") + sendMessage("filename", connection) + // Send file size + time.Sleep(3 * time.Second) + logger.Info("sending filesize") + sendMessage("filesize", connection) + // TODO: Write data from file + + // TODO: Release from connection pool + // POST /release + } else { + fileName := receiveMessage(connection) + fileSize := receiveMessage(connection) + logger.Infof("fileName: %s", fileName) + logger.Infof("fileSize: %s", fileSize) + // TODO: Pull data and write to file + + // TODO: Release from connection pool + // POST /release + } + + }(id) + } + wg.Wait() +} + +func sendFileToClient(id int, connection net.Conn) { + logger := log.WithFields(log.Fields{ + "function": "sendFileToClient #" + strconv.Itoa(id), + }) + defer connection.Close() + //Open the file that needs to be send to the client + file, err := os.Open(fileName) + if err != nil { + fmt.Println(err) + return + } + defer file.Close() + //Get the filename and filesize + fileInfo, err := file.Stat() + if err != nil { + fmt.Println(err) + return + } + + numChunks := math.Ceil(float64(fileInfo.Size()) / float64(BUFFERSIZE)) + chunksPerWorker := int(math.Ceil(numChunks / float64(numberConnections))) + + bytesPerConnection := int64(chunksPerWorker * BUFFERSIZE) + if id+1 == numberConnections { + bytesPerConnection = fileInfo.Size() - (numberConnections-1)*bytesPerConnection + } + fileSize := fillString(strconv.FormatInt(int64(bytesPerConnection), 10), 10) + + fileName := fillString(fileInfo.Name(), 64) + + if id == 0 || id == numberConnections-1 { + logger.Infof("numChunks: %v", numChunks) + logger.Infof("chunksPerWorker: %v", chunksPerWorker) + logger.Infof("bytesPerConnection: %v", bytesPerConnection) + logger.Infof("fileName: %v", fileInfo.Name()) + } + + logger.Info("sending") + connection.Write([]byte(fileSize)) + connection.Write([]byte(fileName)) + sendBuffer := make([]byte, BUFFERSIZE) + + chunkI := 0 + for { + _, err = file.Read(sendBuffer) + if err == io.EOF { + //End of file reached, break out of for loop + logger.Info("EOF") + break + } + if (chunkI >= chunksPerWorker*id && chunkI < chunksPerWorker*id+chunksPerWorker) || (id == numberConnections-1 && chunkI >= chunksPerWorker*id) { + connection.Write(sendBuffer) + } + chunkI++ + } + fmt.Println("File has been sent, closing connection!") + return +} diff --git a/test/main.go b/test/main.go index 26108a5..e7a63d5 100644 --- a/test/main.go +++ b/test/main.go @@ -15,8 +15,10 @@ var server, file string // Global varaibles var serverAddress, fileName, codePhraseFlag, connectionTypeFlag string +var runAsRelay bool func main() { + flag.BoolVar(&runAsRelay, "relay", false, "run as relay") flag.StringVar(&serverAddress, "server", "", "(run as client) server address to connect to") flag.StringVar(&fileName, "file", "", "(run as server) file to serve") flag.StringVar(&codePhraseFlag, "code", "", "(run as server) file to serve") @@ -29,7 +31,7 @@ func main() { if file != "" { fileName = file } - if len(fileName) != 0 { + if runAsRelay { runServer() } else if len(serverAddress) != 0 { runClient(connectionTypeFlag, codePhraseFlag) diff --git a/test/server.go b/test/relay.go similarity index 69% rename from test/server.go rename to test/relay.go index 058c509..f3fe54f 100644 --- a/test/server.go +++ b/test/relay.go @@ -75,29 +75,35 @@ func listener(id int) (err error) { } func clientCommuncation(id int, connection net.Conn) { + logger := log.WithFields(log.Fields{ + "id": id, + "connection": connection.RemoteAddr().String(), + }) + logger.Info("Asking who?") sendMessage("who?", connection) message := receiveMessage(connection) connectionType := strings.Split(message, ".")[0] codePhrase := strings.Split(message, ".")[1] - // If reciever if connectionType == "s" { - fmt.Println("Got sender") + logger.Info("got sender") connections.Lock() connections.sender[codePhrase] = connection connections.Unlock() for { - fmt.Println("waiting for reciever") + logger.Info("waiting for reciever") connections.RLock() if _, ok := connections.reciever[codePhrase]; ok { + logger.Info("got reciever") connections.RUnlock() break } connections.RUnlock() time.Sleep(100 * time.Millisecond) } + logger.Info("telling sender ok") sendMessage("ok", connection) - fmt.Println("preparing pipe") + logger.Info("preparing pipe") connections.Lock() con1 := connections.sender[codePhrase] con2 := connections.reciever[codePhrase] @@ -142,3 +148,52 @@ func fillString(retunString string, toLength int) string { } return retunString } + +// chanFromConn creates a channel from a Conn object, and sends everything it +// Read()s from the socket to the channel. +func chanFromConn(conn net.Conn) chan []byte { + c := make(chan []byte) + + go func() { + b := make([]byte, BUFFERSIZE) + + for { + n, err := conn.Read(b) + if n > 0 { + res := make([]byte, n) + // Copy the buffer so it doesn't get changed while read by the recipient. + copy(res, b[:n]) + c <- res + } + if err != nil { + c <- nil + break + } + } + }() + + return c +} + +// Pipe creates a full-duplex pipe between the two sockets and transfers data from one to the other. +func Pipe(conn1 net.Conn, conn2 net.Conn) { + chan1 := chanFromConn(conn1) + chan2 := chanFromConn(conn2) + + for { + select { + case b1 := <-chan1: + if b1 == nil { + return + } else { + conn2.Write(b1) + } + case b2 := <-chan2: + if b2 == nil { + return + } else { + conn1.Write(b2) + } + } + } +} diff --git a/test/rendevouz.go b/test/rendevouz.go deleted file mode 100644 index 76c04e1..0000000 --- a/test/rendevouz.go +++ /dev/null @@ -1,97 +0,0 @@ -package main - -import ( - "net" -) - -// also see : http://archive.is/4Um4u - -// func runRendevouz() { -// // Listen on the specified TCP port on all interfaces. -// from := "0.0.0.0:27001" -// to := "0.0.0.0:27009" -// l, err := net.Listen("tcp", to) -// if err != nil { -// log.Fatal(err) -// } -// defer l.Close() -// for { -// // Wait for a connection. -// c, err := l.Accept() -// if err != nil { -// log.Fatal(err) -// } - -// // handle the connection in a goroutine -// go wormhole(c, from) -// } -// } - -// func relay(c net.Conn, from string) { -// defer c.Close() -// log.Println("Opening relay to", c.RemoteAddr()) - -// // connect to the destination tcp port -// destConn, err := net.Dial("tcp", *to) -// if err != nil { -// log.Fatal("Error connecting to destination port") -// } -// defer destConn.Close() -// log.Println("Wormhole open from", c.RemoteAddr()) - -// go func() { io.Copy(c, destConn) }() -// io.Copy(destConn, c) - -// log.Println("Stopping wormhole from", c.RemoteAddr()) -// } - -// BETTER? - -// chanFromConn creates a channel from a Conn object, and sends everything it -// Read()s from the socket to the channel. -func chanFromConn(conn net.Conn) chan []byte { - c := make(chan []byte) - - go func() { - b := make([]byte, BUFFERSIZE) - - for { - n, err := conn.Read(b) - if n > 0 { - res := make([]byte, n) - // Copy the buffer so it doesn't get changed while read by the recipient. - copy(res, b[:n]) - c <- res - } - if err != nil { - c <- nil - break - } - } - }() - - return c -} - -// Pipe creates a full-duplex pipe between the two sockets and transfers data from one to the other. -func Pipe(conn1 net.Conn, conn2 net.Conn) { - chan1 := chanFromConn(conn1) - chan2 := chanFromConn(conn2) - - for { - select { - case b1 := <-chan1: - if b1 == nil { - return - } else { - conn2.Write(b1) - } - case b2 := <-chan2: - if b2 == nil { - return - } else { - conn1.Write(b2) - } - } - } -}