package main import ( "fmt" "io" "math" "net" "os" "strconv" "strings" "sync" "time" log "github.com/sirupsen/logrus" ) // runClient spawns threads for parallel uplink/downlink via TCP func runClient(connectionType string, codePhrase string) { logger := log.WithFields(log.Fields{ "codePhrase": codePhrase, "connection": connectionType, }) var wg sync.WaitGroup wg.Add(numberConnections) for id := 0; id < numberConnections; id++ { go func(id int) { defer wg.Done() port := strconv.Itoa(27001 + id) connection, err := net.Dial("tcp", "localhost:"+port) if err != nil { panic(err) } defer connection.Close() message := receiveMessage(connection) logger.Infof("message: %s", message) sendMessage(connectionType+"."+codePhrase, connection) if connectionType == "s" { // this is a sender logger.Info("waiting for ok from relay") message = receiveMessage(connection) logger.Info("got ok from relay") // wait for pipe to be made time.Sleep(1 * time.Second) // Write data from file sendFileToClient(id, connection) // TODO: Release from connection pool // POST /release } else { // this is a receiver // receive file receiveFile(id, connection) // TODO: Release from connection pool // POST /release } }(id) } wg.Wait() } func receiveFile(id int, connection net.Conn) { bufferFileName := make([]byte, 64) bufferFileSize := make([]byte, 10) connection.Read(bufferFileSize) fileSize, _ := strconv.ParseInt(strings.Trim(string(bufferFileSize), ":"), 10, 64) connection.Read(bufferFileName) fileName = strings.Trim(string(bufferFileName), ":") os.Remove(fileName + "." + strconv.Itoa(id)) newFile, err := os.Create(fileName + "." + strconv.Itoa(id)) if err != nil { panic(err) } defer newFile.Close() var receivedBytes int64 for { if (fileSize - receivedBytes) < BUFFERSIZE { io.CopyN(newFile, connection, (fileSize - receivedBytes)) // Empty the remaining bytes that we don't need from the network buffer connection.Read(make([]byte, (receivedBytes+BUFFERSIZE)-fileSize)) break } io.CopyN(newFile, connection, BUFFERSIZE) //Increment the counter receivedBytes += BUFFERSIZE } } func sendFileToClient(id int, connection net.Conn) { logger := log.WithFields(log.Fields{ "function": "sendFileToClient #" + strconv.Itoa(id), }) defer connection.Close() //Open the file that needs to be send to the client file, err := os.Open(fileName) if err != nil { fmt.Println(err) return } defer file.Close() //Get the filename and filesize fileInfo, err := file.Stat() if err != nil { fmt.Println(err) return } numChunks := math.Ceil(float64(fileInfo.Size()) / float64(BUFFERSIZE)) chunksPerWorker := int(math.Ceil(numChunks / float64(numberConnections))) bytesPerConnection := int64(chunksPerWorker * BUFFERSIZE) if id+1 == numberConnections { bytesPerConnection = fileInfo.Size() - (numberConnections-1)*bytesPerConnection } fileSize := fillString(strconv.FormatInt(int64(bytesPerConnection), 10), 10) fileName := fillString(fileInfo.Name(), 64) if id == 0 || id == numberConnections-1 { logger.Infof("numChunks: %v", numChunks) logger.Infof("chunksPerWorker: %v", chunksPerWorker) logger.Infof("bytesPerConnection: %v", bytesPerConnection) logger.Infof("fileName: %v", fileInfo.Name()) } logger.Info("sending") connection.Write([]byte(fileSize)) connection.Write([]byte(fileName)) sendBuffer := make([]byte, BUFFERSIZE) chunkI := 0 for { _, err = file.Read(sendBuffer) if err == io.EOF { //End of file reached, break out of for loop logger.Info("EOF") break } if (chunkI >= chunksPerWorker*id && chunkI < chunksPerWorker*id+chunksPerWorker) || (id == numberConnections-1 && chunkI >= chunksPerWorker*id) { connection.Write(sendBuffer) } chunkI++ } fmt.Println("File has been sent, closing connection!") return }