diff --git a/go.sum b/go.sum index 76fd821..74c7c04 100644 --- a/go.sum +++ b/go.sum @@ -21,6 +21,7 @@ github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/schollz/croc v3.0.6+incompatible h1:rCfc8MGgcGjNW2/qSoulPh8CRGH+Ej4i3RWYOwhX9pE= github.com/schollz/mnemonicode v1.0.1 h1:LiH5hwADZwjwnfXsaD4xgnMyTAtaKHN+e5AyjRU6WSU= github.com/schollz/mnemonicode v1.0.1/go.mod h1:cl4UAOhUV0mkdjMj/QYaUZbZZdF8BnOqoz8rHMzwboY= github.com/schollz/pake v1.1.0 h1:+tYqsPVkuirFpmeRePjYTUhIHHKLufdmd7QfuspaXCk= diff --git a/src/relay/conn.go b/src/relay/conn.go deleted file mode 100644 index 71a565a..0000000 --- a/src/relay/conn.go +++ /dev/null @@ -1,123 +0,0 @@ -package relay - -import ( - "net/http" - "time" - - log "github.com/cihub/seelog" - "github.com/gorilla/websocket" - "github.com/schollz/croc/src/models" -) - -const ( - // Time allowed to write a message to the peer. - writeWait = 6000 * time.Second - - // Time allowed to read the next pong message from the peer. - pongWait = 6000 * time.Second - - // Send pings to peer with this period. Must be less than pongWait. - pingPeriod = (pongWait * 9) / 10 - - // Maximum message size allowed from peer. - maxMessageSize = models.WEBSOCKET_BUFFER_SIZE / 2 -) - -var upgrader = websocket.Upgrader{ - ReadBufferSize: models.WEBSOCKET_BUFFER_SIZE, - WriteBufferSize: models.WEBSOCKET_BUFFER_SIZE, -} - -// connection is an middleman between the websocket connection and the hub. -type connection struct { - // The websocket connection. - ws *websocket.Conn - - // Buffered channel of outbound messages. - send chan messageChannel -} - -type messageChannel struct { - data []byte - messageType int -} - -// readPump pumps messages from the websocket connection to the hub. -func (s subscription) readPump() { - c := s.conn - defer func() { - h.unregister <- s - c.ws.Close() - }() - c.ws.SetReadLimit(maxMessageSize) - c.ws.SetReadDeadline(time.Now().Add(pongWait)) - c.ws.SetPongHandler(func(string) error { c.ws.SetReadDeadline(time.Now().Add(pongWait)); return nil }) - for { - messageType, msg, err := c.ws.ReadMessage() - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { - log.Debugf("unexpected close: %v", err) - } - break - } - h.broadcast <- message{messageChannel{msg, messageType}, s.room, c.ws.RemoteAddr().String()} - } -} - -// write writes a message with the given message type and payload. -func (c *connection) write(mt int, payload []byte) error { - c.ws.SetWriteDeadline(time.Now().Add(writeWait)) - return c.ws.WriteMessage(mt, payload) -} - -// writePump pumps messages from the hub to the websocket connection. -func (s *subscription) writePump() { - c := s.conn - ticker := time.NewTicker(pingPeriod) - defer func() { - ticker.Stop() - c.ws.Close() - }() - for { - select { - case message, ok := <-c.send: - if !ok { - err := c.write(websocket.CloseMessage, []byte{}) - if err != nil { - log.Debug(err) - } - return - } - if err := c.write(message.messageType, message.data); err != nil { - log.Debug(err) - return - } - case <-ticker.C: - if err := c.write(websocket.PingMessage, []byte{}); err != nil { - log.Debug(err) - return - } - } - } -} - -// serveWs handles websocket requests from the peer. -func serveWs(w http.ResponseWriter, r *http.Request) { - ws, err := upgrader.Upgrade(w, r, nil) - if err != nil { - log.Debug(err) - return - } - vals := r.URL.Query() - room := "default" - rooms, ok := vals["room"] - if ok { - room = rooms[0] - } - - c := &connection{send: make(chan messageChannel, 256), ws: ws} - s := subscription{c, room} - h.register <- s - go s.writePump() - s.readPump() -} diff --git a/src/relay/hub.go b/src/relay/hub.go deleted file mode 100644 index bbb11b3..0000000 --- a/src/relay/hub.go +++ /dev/null @@ -1,105 +0,0 @@ -package relay - -import ( - "sync" - - log "github.com/cihub/seelog" -) - -type message struct { - msg messageChannel - room string - remoteOrigin string -} - -type subscription struct { - conn *connection - room string -} - -// hub maintains the set of active connections and broadcasts messages to the -// connections. -type hub struct { - // Registered connections. - rooms roomMap - - // Inbound messages from the connections. - broadcast chan message - - // Register requests from the connections. - register chan subscription - - // Unregister requests from connections. - unregister chan subscription -} - -type roomMap struct { - rooms map[string]map[*connection]bool - sync.Mutex -} - -var h = hub{ - broadcast: make(chan message), - register: make(chan subscription), - unregister: make(chan subscription), - rooms: roomMap{rooms: make(map[string]map[*connection]bool)}, -} - -func (h *hub) run() { - for { - if stop { - log.Debug("stopping hub") - return - } - select { - case s := <-h.register: - log.Debugf("adding connection to %s", s.room) - h.rooms.Lock() - connections := h.rooms.rooms[s.room] - if connections == nil { - connections = make(map[*connection]bool) - h.rooms.rooms[s.room] = connections - } - h.rooms.rooms[s.room][s.conn] = true - if len(h.rooms.rooms) > 2 { - // if more than three, close all of them - for connection := range h.rooms.rooms[s.room] { - close(connection.send) - } - log.Debugf("deleting room %s", s.room) - delete(h.rooms.rooms, s.room) - } - h.rooms.Unlock() - case s := <-h.unregister: - // if one leaves, close all of them - h.rooms.Lock() - if _, ok := h.rooms.rooms[s.room]; ok { - for connection := range h.rooms.rooms[s.room] { - close(connection.send) - } - log.Debugf("deleting room %s", s.room) - delete(h.rooms.rooms, s.room) - } - h.rooms.Unlock() - case m := <-h.broadcast: - h.rooms.Lock() - connections := h.rooms.rooms[m.room] - for c := range connections { - if c.ws.RemoteAddr().String() == m.remoteOrigin { - continue - } - select { - case c.send <- m.msg: - default: - close(c.send) - delete(connections, c) - if len(connections) == 0 { - log.Debugf("deleting room %s", m.room) - delete(h.rooms.rooms, m.room) - } - } - } - h.rooms.Unlock() - } - } -} diff --git a/src/relay/relay.go b/src/relay/relay.go deleted file mode 100644 index 951a1ea..0000000 --- a/src/relay/relay.go +++ /dev/null @@ -1,55 +0,0 @@ -package relay - -import ( - "context" - "fmt" - "net/http" - "time" - - log "github.com/cihub/seelog" - "github.com/schollz/croc/src/logger" - "github.com/schollz/croc/src/tcp" -) - -var DebugLevel string -var stop bool - -func Stop() { - log.Debug("got stop signal") - stop = true -} - -// Run is the async operation for running a server -func Run(port string, tcpPorts []string) (err error) { - logger.SetLogLevel(DebugLevel) - - if len(tcpPorts) > 0 { - for _, tcpPort := range tcpPorts { - go tcp.Run(DebugLevel, tcpPort) - } - } - - go h.run() - log.Debug("running relay on " + port) - m := http.NewServeMux() - s := http.Server{Addr: ":" + port, Handler: m} - m.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { - serveWs(w, r) - }) - m.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(w, "ok") - }) - go func() { - for { - if stop { - s.Shutdown(context.Background()) - log.Debug("stopping http server") - return - } - time.Sleep(10 * time.Millisecond) - } - }() - s.ListenAndServe() - log.Debug("finished") - return -}