watchexec/lib/src/action.rs

174 lines
3.9 KiB
Rust
Raw Normal View History

2021-08-20 18:43:55 +02:00
//! Processor responsible for receiving events, filtering them, and scheduling actions in response.
2021-08-21 19:58:03 +02:00
use std::{
fmt,
sync::Arc,
time::{Duration, Instant},
};
2021-08-20 18:43:55 +02:00
2021-08-21 19:58:03 +02:00
use atomic_take::AtomicTake;
2021-08-22 10:27:51 +02:00
use command_group::Signal;
use once_cell::sync::OnceCell;
2021-08-21 19:58:03 +02:00
use tokio::{
sync::{mpsc, watch},
time::timeout,
};
use tracing::{debug, trace};
2021-08-20 18:43:55 +02:00
use crate::{
error::{CriticalError, RuntimeError},
event::Event,
2021-08-21 19:58:03 +02:00
handler::{rte, Handler},
2021-08-20 18:43:55 +02:00
};
2021-08-21 19:58:03 +02:00
#[derive(Clone)]
2021-08-20 18:43:55 +02:00
#[non_exhaustive]
pub struct WorkingData {
pub throttle: Duration,
2021-08-22 10:26:48 +02:00
/// TODO: notes on how outcome is read immediately after handler returns
2021-08-21 19:58:03 +02:00
pub action_handler: Arc<AtomicTake<Box<dyn Handler<Action> + Send>>>,
}
impl fmt::Debug for WorkingData {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WorkingData")
.field("throttle", &self.throttle)
.finish_non_exhaustive()
}
2021-08-20 18:43:55 +02:00
}
impl Default for WorkingData {
fn default() -> Self {
Self {
2021-08-22 10:26:48 +02:00
// set to 50ms here, but will remain 100ms on cli until 2022
throttle: Duration::from_millis(50),
2021-08-21 19:58:03 +02:00
action_handler: Arc::new(AtomicTake::new(Box::new(()) as _)),
2021-08-20 18:43:55 +02:00
}
}
}
#[derive(Debug, Default)]
2021-08-21 19:58:03 +02:00
pub struct Action {
pub events: Vec<Event>,
outcome: Arc<OnceCell<Outcome>>,
}
impl Action {
fn new(events: Vec<Event>) -> Self {
Self {
events,
..Self::default()
}
}
/// Set the action's outcome.
///
/// This takes `self` and `Action` is not `Clone`, so it's only possible to call it once.
/// Regardless, if you _do_ manage to call it twice, it will do nothing beyond the first call.
pub fn outcome(self, outcome: Outcome) {
self.outcome.set(outcome).ok();
}
}
2021-08-22 10:27:51 +02:00
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum Outcome {
2021-08-22 10:27:51 +02:00
/// Stop processing this action silently.
DoNothing,
2021-08-22 10:27:51 +02:00
/// If the command isn't running, start it.
Start,
/// Wait for command completion, then start a new one.
Queue,
2021-08-22 10:27:51 +02:00
/// Stop the command, then start a new one.
Restart,
2021-08-22 10:27:51 +02:00
/// Send this signal to the command.
Signal(Signal),
2021-08-22 10:27:51 +02:00
/// When command is running, do the first, otherwise the second.
IfRunning(Box<Outcome>, Box<Outcome>),
/// Clear the screen before doing the inner outcome.
ClearAnd(Box<Outcome>),
}
impl Default for Outcome {
fn default() -> Self {
2021-08-22 10:27:51 +02:00
Self::DoNothing
}
}
impl Outcome {
    /// Convenience constructor for [`Outcome::IfRunning`] that boxes both
    /// branches: `then` applies when the command is running, `otherwise` when
    /// it is not.
    pub fn if_running(then: Outcome, otherwise: Outcome) -> Self {
        Self::IfRunning(Box::new(then), Box::new(otherwise))
    }

    /// Convenience constructor for [`Outcome::ClearAnd`] that boxes the inner
    /// outcome to perform after clearing the screen.
    pub fn clear_and(then: Outcome) -> Self {
        Self::ClearAnd(Box::new(then))
    }
}
2021-08-20 18:43:55 +02:00
pub async fn worker(
2021-08-21 16:54:02 +02:00
working: watch::Receiver<WorkingData>,
2021-08-20 18:43:55 +02:00
errors: mpsc::Sender<RuntimeError>,
2021-08-21 16:54:02 +02:00
mut events: mpsc::Receiver<Event>,
2021-08-20 18:43:55 +02:00
) -> Result<(), CriticalError> {
2021-08-21 16:54:02 +02:00
let mut last = Instant::now();
let mut set = Vec::new();
2021-08-21 19:58:03 +02:00
let mut handler =
{ working.borrow().action_handler.take() }.ok_or(CriticalError::MissingHandler)?;
2021-08-21 16:54:02 +02:00
loop {
let maxtime = working.borrow().throttle.saturating_sub(last.elapsed());
if maxtime.is_zero() {
trace!("out of throttle on recycle");
} else {
trace!(?maxtime, "waiting for event");
match timeout(maxtime, events.recv()).await {
Err(_timeout) => {
trace!("timed out");
}
Ok(None) => break,
Ok(Some(event)) => {
trace!(?event, "got event");
set.push(event);
let elapsed = last.elapsed();
if elapsed < working.borrow().throttle {
trace!(?elapsed, "still within throttle window, cycling");
continue;
}
2021-08-21 16:54:02 +02:00
}
}
}
trace!("out of throttle, starting action process");
2021-08-21 16:54:02 +02:00
last = Instant::now();
2021-08-21 19:58:03 +02:00
let action = Action::new(set.drain(..).collect());
2021-08-21 19:58:03 +02:00
debug!(?action, "action constructed");
if let Some(h) = working.borrow().action_handler.take() {
trace!("action handler updated");
handler = h;
}
let outcome = action.outcome.clone();
2021-08-21 19:58:03 +02:00
let err = handler.handle(action).map_err(|e| rte("action worker", e));
if let Err(err) = err {
errors.send(err).await?;
}
let outcome = outcome.get().cloned().unwrap_or_default();
debug!(?outcome, "handler finished");
2021-08-21 16:54:02 +02:00
}
debug!("action worker finished");
2021-08-20 18:43:55 +02:00
Ok(())
}