From 2812a723ffadd138dbc3c0fe5c16a12cc6e60337 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fe=CC=81lix=20Saparelli?= Date: Sun, 22 Aug 2021 02:54:02 +1200 Subject: [PATCH] Write action throttling code --- lib/src/action.rs | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/lib/src/action.rs b/lib/src/action.rs index 576c86f..d263a40 100644 --- a/lib/src/action.rs +++ b/lib/src/action.rs @@ -1,8 +1,8 @@ //! Processor responsible for receiving events, filtering them, and scheduling actions in response. -use std::time::Duration; +use std::time::{Duration, Instant}; -use tokio::sync::{mpsc, watch}; +use tokio::{sync::{mpsc, watch}, time::timeout}; use crate::{ error::{CriticalError, RuntimeError}, @@ -24,9 +24,29 @@ impl Default for WorkingData { } pub async fn worker( - mut working: watch::Receiver, + working: watch::Receiver, errors: mpsc::Sender, - events: mpsc::Receiver, + mut events: mpsc::Receiver, ) -> Result<(), CriticalError> { + let mut last = Instant::now(); + let mut set = Vec::new(); + + loop { + let maxtime = working.borrow().throttle; + match timeout(maxtime, events.recv()).await { + Err(_timeout) => {}, + Ok(None) => break, + Ok(Some(event)) => { + set.push(event); + + if last.elapsed() < working.borrow().throttle { + continue; + } + } + } + + last = Instant::now(); + set.drain(..); // TODO: do action with the set + } Ok(()) }