From ce8135002c981447078da47d2d14690e09d4329d Mon Sep 17 00:00:00 2001 From: Zack Scholl Date: Thu, 28 Jun 2018 07:46:16 -0700 Subject: [PATCH] add api and divide into relay+server --- main.go | 2 +- src/api.go | 51 +++++++++++++ src/models.go | 65 +++++++++++----- src/relay.go | 201 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/server.go | 36 ++++----- 5 files changed, 311 insertions(+), 44 deletions(-) create mode 100644 src/api.go create mode 100644 src/relay.go diff --git a/main.go b/main.go index 5791014..edbc668 100644 --- a/main.go +++ b/main.go @@ -3,7 +3,7 @@ package main import croc "github.com/schollz/croc/src" func main() { - err := croc.RunRelay("8002") + err := croc.Relay([]string{"27001", "27002", "27003", "27004"}, "8002") if err != nil { panic(err) } diff --git a/src/api.go b/src/api.go new file mode 100644 index 0000000..cee4eb7 --- /dev/null +++ b/src/api.go @@ -0,0 +1,51 @@ +package croc + +import "time" + +type Croc struct { + tcpPorts []string + serverPort string + timeout time.Duration + useEncryption bool + useCompression bool + curveType string +} + +// Init will initialize the croc relay +func Init() (c *Croc) { + c = new(Croc) + c.tcpPorts = []string{"27001", "27002", "27003", "27004"} + c.serverPort = "8003" + c.timeout = 10 * time.Minute + c.useEncryption = true + c.useCompression = true + c.curveType = "p521" + return +} + +// TODO: +// OptionTimeout +// OptionCurve +// OptionUseEncryption +// OptionUseCompression + +// Relay initiates a relay +func (c *Croc) Relay() error { + // start relay + go startRelay(c.tcpPorts) + + // start server + return startServer(c.tcpPorts, c.serverPort) +} + +// Send will take an existing file or folder and send it through the croc relay +func (c *Croc) Send(fname string) (err error) { + + return +} + +// Receive will receive something through the croc relay +func (c *Croc) Receive() (err error) { + + return +} diff --git a/src/models.go b/src/models.go index cbb1556..a3073ea 100644 --- a/src/models.go +++ b/src/models.go @@ -1,23 +1,48 @@ package croc -import "crypto/elliptic" +import ( + "crypto/elliptic" + "net" +) + +const ( + // maximum buffer size for initial TCP communication + bufferSize = 1024 +) + +var ( + // availableStates are the states available to the parties involved + availableStates = []string{"curve", "h_k", "hh_k", "x", "y"} +) type channelData struct { - // public - Name string `json:"name,omitempty"` - State map[string][]byte + // Public + // Name is the name of the channel + Name string `json:"name,omitempty"` + // State contains state variables that are public to both parties + State map[string][]byte `json:"state"` + // TransferReady is set by the relaying when both parties have connected + // with their credentials + TransferReady bool `json:"transfer_ready"` + // Ports returns which TCP ports to connect to + Ports []string `json:"ports"` - // private - stapled bool - uuids [2]string // 0 is sender, 1 is recipient - curve elliptic.Curve + // Private + // isopen determine whether or not the channel has been opened + isopen bool + // store a UUID of the parties to prevent other parties from joining + uuids [2]string // 0 is sender, 1 is recipient + // curve is the type of elliptic curve used for PAKE + curve elliptic.Curve + // connection information is stored when the clients do connect over TCP + connection [2]net.Conn } type response struct { // various responses - Channel string `json:"channel,omitempty"` - UUID string `json:"uuid,omitempty"` - State map[string][]byte `json:"state,omitempty"` + Channel string `json:"channel,omitempty"` + UUID string `json:"uuid,omitempty"` + Data *channelData `json:"data,omitempty"` // constant responses Success bool `json:"success"` @@ -25,9 +50,13 @@ type response struct { } type payloadOpen struct { + // Channel is used to designate the channel of interest Channel string `json:"channel"` - Role int `json:"role"` - Curve string `json:"curve"` + // Role designates which role the person will take; + // 0 for sender and 1 for recipient. + Role int `json:"role"` + // Curve is the curve to be used. + Curve string `json:"curve"` } type payloadChannel struct { @@ -41,12 +70,8 @@ func newChannelData(name string) (cd *channelData) { cd = new(channelData) cd.Name = name cd.State = make(map[string][]byte) - cd.State["x"] = []byte{} - cd.State["curve"] = []byte{} - cd.State["y"] = []byte{} - cd.State["hh_k"] = []byte{} - cd.State["sender_ready"] = []byte{0} - cd.State["recipient_ready"] = []byte{0} - cd.State["is_open"] = []byte{0} + for _, state := range availableStates { + cd.State[state] = []byte{} + } return } diff --git a/src/relay.go b/src/relay.go new file mode 100644 index 0000000..7d4fe6e --- /dev/null +++ b/src/relay.go @@ -0,0 +1,201 @@ +package croc + +import ( + "net" + "strings" + "sync" + "time" + + log "github.com/cihub/seelog" + "github.com/pkg/errors" +) + +func startRelay(ports []string) { + var wg sync.WaitGroup + wg.Add(len(ports)) + for _, port := range ports { + go func(port string, wg *sync.WaitGroup) { + defer wg.Done() + log.Debugf("listening on port %s", port) + if err := listener(port); err != nil { + log.Error(err) + return + } + }(port, &wg) + } + wg.Wait() +} + +func listener(port string) (err error) { + server, err := net.Listen("tcp", "0.0.0.0:"+port) + if err != nil { + return errors.Wrap(err, "Error listening on :"+port) + } + defer server.Close() + // spawn a new goroutine whenever a client connects + for { + connection, err := server.Accept() + if err != nil { + return errors.Wrap(err, "problem accepting connection") + } + log.Debugf("client %s connected", connection.RemoteAddr().String()) + go func(port string, connection net.Conn) { + errCommunication := clientCommuncation(port, connection) + if errCommunication != nil { + log.Warnf("relay-%s: %s", connection.RemoteAddr().String(), errCommunication.Error()) + } + }(port, connection) + } +} + +func clientCommuncation(port string, connection net.Conn) (err error) { + var con1, con2 net.Conn + + // get the channel and UUID from the client + err = sendMessage("channel and uuid?", connection) + if err != nil { + return + } + channel, err := receiveMessage(connection) + if err != nil { + return + } + uuid, err := receiveMessage(connection) + if err != nil { + return + } + log.Debugf("%s connected with channel %s and uuid %s", connection.RemoteAddr().String(), channel, uuid) + + // validate channel and UUID + rs.Lock() + if _, ok := rs.channel[channel]; !ok { + rs.Unlock() + err = errors.Errorf("channel %s does not exist", channel) + return + } + if uuid != rs.channel[channel].uuids[0] && + uuid != rs.channel[channel].uuids[1] { + rs.Unlock() + err = errors.Errorf("uuid '%s' is invalid", uuid) + return + } + role := 0 + if uuid == rs.channel[channel].uuids[1] { + role = 1 + } + rs.channel[channel].connection[role] = connection + + con1 = rs.channel[channel].connection[0] + con2 = rs.channel[channel].connection[1] + rs.Unlock() + + if con1 != nil && con2 != nil { + var wg sync.WaitGroup + wg.Add(2) + // first start piping + go func(con1 net.Conn, con2 net.Conn, wg *sync.WaitGroup) { + pipe(con1, con2) + wg.Done() + }(con1, con2, &wg) + // then set transfer ready + go func(channel string, wg *sync.WaitGroup) { + // set the channels to ready + rs.Lock() + rs.channel[channel].TransferReady = true + rs.Unlock() + wg.Done() + }(channel, &wg) + wg.Wait() + log.Debugf("finished transfer") + } + return +} + +func sendMessage(message string, connection net.Conn) (err error) { + message = fillString(message, bufferSize) + _, err = connection.Write([]byte(message)) + return +} + +func receiveMessage(connection net.Conn) (s string, err error) { + messageByte := make([]byte, bufferSize) + err = connection.SetReadDeadline(time.Now().Add(60 * time.Minute)) + if err != nil { + return + } + err = connection.SetDeadline(time.Now().Add(60 * time.Minute)) + if err != nil { + return + } + err = connection.SetWriteDeadline(time.Now().Add(60 * time.Minute)) + if err != nil { + return + } + _, err = connection.Read(messageByte) + if err != nil { + return + } + s = strings.TrimRight(string(messageByte), ":") + return +} + +func fillString(returnString string, toLength int) string { + for { + lengthString := len(returnString) + if lengthString < toLength { + returnString = returnString + ":" + continue + } + break + } + return returnString +} + +// chanFromConn creates a channel from a Conn object, and sends everything it +// Read()s from the socket to the channel. +func chanFromConn(conn net.Conn) chan []byte { + c := make(chan []byte) + + go func() { + b := make([]byte, bufferSize) + + for { + n, err := conn.Read(b) + if n > 0 { + res := make([]byte, n) + // Copy the buffer so it doesn't get changed while read by the recipient. + copy(res, b[:n]) + c <- res + } + if err != nil { + c <- nil + break + } + } + }() + + return c +} + +// pipe creates a full-duplex pipe between the two sockets and +// transfers data from one to the other. +func pipe(conn1 net.Conn, conn2 net.Conn) { + chan1 := chanFromConn(conn1) + chan2 := chanFromConn(conn2) + + for { + select { + case b1 := <-chan1: + if b1 == nil { + return + } + conn2.Write(b1) + + case b2 := <-chan2: + if b2 == nil { + return + } + conn1.Write(b2) + } + } +} diff --git a/src/server.go b/src/server.go index 3b66044..46f3351 100644 --- a/src/server.go +++ b/src/server.go @@ -1,7 +1,6 @@ package croc import ( - "bytes" "crypto/elliptic" "encoding/json" "fmt" @@ -27,17 +26,8 @@ func init() { rs.Unlock() } -const ( - state_curve = "curve" - state_hh_k = "hh_k" - state_is_open = "is_open" - state_recipient_ready = "recipient_ready" - state_sender_ready = "sender_ready" - state_x = "x" - state_y = "y" -) - -func RunRelay(port string) (err error) { +func startServer(tcpPorts []string, port string) (err error) { + // start server gin.SetMode(gin.ReleaseMode) r := gin.New() r.Use(middleWareHandler(), gin.Recovery()) @@ -88,15 +78,13 @@ func RunRelay(port string) (err error) { } // return the current state - r.State = make(map[string][]byte) - for key := range rs.channel[p.Channel].State { - r.State[key] = rs.channel[p.Channel].State[key] - } + r.Data = rs.channel[p.Channel] r.Message = fmt.Sprintf("assigned %d keys: %v", len(assignedKeys), assignedKeys) return }(c) if err != nil { + log.Debugf("bad /channel: %s", err.Error()) r.Message = err.Error() r.Success = false } @@ -132,7 +120,7 @@ func RunRelay(port string) (err error) { if _, ok := rs.channel[p.Channel]; ok { // channel is not empty if rs.channel[p.Channel].uuids[p.Role] != "" { - err = errors.Errorf("channel is already occupied by role %d", p.Role) + err = errors.Errorf("channel '%s' already occupied by role %d", p.Channel, p.Role) return } } @@ -144,9 +132,10 @@ func RunRelay(port string) (err error) { // assign UUID for the role in the channel rs.channel[r.Channel].uuids[p.Role] = uuid4.New().String() r.UUID = rs.channel[r.Channel].uuids[p.Role] + log.Debugf("(%s) %s has joined as role %d", r.Channel, r.UUID, p.Role) - // if channel is not open, determine curve - if bytes.Equal(rs.channel[r.Channel].State[state_is_open], []byte{0}) { + // if channel is not open, set curve + if !rs.channel[r.Channel].isopen { switch curve := p.Curve; curve { case "p224": rs.channel[r.Channel].curve = elliptic.P224() @@ -162,14 +151,17 @@ func RunRelay(port string) (err error) { p.Curve = "p256" rs.channel[r.Channel].curve = elliptic.P256() } - rs.channel[r.Channel].State[state_curve] = []byte(p.Curve) - rs.channel[r.Channel].State[state_is_open] = []byte{1} + log.Debugf("(%s) using curve '%s'", r.Channel, p.Curve) + rs.channel[r.Channel].State["curve"] = []byte(p.Curve) + rs.channel[r.Channel].Ports = tcpPorts + rs.channel[r.Channel].isopen = true } r.Message = fmt.Sprintf("assigned role %d in channel '%s'", p.Role, r.Channel) return }(c) if err != nil { + log.Debugf("bad /join: %s", err.Error()) r.Message = err.Error() r.Success = false } @@ -184,8 +176,6 @@ func RunRelay(port string) (err error) { func middleWareHandler() gin.HandlerFunc { return func(c *gin.Context) { t := time.Now() - // Add base headers - // addCORS(c) // Run next function c.Next() // Log request