Add pre-spawn and post-spawn hooks

This commit is contained in:
Félix Saparelli 2021-08-25 04:41:14 +12:00
parent 1fd5c85317
commit ef453193af
No known key found for this signature in database
GPG key ID: B948C4BAE44FC474
3 changed files with 205 additions and 15 deletions

View file

@ -2,7 +2,7 @@
use std::{ use std::{
fmt, fmt,
sync::Arc, sync::{Arc, Weak},
time::{Duration, Instant}, time::{Duration, Instant},
}; };
@ -10,7 +10,8 @@ use atomic_take::AtomicTake;
use command_group::AsyncCommandGroup; use command_group::AsyncCommandGroup;
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use tokio::{ use tokio::{
sync::{mpsc, watch}, process::Command,
sync::{mpsc, watch, Mutex, OwnedMutexGuard},
time::timeout, time::timeout,
}; };
use tracing::{debug, trace, warn}; use tracing::{debug, trace, warn};
@ -30,6 +31,9 @@ pub struct WorkingData {
pub throttle: Duration, pub throttle: Duration,
pub action_handler: Arc<AtomicTake<Box<dyn Handler<Action> + Send>>>, pub action_handler: Arc<AtomicTake<Box<dyn Handler<Action> + Send>>>,
pub pre_spawn_handler: Arc<AtomicTake<Box<dyn Handler<PreSpawn> + Send>>>,
pub post_spawn_handler: Arc<AtomicTake<Box<dyn Handler<PostSpawn> + Send>>>,
pub completion_handler: Arc<AtomicTake<Box<dyn Handler<Action> + Send>>>,
pub shell: Shell, pub shell: Shell,
@ -56,6 +60,9 @@ impl Default for WorkingData {
// set to 50ms here, but will remain 100ms on cli until 2022 // set to 50ms here, but will remain 100ms on cli until 2022
throttle: Duration::from_millis(50), throttle: Duration::from_millis(50),
action_handler: Arc::new(AtomicTake::new(Box::new(()) as _)), action_handler: Arc::new(AtomicTake::new(Box::new(()) as _)),
pre_spawn_handler: Arc::new(AtomicTake::new(Box::new(()) as _)),
post_spawn_handler: Arc::new(AtomicTake::new(Box::new(()) as _)),
completion_handler: Arc::new(AtomicTake::new(Box::new(()) as _)),
shell: Shell::default(), shell: Shell::default(),
command: Vec::new(), command: Vec::new(),
grouped: true, grouped: true,
@ -81,11 +88,57 @@ impl Action {
/// ///
/// This takes `self` and `Action` is not `Clone`, so it's only possible to call it once. /// This takes `self` and `Action` is not `Clone`, so it's only possible to call it once.
/// Regardless, if you _do_ manage to call it twice, it will do nothing beyond the first call. /// Regardless, if you _do_ manage to call it twice, it will do nothing beyond the first call.
///
/// See the [`Action`] documentation about handlers to learn why it's a bad idea to clone or
/// send it elsewhere, and what kind of handlers you cannot use.
pub fn outcome(self, outcome: Outcome) { pub fn outcome(self, outcome: Outcome) {
self.outcome.set(outcome).ok(); self.outcome.set(outcome).ok();
} }
} }
#[derive(Debug)]
#[non_exhaustive]
pub struct PreSpawn {
pub command: Vec<String>,
command_w: Weak<Mutex<Command>>,
}
impl PreSpawn {
fn new(command: Command, cmd: Vec<String>) -> (Self, Arc<Mutex<Command>>) {
let arc = Arc::new(Mutex::new(command));
(
Self {
command: cmd,
command_w: Arc::downgrade(&arc),
},
arc.clone(),
)
}
/// Get write access to the command that will be spawned.
///
/// Keeping the lock alive beyond the end of the handler may cause the command to be cancelled,
/// but note no guarantees are made on this behaviour. Just don't do it. See the [`Action`]
/// documentation about handlers for more.
///
/// This will always return `Some()` under normal circumstances.
pub async fn command(&self) -> Option<OwnedMutexGuard<Command>> {
if let Some(arc) = self.command_w.upgrade() {
Some(arc.lock_owned().await)
} else {
None
}
}
}
#[derive(Debug)]
#[non_exhaustive]
pub struct PostSpawn {
pub command: Vec<String>,
pub id: u32,
pub grouped: bool,
}
#[derive(Clone, Debug, PartialEq, Eq)] #[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive] #[non_exhaustive]
pub enum Outcome { pub enum Outcome {
@ -149,9 +202,16 @@ pub async fn worker(
) -> Result<(), CriticalError> { ) -> Result<(), CriticalError> {
let mut last = Instant::now(); let mut last = Instant::now();
let mut set = Vec::new(); let mut set = Vec::new();
let mut process: Option<Process> = None;
let mut action_handler = let mut action_handler =
{ working.borrow().action_handler.take() }.ok_or(CriticalError::MissingHandler)?; { working.borrow().action_handler.take() }.ok_or(CriticalError::MissingHandler)?;
let mut process: Option<Process> = None; let mut pre_spawn_handler =
{ working.borrow().pre_spawn_handler.take() }.ok_or(CriticalError::MissingHandler)?;
let mut post_spawn_handler =
{ working.borrow().post_spawn_handler.take() }.ok_or(CriticalError::MissingHandler)?;
let mut completion_handler =
{ working.borrow().completion_handler.take() }.ok_or(CriticalError::MissingHandler)?;
loop { loop {
let maxtime = if set.is_empty() { let maxtime = if set.is_empty() {
@ -207,12 +267,30 @@ pub async fn worker(
action_handler = h; action_handler = h;
} }
if let Some(h) = working.borrow().pre_spawn_handler.take() {
trace!("pre-spawn handler updated");
pre_spawn_handler = h;
}
if let Some(h) = working.borrow().post_spawn_handler.take() {
trace!("post-spawn handler updated");
post_spawn_handler = h;
}
if let Some(h) = working.borrow().completion_handler.take() {
trace!("completion handler updated");
completion_handler = h;
}
debug!("running action handler");
let outcome = action.outcome.clone(); let outcome = action.outcome.clone();
let err = action_handler let err = action_handler
.handle(action) .handle(action)
.map_err(|e| rte("action worker", e)); .map_err(|e| rte("action worker", e));
if let Err(err) = err { if let Err(err) = err {
errors.send(err).await?; errors.send(err).await?;
debug!("action handler errored, skipping");
continue;
} }
let outcome = outcome.get().cloned().unwrap_or_default(); let outcome = outcome.get().cloned().unwrap_or_default();
@ -231,7 +309,14 @@ pub async fn worker(
debug!(?outcome, "outcome resolved"); debug!(?outcome, "outcome resolved");
let w = working.borrow().clone(); let w = working.borrow().clone();
let rerr = apply_outcome(outcome, w, &mut process).await; let rerr = apply_outcome(
outcome,
w,
&mut process,
&mut pre_spawn_handler,
&mut post_spawn_handler,
)
.await;
if let Err(err) = rerr { if let Err(err) = rerr {
errors.send(err).await?; errors.send(err).await?;
} }
@ -246,6 +331,8 @@ async fn apply_outcome(
outcome: Outcome, outcome: Outcome,
working: WorkingData, working: WorkingData,
process: &mut Option<Process>, process: &mut Option<Process>,
pre_spawn_handler: &mut Box<dyn Handler<PreSpawn> + Send>,
post_spawn_handler: &mut Box<dyn Handler<PostSpawn> + Send>,
) -> Result<(), RuntimeError> { ) -> Result<(), RuntimeError> {
match (process.as_mut(), outcome) { match (process.as_mut(), outcome) {
(_, Outcome::DoNothing) => {} (_, Outcome::DoNothing) => {}
@ -266,18 +353,40 @@ async fn apply_outcome(
if working.command.is_empty() { if working.command.is_empty() {
warn!("tried to start a command without anything to run"); warn!("tried to start a command without anything to run");
} else { } else {
let mut command = working.shell.to_command(&working.command); let command = working.shell.to_command(&working.command);
let (pre_spawn, command) = PreSpawn::new(command, working.command.clone());
// TODO: pre-spawn hook debug!("running pre-spawn handler");
pre_spawn_handler
.handle(pre_spawn)
.map_err(|e| rte("action pre-spawn", e))?;
let mut command = Arc::try_unwrap(command)
.map_err(|_| RuntimeError::HandlerLockHeld("pre-spawn"))?
.into_inner();
debug!(grouped=%working.grouped, ?command, "spawning command"); debug!(grouped=%working.grouped, ?command, "spawning command");
let proc = if working.grouped { let (proc, id) = if working.grouped {
Process::Grouped(command.group_spawn()?) let proc = command.group_spawn()?;
let id = proc.id().ok_or(RuntimeError::ProcessDeadOnArrival)?;
debug!(pgid=%id, "process group spawned");
(Process::Grouped(proc), id)
} else { } else {
Process::Ungrouped(command.spawn()?) let proc = command.spawn()?;
let id = proc.id().ok_or(RuntimeError::ProcessDeadOnArrival)?;
debug!(pid=%id, "process spawned");
(Process::Ungrouped(proc), id)
}; };
// TODO: post-spawn hook debug!("running post-spawn handler");
let post_spawn = PostSpawn {
command: working.command.clone(),
id,
grouped: working.grouped,
};
post_spawn_handler
.handle(post_spawn)
.map_err(|e| rte("action post-spawn", e))?;
*process = Some(proc); *process = Some(proc);
@ -295,15 +404,43 @@ async fn apply_outcome(
} }
(Some(_), Outcome::IfRunning(then, _)) => { (Some(_), Outcome::IfRunning(then, _)) => {
apply_outcome(*then, working, process).await?; apply_outcome(
*then,
working,
process,
pre_spawn_handler,
post_spawn_handler,
)
.await?;
} }
(None, Outcome::IfRunning(_, otherwise)) => { (None, Outcome::IfRunning(_, otherwise)) => {
apply_outcome(*otherwise, working, process).await?; apply_outcome(
*otherwise,
working,
process,
pre_spawn_handler,
post_spawn_handler,
)
.await?;
} }
(_, Outcome::Both(one, two)) => { (_, Outcome::Both(one, two)) => {
apply_outcome(*one, working.clone(), process).await?; apply_outcome(
apply_outcome(*two, working, process).await?; *one,
working.clone(),
process,
pre_spawn_handler,
post_spawn_handler,
)
.await?;
apply_outcome(
*two,
working,
process,
pre_spawn_handler,
post_spawn_handler,
)
.await?;
} }
} }

View file

@ -5,7 +5,13 @@ use std::{fmt, path::Path, sync::Arc, time::Duration};
use atomic_take::AtomicTake; use atomic_take::AtomicTake;
use derive_builder::Builder; use derive_builder::Builder;
use crate::{action::Action, command::Shell, error::RuntimeError, fs::Watcher, handler::Handler}; use crate::{
action::{Action, PostSpawn, PreSpawn},
command::Shell,
error::RuntimeError,
fs::Watcher,
handler::Handler,
};
/// Runtime configuration for [`Watchexec`][crate::Watchexec]. /// Runtime configuration for [`Watchexec`][crate::Watchexec].
/// ///
@ -92,6 +98,43 @@ impl RuntimeConfig {
self.action.action_handler = Arc::new(AtomicTake::empty()); self.action.action_handler = Arc::new(AtomicTake::empty());
self self
} }
/// Set the pre-spawn handler.
///
/// TODO: notes on locks held by handler
pub fn on_pre_spawn(&mut self, handler: impl Handler<PreSpawn> + Send + 'static) -> &mut Self {
self.action.pre_spawn_handler = Arc::new(AtomicTake::new(Box::new(handler) as _));
self
}
/// Keep the pre-spawn handler the same.
///
/// This is especially useful when reconfiguring _within_ the action handler.
///
/// Passing this config to `Watchexec::new()` will cause a `CriticalError::MissingHandler`.
pub fn keep_pre_spawn(&mut self) -> &mut Self {
self.action.pre_spawn_handler = Arc::new(AtomicTake::empty());
self
}
/// Set the post-spawn handler.
pub fn on_post_spawn(
&mut self,
handler: impl Handler<PostSpawn> + Send + 'static,
) -> &mut Self {
self.action.post_spawn_handler = Arc::new(AtomicTake::new(Box::new(handler) as _));
self
}
/// Keep the post-spawn handler the same.
///
/// This is especially useful when reconfiguring _within_ the action handler.
///
/// Passing this config to `Watchexec::new()` will cause a `CriticalError::MissingHandler`.
pub fn keep_post_spawn(&mut self) -> &mut Self {
self.action.post_spawn_handler = Arc::new(AtomicTake::empty());
self
}
} }
/// Initialisation configuration for [`Watchexec`][crate::Watchexec]. /// Initialisation configuration for [`Watchexec`][crate::Watchexec].

View file

@ -133,11 +133,21 @@ pub enum RuntimeError {
#[diagnostic(code(watchexec::runtime::handler))] #[diagnostic(code(watchexec::runtime::handler))]
Handler { ctx: &'static str, err: String }, Handler { ctx: &'static str, err: String },
/// Error received when a [`Handler`][crate::handler::Handler] which has been passed a lock has kept that lock open after the handler has completed.
#[error("{0} handler returned while holding a lock alive")]
#[diagnostic(code(watchexec::runtime::handler_lock_held))]
HandlerLockHeld(&'static str),
/// Error received when operating on a process. /// Error received when operating on a process.
#[error("when operating on process: {0}")] #[error("when operating on process: {0}")]
#[diagnostic(code(watchexec::runtime::process))] #[diagnostic(code(watchexec::runtime::process))]
Process(#[source] std::io::Error), Process(#[source] std::io::Error),
/// Error received when a process did not start correctly, or finished before we could even tell.
#[error("process was dead on arrival")]
#[diagnostic(code(watchexec::runtime::process_doa))]
ProcessDeadOnArrival,
/// Error received when clearing the screen. /// Error received when clearing the screen.
#[error("clear screen: {0}")] #[error("clear screen: {0}")]
#[diagnostic(code(watchexec::runtime::clearscreen))] #[diagnostic(code(watchexec::runtime::clearscreen))]