watchexec/lib/src/action.rs

313 lines
7.1 KiB
Rust
Raw Normal View History

2021-08-20 18:43:55 +02:00
//! Processor responsible for receiving events, filtering them, and scheduling actions in response.
2021-08-21 19:58:03 +02:00
use std::{
sync::Arc,
2021-08-21 19:58:03 +02:00
time::{Duration, Instant},
};
2021-08-20 18:43:55 +02:00
2021-09-03 15:55:25 +02:00
use clearscreen::ClearScreen;
2021-08-21 19:58:03 +02:00
use tokio::{
2021-09-18 07:20:05 +02:00
sync::{mpsc, watch},
2021-08-21 19:58:03 +02:00
time::timeout,
};
2021-08-22 14:28:20 +02:00
use tracing::{debug, trace, warn};
2021-08-20 18:43:55 +02:00
2021-09-18 07:20:05 +02:00
pub use command_group::Signal;
2021-08-20 18:43:55 +02:00
use crate::{
command::Supervisor,
2021-08-20 18:43:55 +02:00
error::{CriticalError, RuntimeError},
event::Event,
2021-08-21 19:58:03 +02:00
handler::{rte, Handler},
2021-08-20 18:43:55 +02:00
};
2021-09-18 07:20:05 +02:00
#[doc(inline)]
pub use outcome::Outcome;
#[doc(inline)]
pub use workingdata::*;
2021-09-18 07:20:05 +02:00
mod outcome;
mod workingdata;
2021-08-21 19:58:03 +02:00
2021-08-20 18:43:55 +02:00
pub async fn worker(
2021-08-21 16:54:02 +02:00
working: watch::Receiver<WorkingData>,
2021-08-20 18:43:55 +02:00
errors: mpsc::Sender<RuntimeError>,
events_tx: mpsc::Sender<Event>,
2021-08-21 16:54:02 +02:00
mut events: mpsc::Receiver<Event>,
2021-08-20 18:43:55 +02:00
) -> Result<(), CriticalError> {
2021-08-21 16:54:02 +02:00
let mut last = Instant::now();
let mut set = Vec::new();
let mut process: Option<Supervisor> = None;
2021-08-24 18:41:14 +02:00
2021-08-22 16:33:23 +02:00
let mut action_handler =
2021-08-21 19:58:03 +02:00
{ working.borrow().action_handler.take() }.ok_or(CriticalError::MissingHandler)?;
2021-08-24 18:41:14 +02:00
let mut pre_spawn_handler =
{ working.borrow().pre_spawn_handler.take() }.ok_or(CriticalError::MissingHandler)?;
let mut post_spawn_handler =
{ working.borrow().post_spawn_handler.take() }.ok_or(CriticalError::MissingHandler)?;
2021-08-21 16:54:02 +02:00
loop {
2021-08-22 16:33:57 +02:00
let maxtime = if set.is_empty() {
trace!("nothing in set, waiting forever for next event");
Duration::from_secs(u64::MAX)
} else {
working.borrow().throttle.saturating_sub(last.elapsed())
};
if maxtime.is_zero() {
2021-08-22 16:33:57 +02:00
if set.is_empty() {
trace!("out of throttle but nothing to do, resetting");
last = Instant::now();
continue;
} else {
trace!("out of throttle on recycle");
}
} else {
trace!(?maxtime, "waiting for event");
match timeout(maxtime, events.recv()).await {
Err(_timeout) => {
2021-08-22 16:33:57 +02:00
trace!("timed out, cycling");
continue;
}
Ok(None) => break,
Ok(Some(event)) => {
trace!(?event, "got event");
2021-08-22 17:12:23 +02:00
let filtered = working.borrow().filterer.check_event(&event);
match filtered {
Err(err) => {
trace!(%err, "filter errored on event");
errors.send(err).await?;
continue;
}
Ok(false) => {
trace!("filter rejected event");
continue;
}
Ok(true) => {
trace!("filter passed event");
}
}
2021-09-18 07:20:05 +02:00
2021-08-22 17:12:23 +02:00
if set.is_empty() {
trace!("event is the first, resetting throttle window");
last = Instant::now();
}
set.push(event);
let elapsed = last.elapsed();
if elapsed < working.borrow().throttle {
trace!(?elapsed, "still within throttle window, cycling");
continue;
}
2021-08-21 16:54:02 +02:00
}
}
}
trace!("out of throttle, starting action process");
2021-08-21 16:54:02 +02:00
last = Instant::now();
2021-08-21 19:58:03 +02:00
let action = Action::new(set.drain(..).collect());
2021-08-21 19:58:03 +02:00
debug!(?action, "action constructed");
if let Some(h) = working.borrow().action_handler.take() {
trace!("action handler updated");
2021-08-22 16:33:23 +02:00
action_handler = h;
2021-08-21 19:58:03 +02:00
}
2021-08-24 18:41:14 +02:00
if let Some(h) = working.borrow().pre_spawn_handler.take() {
trace!("pre-spawn handler updated");
pre_spawn_handler = h;
}
if let Some(h) = working.borrow().post_spawn_handler.take() {
trace!("post-spawn handler updated");
post_spawn_handler = h;
}
debug!("running action handler");
let outcome = action.outcome.clone();
2021-08-22 16:33:57 +02:00
let err = action_handler
.handle(action)
.map_err(|e| rte("action worker", e));
2021-08-21 19:58:03 +02:00
if let Err(err) = err {
errors.send(err).await?;
2021-08-24 18:41:14 +02:00
debug!("action handler errored, skipping");
continue;
2021-08-21 19:58:03 +02:00
}
let outcome = outcome.get().cloned().unwrap_or_default();
debug!(?outcome, "handler finished");
2021-08-22 13:23:01 +02:00
let is_running = process.as_ref().map(|p| p.is_running()).unwrap_or(false);
2021-08-22 13:23:01 +02:00
let outcome = outcome.resolve(is_running);
debug!(?outcome, "outcome resolved");
2021-08-22 14:28:20 +02:00
let w = working.borrow().clone();
2021-08-24 18:41:14 +02:00
let rerr = apply_outcome(
outcome,
w,
&mut process,
&mut pre_spawn_handler,
&mut post_spawn_handler,
errors.clone(),
events_tx.clone(),
2021-08-24 18:41:14 +02:00
)
.await;
2021-08-22 14:28:20 +02:00
if let Err(err) = rerr {
errors.send(err).await?;
}
2021-08-21 16:54:02 +02:00
}
debug!("action worker finished");
2021-08-20 18:43:55 +02:00
Ok(())
}
2021-08-22 14:28:20 +02:00
#[async_recursion::async_recursion]
async fn apply_outcome(
outcome: Outcome,
working: WorkingData,
process: &mut Option<Supervisor>,
2021-08-24 18:41:14 +02:00
pre_spawn_handler: &mut Box<dyn Handler<PreSpawn> + Send>,
post_spawn_handler: &mut Box<dyn Handler<PostSpawn> + Send>,
errors: mpsc::Sender<RuntimeError>,
events: mpsc::Sender<Event>,
2021-08-22 14:28:20 +02:00
) -> Result<(), RuntimeError> {
trace!(?outcome, "applying outcome");
2021-08-22 14:28:20 +02:00
match (process.as_mut(), outcome) {
(_, Outcome::DoNothing) => {}
2021-08-22 16:32:08 +02:00
(_, Outcome::Exit) => {
return Err(RuntimeError::Exit);
}
2021-08-22 14:28:20 +02:00
(Some(p), Outcome::Stop) => {
p.kill().await;
2021-08-22 14:28:20 +02:00
p.wait().await?;
2021-08-22 17:12:23 +02:00
*process = None;
2021-08-22 14:28:20 +02:00
}
2021-09-02 23:25:06 +02:00
(None, o @ Outcome::Stop) | (None, o @ Outcome::Wait) | (None, o @ Outcome::Signal(_)) => {
debug!(outcome=?o, "meaningless without a process, not doing anything");
2021-08-22 14:28:20 +02:00
}
2021-09-02 23:25:06 +02:00
(_, Outcome::Start) => {
if working.command.is_empty() {
warn!("tried to start a command without anything to run");
} else {
2021-08-24 18:41:14 +02:00
let command = working.shell.to_command(&working.command);
let (pre_spawn, command) = PreSpawn::new(command, working.command.clone());
debug!("running pre-spawn handler");
pre_spawn_handler
.handle(pre_spawn)
.map_err(|e| rte("action pre-spawn", e))?;
2021-08-22 14:28:20 +02:00
2021-08-24 18:41:14 +02:00
let mut command = Arc::try_unwrap(command)
.map_err(|_| RuntimeError::HandlerLockHeld("pre-spawn"))?
.into_inner();
2021-08-22 14:28:20 +02:00
trace!("spawing supervisor for command");
let sup = Supervisor::spawn(
errors.clone(),
events.clone(),
&mut command,
working.grouped,
)?;
2021-08-22 14:28:20 +02:00
2021-08-24 18:41:14 +02:00
debug!("running post-spawn handler");
let post_spawn = PostSpawn {
command: working.command.clone(),
id: sup.id(),
2021-08-24 18:41:14 +02:00
grouped: working.grouped,
};
post_spawn_handler
.handle(post_spawn)
.map_err(|e| rte("action post-spawn", e))?;
2021-08-22 14:28:20 +02:00
2021-09-02 23:25:06 +02:00
// TODO: consider what we want to do for processes still running here?
*process = Some(sup);
}
2021-08-22 14:28:20 +02:00
}
(Some(p), Outcome::Signal(sig)) => {
// TODO: windows
p.signal(sig).await;
2021-08-22 14:28:20 +02:00
}
(Some(p), Outcome::Wait) => {
p.wait().await?;
}
2021-08-22 14:28:20 +02:00
(_, Outcome::Clear) => {
clearscreen::clear()?;
}
2021-09-03 15:55:25 +02:00
(_, Outcome::Reset) => {
for cs in [
ClearScreen::WindowsCooked,
ClearScreen::WindowsVt,
2021-09-03 16:00:57 +02:00
ClearScreen::VtLeaveAlt,
2021-09-03 15:55:25 +02:00
ClearScreen::VtWellDone,
ClearScreen::default(),
] {
cs.clear()?;
}
}
2021-08-22 14:28:20 +02:00
(Some(_), Outcome::IfRunning(then, _)) => {
2021-08-24 18:41:14 +02:00
apply_outcome(
*then,
working,
process,
pre_spawn_handler,
post_spawn_handler,
errors,
events,
2021-08-24 18:41:14 +02:00
)
.await?;
2021-08-22 14:28:20 +02:00
}
(None, Outcome::IfRunning(_, otherwise)) => {
2021-08-24 18:41:14 +02:00
apply_outcome(
*otherwise,
working,
process,
pre_spawn_handler,
post_spawn_handler,
errors,
events,
2021-08-24 18:41:14 +02:00
)
.await?;
2021-08-22 14:28:20 +02:00
}
(_, Outcome::Both(one, two)) => {
if let Err(err) = apply_outcome(
2021-08-24 18:41:14 +02:00
*one,
working.clone(),
process,
pre_spawn_handler,
post_spawn_handler,
errors.clone(),
events.clone(),
2021-08-24 18:41:14 +02:00
)
.await
{
debug!(
"first outcome failed, sending an error but proceeding to the second anyway"
);
errors.send(err).await.ok();
}
2021-08-24 18:41:14 +02:00
apply_outcome(
*two,
working,
process,
pre_spawn_handler,
post_spawn_handler,
errors,
events,
2021-08-24 18:41:14 +02:00
)
.await?;
2021-08-22 14:28:20 +02:00
}
}
Ok(())
}