2021-08-20 18:43:55 +02:00
|
|
|
//! Processor responsible for receiving events, filtering them, and scheduling actions in response.
|
|
|
|
|
2021-08-21 19:58:03 +02:00
|
|
|
use std::{
|
|
|
|
fmt,
|
|
|
|
sync::Arc,
|
|
|
|
time::{Duration, Instant},
|
|
|
|
};
|
2021-08-20 18:43:55 +02:00
|
|
|
|
2021-08-21 19:58:03 +02:00
|
|
|
use atomic_take::AtomicTake;
|
|
|
|
use tokio::{
|
|
|
|
sync::{mpsc, watch},
|
|
|
|
time::timeout,
|
|
|
|
};
|
|
|
|
use tracing::{debug, trace};
|
2021-08-20 18:43:55 +02:00
|
|
|
|
|
|
|
use crate::{
|
|
|
|
error::{CriticalError, RuntimeError},
|
|
|
|
event::Event,
|
2021-08-21 19:58:03 +02:00
|
|
|
handler::{rte, Handler},
|
2021-08-20 18:43:55 +02:00
|
|
|
};
|
|
|
|
|
2021-08-21 19:58:03 +02:00
|
|
|
#[derive(Clone)]
|
2021-08-20 18:43:55 +02:00
|
|
|
#[non_exhaustive]
|
|
|
|
pub struct WorkingData {
|
|
|
|
pub throttle: Duration,
|
2021-08-21 19:58:03 +02:00
|
|
|
pub action_handler: Arc<AtomicTake<Box<dyn Handler<Action> + Send>>>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl fmt::Debug for WorkingData {
|
|
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
|
f.debug_struct("WorkingData")
|
|
|
|
.field("throttle", &self.throttle)
|
|
|
|
.finish_non_exhaustive()
|
|
|
|
}
|
2021-08-20 18:43:55 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for WorkingData {
|
|
|
|
fn default() -> Self {
|
|
|
|
Self {
|
|
|
|
throttle: Duration::from_millis(100),
|
2021-08-21 19:58:03 +02:00
|
|
|
action_handler: Arc::new(AtomicTake::new(Box::new(()) as _)),
|
2021-08-20 18:43:55 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-21 19:58:03 +02:00
|
|
|
#[derive(Clone, Debug)]
|
|
|
|
pub struct Action {
|
|
|
|
pub events: Vec<Event>,
|
|
|
|
}
|
|
|
|
|
2021-08-20 18:43:55 +02:00
|
|
|
pub async fn worker(
|
2021-08-21 16:54:02 +02:00
|
|
|
working: watch::Receiver<WorkingData>,
|
2021-08-20 18:43:55 +02:00
|
|
|
errors: mpsc::Sender<RuntimeError>,
|
2021-08-21 16:54:02 +02:00
|
|
|
mut events: mpsc::Receiver<Event>,
|
2021-08-20 18:43:55 +02:00
|
|
|
) -> Result<(), CriticalError> {
|
2021-08-21 16:54:02 +02:00
|
|
|
let mut last = Instant::now();
|
|
|
|
let mut set = Vec::new();
|
2021-08-21 19:58:03 +02:00
|
|
|
let mut handler =
|
|
|
|
{ working.borrow().action_handler.take() }.ok_or(CriticalError::MissingHandler)?;
|
2021-08-21 16:54:02 +02:00
|
|
|
|
|
|
|
loop {
|
|
|
|
let maxtime = working.borrow().throttle;
|
|
|
|
match timeout(maxtime, events.recv()).await {
|
2021-08-21 19:58:03 +02:00
|
|
|
Err(_timeout) => {}
|
2021-08-21 16:54:02 +02:00
|
|
|
Ok(None) => break,
|
|
|
|
Ok(Some(event)) => {
|
|
|
|
set.push(event);
|
|
|
|
|
|
|
|
if last.elapsed() < working.borrow().throttle {
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
last = Instant::now();
|
2021-08-21 19:58:03 +02:00
|
|
|
|
|
|
|
let action = Action {
|
|
|
|
events: set.drain(..).collect(),
|
|
|
|
};
|
|
|
|
debug!(?action, "action constructed");
|
|
|
|
|
|
|
|
if let Some(h) = working.borrow().action_handler.take() {
|
|
|
|
trace!("action handler updated");
|
|
|
|
handler = h;
|
|
|
|
}
|
|
|
|
|
|
|
|
let err = handler.handle(action).map_err(|e| rte("action worker", e));
|
|
|
|
if let Err(err) = err {
|
|
|
|
errors.send(err).await?;
|
|
|
|
}
|
2021-08-21 16:54:02 +02:00
|
|
|
}
|
2021-08-20 18:43:55 +02:00
|
|
|
Ok(())
|
|
|
|
}
|