diff --git a/test/connect.go b/test/connect.go index 1e37a89..7e125d1 100644 --- a/test/connect.go +++ b/test/connect.go @@ -11,9 +11,12 @@ import ( "sync" "time" + "github.com/gosuri/uiprogress" log "github.com/sirupsen/logrus" ) +var bars []*uiprogress.Bar + // runClient spawns threads for parallel uplink/downlink via TCP func runClient(connectionType string, codePhrase string) { logger := log.WithFields(log.Fields{ @@ -22,6 +25,10 @@ func runClient(connectionType string, codePhrase string) { }) var wg sync.WaitGroup wg.Add(numberConnections) + + uiprogress.Start() + bars = make([]*uiprogress.Bar, numberConnections) + fileNameToReceive := "" for id := 0; id < numberConnections; id++ { go func(id int) { defer wg.Done() @@ -33,66 +40,108 @@ func runClient(connectionType string, codePhrase string) { defer connection.Close() message := receiveMessage(connection) - logger.Infof("message: %s", message) + logger.Debugf("relay says: %s", message) + logger.Debugf("telling relay: %s", connectionType+"."+codePhrase) sendMessage(connectionType+"."+codePhrase, connection) if connectionType == "s" { // this is a sender - logger.Info("waiting for ok from relay") + logger.Debug("waiting for ok from relay") message = receiveMessage(connection) - logger.Info("got ok from relay") + logger.Debug("got ok from relay") // wait for pipe to be made - time.Sleep(1 * time.Second) + time.Sleep(100 * time.Millisecond) // Write data from file - sendFileToClient(id, connection) - - // TODO: Release from connection pool - // POST /release + logger.Debug("send file") + sendFile(id, connection) } else { // this is a receiver // receive file - receiveFile(id, connection) - - // TODO: Release from connection pool - // POST /release + logger.Debug("receive file") + fileNameToReceive = receiveFile(id, connection) } }(id) } wg.Wait() + + if connectionType == "r" { + catFile(fileNameToReceive) + } } -func receiveFile(id int, connection net.Conn) { +func catFile(fileNameToReceive string) { + // cat the file + os.Remove(fileNameToReceive) + finished, err := os.Create(fileNameToReceive) + defer finished.Close() + if err != nil { + log.Fatal(err) + } + for id := 0; id < numberConnections; id++ { + fh, err := os.Open(fileNameToReceive + "." + strconv.Itoa(id)) + if err != nil { + log.Fatal(err) + } + + _, err = io.Copy(finished, fh) + if err != nil { + log.Fatal(err) + } + fh.Close() + os.Remove(fileNameToReceive + "." + strconv.Itoa(id)) + } + + fmt.Println("\n\n\nDownloaded " + fileNameToReceive + "!") +} + +func receiveFile(id int, connection net.Conn) string { + logger := log.WithFields(log.Fields{ + "function": "receiveFile #" + strconv.Itoa(id), + }) bufferFileName := make([]byte, 64) bufferFileSize := make([]byte, 10) + logger.Debug("waiting for file size") connection.Read(bufferFileSize) fileSize, _ := strconv.ParseInt(strings.Trim(string(bufferFileSize), ":"), 10, 64) + logger.Debugf("filesize: %d", fileSize) connection.Read(bufferFileName) - fileName = strings.Trim(string(bufferFileName), ":") - os.Remove(fileName + "." + strconv.Itoa(id)) - newFile, err := os.Create(fileName + "." + strconv.Itoa(id)) + fileNameToReceive := strings.Trim(string(bufferFileName), ":") + logger.Debugf("fileName: %v", fileNameToReceive) + os.Remove(fileNameToReceive + "." + strconv.Itoa(id)) + newFile, err := os.Create(fileNameToReceive + "." + strconv.Itoa(id)) if err != nil { panic(err) } defer newFile.Close() + bars[id] = uiprogress.AddBar(int(fileSize)/1024 + 1).AppendCompleted().PrependElapsed() + + logger.Debug("waiting for file") var receivedBytes int64 for { + bars[id].Incr() if (fileSize - receivedBytes) < BUFFERSIZE { + logger.Debug("at the end") io.CopyN(newFile, connection, (fileSize - receivedBytes)) // Empty the remaining bytes that we don't need from the network buffer - connection.Read(make([]byte, (receivedBytes+BUFFERSIZE)-fileSize)) + if (receivedBytes+BUFFERSIZE)-fileSize < BUFFERSIZE { + logger.Debug("empty remaining bytes from network buffer") + connection.Read(make([]byte, (receivedBytes+BUFFERSIZE)-fileSize)) + } break } io.CopyN(newFile, connection, BUFFERSIZE) //Increment the counter receivedBytes += BUFFERSIZE } + logger.Debug("received file") + return fileNameToReceive } -func sendFileToClient(id int, connection net.Conn) { +func sendFile(id int, connection net.Conn) { logger := log.WithFields(log.Fields{ - "function": "sendFileToClient #" + strconv.Itoa(id), + "function": "sendFile #" + strconv.Itoa(id), }) defer connection.Close() //Open the file that needs to be send to the client @@ -118,18 +167,18 @@ func sendFileToClient(id int, connection net.Conn) { } fileSize := fillString(strconv.FormatInt(int64(bytesPerConnection), 10), 10) - fileName := fillString(fileInfo.Name(), 64) + fileNameToSend := fillString(fileInfo.Name(), 64) if id == 0 || id == numberConnections-1 { - logger.Infof("numChunks: %v", numChunks) - logger.Infof("chunksPerWorker: %v", chunksPerWorker) - logger.Infof("bytesPerConnection: %v", bytesPerConnection) - logger.Infof("fileName: %v", fileInfo.Name()) + logger.Debugf("numChunks: %v", numChunks) + logger.Debugf("chunksPerWorker: %v", chunksPerWorker) + logger.Debugf("bytesPerConnection: %v", bytesPerConnection) + logger.Debugf("fileNameToSend: %v", fileInfo.Name()) } - logger.Info("sending") + logger.Debugf("sending %s", fileSize) connection.Write([]byte(fileSize)) - connection.Write([]byte(fileName)) + connection.Write([]byte(fileNameToSend)) sendBuffer := make([]byte, BUFFERSIZE) chunkI := 0 @@ -137,7 +186,7 @@ func sendFileToClient(id int, connection net.Conn) { _, err = file.Read(sendBuffer) if err == io.EOF { //End of file reached, break out of for loop - logger.Info("EOF") + logger.Debug("EOF") break } if (chunkI >= chunksPerWorker*id && chunkI < chunksPerWorker*id+chunksPerWorker) || (id == numberConnections-1 && chunkI >= chunksPerWorker*id) { @@ -145,6 +194,6 @@ func sendFileToClient(id int, connection net.Conn) { } chunkI++ } - fmt.Println("File has been sent, closing connection!") + logger.Debug("file is sent") return } diff --git a/test/main.go b/test/main.go index e7a63d5..807a39e 100644 --- a/test/main.go +++ b/test/main.go @@ -3,22 +3,24 @@ package main import ( "flag" "fmt" + "os" log "github.com/sirupsen/logrus" ) const BUFFERSIZE = 1024 -const numberConnections = 1 +const numberConnections = 4 // Build flags var server, file string // Global varaibles var serverAddress, fileName, codePhraseFlag, connectionTypeFlag string -var runAsRelay bool +var runAsRelay, debugFlag bool func main() { flag.BoolVar(&runAsRelay, "relay", false, "run as relay") + flag.BoolVar(&debugFlag, "debug", false, "debug mode") flag.StringVar(&serverAddress, "server", "", "(run as client) server address to connect to") flag.StringVar(&fileName, "file", "", "(run as server) file to serve") flag.StringVar(&codePhraseFlag, "code", "", "(run as server) file to serve") @@ -31,6 +33,22 @@ func main() { if file != "" { fileName = file } + + if len(fileName) > 0 { + _, err := os.Open(fileName) + if err != nil { + log.Fatal(err) + return + } + } + + log.SetFormatter(&log.TextFormatter{}) + if debugFlag { + log.SetLevel(log.DebugLevel) + } else { + log.SetLevel(log.WarnLevel) + } + if runAsRelay { runServer() } else if len(serverAddress) != 0 { @@ -39,16 +57,3 @@ func main() { fmt.Println("You must specify either -file (for running as a server) or -server (for running as a client)") } } - -func init() { - // Log as JSON instead of the default ASCII formatter. - // log.SetFormatter(&log.JSONFormatter{}) - log.SetFormatter(&log.TextFormatter{}) - - // Output to stdout instead of the default stderr - // Can be any io.Writer, see below for File example - // log.SetOutput(os.Stdout) - - // Only log the warning severity or above. - log.SetLevel(log.DebugLevel) -} diff --git a/test/relay.go b/test/relay.go index f3fe54f..8bb228b 100644 --- a/test/relay.go +++ b/test/relay.go @@ -1,7 +1,6 @@ package main import ( - "fmt" "net" "strconv" "strings" @@ -31,7 +30,7 @@ func runServer() { logger := log.WithFields(log.Fields{ "function": "main", }) - logger.Info("Initializing") + logger.Debug("Initializing") var wg sync.WaitGroup wg.Add(numberConnections) for id := 0; id < numberConnections; id++ { @@ -62,62 +61,60 @@ func listener(id int) (err error) { return errors.Wrap(err, "Error listening on "+serverAddress+":"+port) } defer server.Close() - logger.Info("waiting for connections") + logger.Debug("waiting for connections") //Spawn a new goroutine whenever a client connects for { connection, err := server.Accept() if err != nil { return errors.Wrap(err, "problem accepting connection") } - logger.Infof("Client %s connected", connection.RemoteAddr().String()) + logger.Debugf("Client %s connected", connection.RemoteAddr().String()) go clientCommuncation(id, connection) } } func clientCommuncation(id int, connection net.Conn) { - logger := log.WithFields(log.Fields{ - "id": id, - "connection": connection.RemoteAddr().String(), - }) - logger.Info("Asking who?") sendMessage("who?", connection) message := receiveMessage(connection) connectionType := strings.Split(message, ".")[0] - codePhrase := strings.Split(message, ".")[1] + codePhrase := strings.Split(message, ".")[1] + "-" + strconv.Itoa(id) + logger := log.WithFields(log.Fields{ + "id": id, + "codePhrase": codePhrase, + }) if connectionType == "s" { - logger.Info("got sender") + logger.Debug("got sender") connections.Lock() connections.sender[codePhrase] = connection connections.Unlock() for { - logger.Info("waiting for reciever") connections.RLock() if _, ok := connections.reciever[codePhrase]; ok { - logger.Info("got reciever") + logger.Debug("got reciever") connections.RUnlock() break } connections.RUnlock() time.Sleep(100 * time.Millisecond) } - logger.Info("telling sender ok") + logger.Debug("telling sender ok") sendMessage("ok", connection) - logger.Info("preparing pipe") + logger.Debug("preparing pipe") connections.Lock() con1 := connections.sender[codePhrase] con2 := connections.reciever[codePhrase] connections.Unlock() - fmt.Println("piping connections") + logger.Debug("piping connections") Pipe(con1, con2) - fmt.Println("done piping") + logger.Debug("done piping") connections.Lock() delete(connections.sender, codePhrase) delete(connections.reciever, codePhrase) connections.Unlock() - fmt.Println("deleted " + codePhrase) + logger.Debug("deleted sender and receiver") } else { - fmt.Println("Got reciever") + logger.Debug("got reciever") connections.Lock() connections.reciever[codePhrase] = connection connections.Unlock() @@ -132,8 +129,7 @@ func sendMessage(message string, connection net.Conn) { func receiveMessage(connection net.Conn) string { messageByte := make([]byte, BUFFERSIZE) - n, err := connection.Read(messageByte) - fmt.Println(n, err) + connection.Read(messageByte) return strings.Replace(string(messageByte), ":", "", -1) }