From 8ff3fb9b8947d7b9bdcbb4d96eab2bc173020dab Mon Sep 17 00:00:00 2001 From: Marcel Battista Date: Tue, 19 Oct 2021 02:46:54 +0200 Subject: [PATCH] added throttle upload feature --- go.mod | 1 + go.sum | 2 ++ src/cli/cli.go | 2 ++ src/croc/croc.go | 33 +++++++++++++++++++++++++++++++++ 4 files changed, 38 insertions(+) diff --git a/go.mod b/go.mod index 1e0a994..0a0bc87 100644 --- a/go.mod +++ b/go.mod @@ -23,5 +23,6 @@ require ( golang.org/x/net v0.0.0-20210929193557-e81a3d93ecf6 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect + golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect ) diff --git a/go.sum b/go.sum index 79160de..d186dcc 100644 --- a/go.sum +++ b/go.sum @@ -80,6 +80,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs= +golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= diff --git a/src/cli/cli.go b/src/cli/cli.go index 1a47870..fe826bc 100644 --- a/src/cli/cli.go +++ b/src/cli/cli.go @@ -102,6 +102,7 @@ func Run() (err error) { &cli.StringFlag{Name: "out", Value: ".", Usage: "specify an output folder to receive the file"}, &cli.StringFlag{Name: "pass", Value: models.DEFAULT_PASSPHRASE, Usage: "password for the relay", EnvVars: []string{"CROC_PASS"}}, &cli.StringFlag{Name: "socks5", Value: "", Usage: "add a socks5 proxy", EnvVars: []string{"SOCKS5_PROXY"}}, + &cli.StringFlag{Name: "throttleUpload", Value: "", Usage: "Throttle the upload speed e.g. 500k"}, } app.EnableBashCompletion = true app.HideHelp = false @@ -207,6 +208,7 @@ func send(c *cli.Context) (err error) { Overwrite: c.Bool("overwrite"), Curve: c.String("curve"), HashAlgorithm: c.String("hash"), + ThrottleUpload: c.String("throttleUpload"), } if crocOptions.RelayAddress != models.DEFAULT_RELAY { crocOptions.RelayAddress6 = "" diff --git a/src/croc/croc.go b/src/croc/croc.go index 215230d..4414eb1 100644 --- a/src/croc/croc.go +++ b/src/croc/croc.go @@ -17,6 +17,7 @@ import ( "strings" "sync" "time" + "golang.org/x/time/rate" "github.com/denisbrodbeck/machineid" log "github.com/schollz/logger" @@ -73,6 +74,7 @@ type Options struct { Overwrite bool Curve string HashAlgorithm string + ThrottleUpload string } // Client holds the state of the croc transfer @@ -105,6 +107,7 @@ type Client struct { TotalSent int64 TotalChunksTransfered int chunkMap map[uint64]struct{} + limiter *rate.Limiter // tcp connections conn []*comm.Comm @@ -175,6 +178,31 @@ func New(ops Options) (c *Client, err error) { c.conn = make([]*comm.Comm, 16) + if len(c.Options.ThrottleUpload) > 0 && c.Options.IsSender { + upload := c.Options.ThrottleUpload[:len(c.Options.ThrottleUpload)-1] + uploadLimit, err := strconv.ParseInt(upload, 10, 64) + if err != nil { + panic("Could not parse given Upload Limit") + } + minBurstSize := models.TCP_BUFFER_SIZE + var rt rate.Limit + switch unit := string(c.Options.ThrottleUpload[len(c.Options.ThrottleUpload)-1:]); unit { + case "g", "G": + uploadLimit = uploadLimit*1024*1024*1024 + case "m", "M": + uploadLimit = uploadLimit*1024*1024 + case "k", "K": + uploadLimit = uploadLimit*1024 + } + // Somehow 4* is neccessary + rt = rate.Every(time.Second / (4*time.Duration(uploadLimit))) + if (int(uploadLimit) > minBurstSize) { + minBurstSize = int(uploadLimit) + } + c.limiter = rate.NewLimiter(rt, minBurstSize) + log.Debugf("Throttling Upload to %#v", c.limiter.Limit()) + } + // initialize pake for recipient if !c.Options.IsSender { c.Pake, err = pake.InitCurve([]byte(c.Options.SharedSecret[5:]), 0, c.Options.Curve) @@ -1519,6 +1547,11 @@ func (c *Client) sendData(i int) { n, errRead := c.fread.ReadAt(data, readingPos) // log.Debugf("%d read %d bytes", i, n) readingPos += int64(n) + if (c.limiter != nil) { + r := c.limiter.ReserveN(time.Now(), n) + log.Debugf("Limiting Upload for %d", r.Delay()) + time.Sleep(r.Delay()) + } if math.Mod(curi, float64(len(c.Options.RelayPorts))) == float64(i) { // check to see if this is a chunk that the recipient wants