fix bugs in relay

This commit is contained in:
Zack Scholl 2018-06-24 06:55:43 -07:00
parent cc79aaeecb
commit 992e109279
2 changed files with 94 additions and 32 deletions

View File

@ -228,18 +228,20 @@ func (c *Connection) Run() error {
}
}
// broadcast local connection from sender
log.Debug("settings payload to ", c.Code)
go func() {
log.Debug("listening for local croc relay...")
go peerdiscovery.Discover(peerdiscovery.Settings{
Limit: 1,
TimeLimit: 600 * time.Second,
Delay: 50 * time.Millisecond,
Payload: []byte(c.Code),
})
runClientError <- c.runClient("localhost")
}()
if c.Server != "localhost" {
// broadcast local connection from sender
log.Debug("settings payload to ", c.Code)
go func() {
log.Debug("listening for local croc relay...")
go peerdiscovery.Discover(peerdiscovery.Settings{
Limit: 1,
TimeLimit: 600 * time.Second,
Delay: 50 * time.Millisecond,
Payload: []byte(c.Code),
})
runClientError <- c.runClient("localhost")
}()
}
}
log.Debug("checking code validity")
@ -464,6 +466,7 @@ func (c *Connection) runClient(serverName string) error {
} else { // this is a receiver
log.Debug("waiting for meta data from sender")
message = receiveMessage(connection)
log.Debugf("message from server: %s", message)
if message == "no" {
if id == 0 {
fmt.Println("The specifed code is already in use by a sender.")
@ -586,7 +589,7 @@ func (c *Connection) runClient(serverName string) error {
responses.startTime = time.Now()
responses.Unlock()
if !c.Debug && id == 0 {
// c.bar.SetMax(c.File.Size)
c.bar.Finish()
c.bar.Reset()
} else {
// try to let the first thread start first
@ -613,13 +616,13 @@ func (c *Connection) runClient(serverName string) error {
timeSinceStart := time.Since(responses.startTime).Nanoseconds()
if !fileTransfered {
fmt.Fprintf(os.Stderr, "\nNo mutual consent")
return nil
} else if c.IsSender {
if c.IsSender {
if responses.gotTimeout {
fmt.Println("Timeout waiting for receiver")
return nil
} else if !fileTransfered {
fmt.Fprintf(os.Stderr, "\nNo mutual consent")
return nil
}
fileOrFolder := "File"
if c.File.IsDir {
@ -630,6 +633,9 @@ func (c *Connection) runClient(serverName string) error {
if responses.notPresent {
fmt.Println("Either code is incorrect or sender is not ready. Use -wait to wait until sender connects.")
return nil
} else if !fileTransfered {
fmt.Fprintf(os.Stderr, "\nNo mutual consent")
return nil
}
if !responses.gotOK {
return errors.New("Transfer interrupted")

View File

@ -34,13 +34,6 @@ func (c *connectionMap) IsSenderConnected(key string) (found bool) {
return
}
func (c *connectionMap) IsPotentialReceiverConnected(key string) (found bool) {
c.RLock()
defer c.RUnlock()
_, found = c.potentialReceivers[key]
return
}
type Relay struct {
connections connectionMap
Debug bool
@ -141,11 +134,16 @@ func (r *Relay) clientCommuncation(id int, connection net.Conn) {
switch connectionType {
case "s": // sender connection
defer func() {
startTime := time.Now()
deleteAll := func() {
r.connections.Lock()
// close connections
r.connections.sender[key].Close()
r.connections.receiver[key].Close()
if _, ok := r.connections.sender[key]; ok {
r.connections.sender[key].Close()
}
if _, ok := r.connections.receiver[key]; ok {
r.connections.receiver[key].Close()
}
// delete connctions
delete(r.connections.sender, key)
delete(r.connections.receiver, key)
@ -157,7 +155,9 @@ func (r *Relay) clientCommuncation(id int, connection net.Conn) {
delete(r.connections.passphrase, key)
r.connections.Unlock()
logger.Debug("deleted sender and receiver")
}()
}
defer deleteAll()
if r.connections.IsSenderConnected(key) {
sendMessage("no", connection)
return
@ -172,10 +172,11 @@ func (r *Relay) clientCommuncation(id int, connection net.Conn) {
receiversAddress := ""
receiversPublicKey := ""
isTimeout := time.Duration(0)
log.Debug("waiting for reciever for sender")
for {
if CONNECTION_TIMEOUT <= isTimeout {
sendMessage("timeout", connection)
break
return
}
r.connections.RLock()
if _, ok := r.connections.receiver[key]; ok {
@ -212,6 +213,7 @@ func (r *Relay) clientCommuncation(id int, connection net.Conn) {
r.connections.Unlock()
// wait for receiver ready
startTime = time.Now()
for {
r.connections.RLock()
if _, ok := r.connections.receiverReady[key]; ok {
@ -219,6 +221,9 @@ func (r *Relay) clientCommuncation(id int, connection net.Conn) {
break
}
r.connections.RUnlock()
if time.Since(startTime) > 5*time.Minute {
return
}
}
// go reciever ready tell sender to go
sendMessage("go", connection)
@ -231,12 +236,20 @@ func (r *Relay) clientCommuncation(id int, connection net.Conn) {
Pipe(con1, con2)
logger.Debug("done piping")
case "r", "c": // receiver
if r.connections.IsPotentialReceiverConnected(key) {
startTime := time.Now()
log.Debug("is receiver")
r.connections.RLock()
_, foundReceiver := r.connections.potentialReceivers[key]
r.connections.RUnlock()
if foundReceiver {
log.Debug("already have receiver")
sendMessage("no", connection)
return
}
// add as a potential receiver
logger.Debug("adding as potential reciever")
r.connections.Lock()
r.connections.potentialReceivers[key] = struct{}{}
r.connections.rpublicKey[key] = publicKey
@ -245,8 +258,15 @@ func (r *Relay) clientCommuncation(id int, connection net.Conn) {
// wait for sender's metadata
sendersAddress := ""
sendersPublicKey := ""
startTime = time.Now()
for {
r.connections.RLock()
// check if been deleted
if _, ok := r.connections.potentialReceivers[key]; !ok {
log.Debug("deleting and finishing")
r.connections.RUnlock()
return
}
if _, ok := r.connections.metadata[key]; ok {
if _, ok2 := r.connections.sender[key]; ok2 {
sendersAddress = r.connections.sender[key].RemoteAddr().String()
@ -270,6 +290,9 @@ func (r *Relay) clientCommuncation(id int, connection net.Conn) {
return
}
time.Sleep(100 * time.Millisecond)
if time.Since(startTime) > 5*time.Minute {
return
}
}
// send meta data
r.connections.RLock()
@ -279,16 +302,26 @@ func (r *Relay) clientCommuncation(id int, connection net.Conn) {
// check for senders consent
sendersConsent := ""
startTime = time.Now()
for {
r.connections.RLock()
// check if been deleted
if _, ok := r.connections.potentialReceivers[key]; !ok {
log.Debug("deleting and finishing")
r.connections.RUnlock()
return
}
if _, ok := r.connections.sconsent[key]; ok {
sendersConsent = r.connections.sconsent[key]
logger.Debugf("got sender passphrase: %s", sendersConsent)
}
r.connections.RUnlock()
if sendersConsent != "" {
break
}
time.Sleep(100 * time.Millisecond)
if time.Since(startTime) > 5*time.Minute {
return
}
}
if sendersConsent != "ok" {
// TODO: delete everything
@ -297,8 +330,16 @@ func (r *Relay) clientCommuncation(id int, connection net.Conn) {
// now get passphrase
sendersPassphrase := ""
startTime = time.Now()
for {
r.connections.RLock()
// check if been deleted
if _, ok := r.connections.potentialReceivers[key]; !ok {
log.Debug("deleting and finishing")
r.connections.RUnlock()
return
}
if _, ok := r.connections.passphrase[key]; ok {
sendersPassphrase = r.connections.passphrase[key]
logger.Debugf("got sender passphrase: %s", sendersPassphrase)
@ -307,6 +348,10 @@ func (r *Relay) clientCommuncation(id int, connection net.Conn) {
if sendersPassphrase != "" {
break
}
if time.Since(startTime) > 5*time.Minute {
return
}
time.Sleep(100 * time.Millisecond)
}
// check for receiver's consent
@ -316,8 +361,16 @@ func (r *Relay) clientCommuncation(id int, connection net.Conn) {
logger.Debug("got consent")
// wait for encrypted passphrase
encryptedPassphrase := ""
startTime = time.Now()
for {
r.connections.RLock()
// check if been deleted
if _, ok := r.connections.potentialReceivers[key]; !ok {
log.Debug("deleting and finishing")
r.connections.RUnlock()
return
}
if _, ok := r.connections.passphrase[key]; ok {
encryptedPassphrase = r.connections.passphrase[key]
logger.Debugf("got passphrase: %s", r.connections.passphrase[key])
@ -326,7 +379,10 @@ func (r *Relay) clientCommuncation(id int, connection net.Conn) {
if encryptedPassphrase != "" {
break
}
time.Sleep(10 * time.Millisecond)
if time.Since(startTime) > 5*time.Minute {
return
}
time.Sleep(100 * time.Millisecond)
}
sendMessage(encryptedPassphrase, connection)
}