From 8e4994abcab1a264c7cde7cb1e1d49354ca32894 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fe=CC=81lix=20Saparelli?= Date: Fri, 3 Sep 2021 05:22:15 +1200 Subject: [PATCH] Add process supervisor to watch command to completion Also change the concept of a completion handler to instead sending a synthetic "process completed" event down the same path as usual. That makes handling completion the job of the action handler, but also means it's immediately possible to launch a process or do an action in response to the process completing. Win win! --- cli/src/config.rs | 17 +- lib/src/action.rs | 66 +++---- lib/src/command.rs | 318 ++-------------------------------- lib/src/command/process.rs | 105 +++++++++++ lib/src/command/shell.rs | 207 ++++++++++++++++++++++ lib/src/command/supervisor.rs | 183 +++++++++++++++++++ lib/src/error.rs | 5 + lib/src/event.rs | 5 +- lib/src/watchexec.rs | 5 +- 9 files changed, 560 insertions(+), 351 deletions(-) create mode 100644 lib/src/command/process.rs create mode 100644 lib/src/command/shell.rs create mode 100644 lib/src/command/supervisor.rs diff --git a/cli/src/config.rs b/cli/src/config.rs index 840ef9c6..8941f040 100644 --- a/cli/src/config.rs +++ b/cli/src/config.rs @@ -4,7 +4,15 @@ use std::{ use clap::ArgMatches; use color_eyre::eyre::{eyre, Result}; -use watchexec::{action::{Action, Outcome, Signal}, command::Shell, config::{InitConfig, RuntimeConfig}, event::Event, fs::Watcher, handler::PrintDisplay, signal::Signal as InputSignal}; +use watchexec::{ + action::{Action, Outcome, Signal}, + command::Shell, + config::{InitConfig, RuntimeConfig}, + event::Event, + fs::Watcher, + handler::PrintDisplay, + signal::Signal as InputSignal, +}; pub fn new(args: &ArgMatches<'static>) -> Result<(InitConfig, RuntimeConfig)> { Ok((init(args)?, runtime(args)?)) @@ -89,7 +97,12 @@ fn runtime(args: &ArgMatches<'static>) -> Result { if print_events { for (n, event) in action.events.iter().enumerate() { for path in event.paths() { - eprintln!("[EVENT {}] Path: {} -- {:?}", n, path.display(), event.metadata); + eprintln!( + "[EVENT {}] Path: {} -- {:?}", + n, + path.display(), + event.metadata + ); } for signal in event.signals() { diff --git a/lib/src/action.rs b/lib/src/action.rs index 10cfae26..7c0da868 100644 --- a/lib/src/action.rs +++ b/lib/src/action.rs @@ -7,7 +7,6 @@ use std::{ }; use atomic_take::AtomicTake; -use command_group::AsyncCommandGroup; use once_cell::sync::OnceCell; use tokio::{ process::Command, @@ -17,7 +16,7 @@ use tokio::{ use tracing::{debug, trace, warn}; use crate::{ - command::{Process, Shell}, + command::{Shell, Supervisor}, error::{CriticalError, RuntimeError}, event::Event, handler::{rte, Handler}, @@ -33,7 +32,6 @@ pub struct WorkingData { pub action_handler: Arc + Send>>>, pub pre_spawn_handler: Arc + Send>>>, pub post_spawn_handler: Arc + Send>>>, - pub completion_handler: Arc + Send>>>, pub shell: Shell, @@ -62,7 +60,6 @@ impl Default for WorkingData { action_handler: Arc::new(AtomicTake::new(Box::new(()) as _)), pre_spawn_handler: Arc::new(AtomicTake::new(Box::new(()) as _)), post_spawn_handler: Arc::new(AtomicTake::new(Box::new(()) as _)), - completion_handler: Arc::new(AtomicTake::new(Box::new(()) as _)), shell: Shell::default(), command: Vec::new(), grouped: true, @@ -198,11 +195,12 @@ impl Outcome { pub async fn worker( working: watch::Receiver, errors: mpsc::Sender, + events_tx: mpsc::Sender, mut events: mpsc::Receiver, ) -> Result<(), CriticalError> { let mut last = Instant::now(); let mut set = Vec::new(); - let mut process: Option = None; + let mut process: Option = None; let mut action_handler = { working.borrow().action_handler.take() }.ok_or(CriticalError::MissingHandler)?; @@ -210,8 +208,6 @@ pub async fn worker( { working.borrow().pre_spawn_handler.take() }.ok_or(CriticalError::MissingHandler)?; let mut post_spawn_handler = { working.borrow().post_spawn_handler.take() }.ok_or(CriticalError::MissingHandler)?; - let mut completion_handler = - { working.borrow().completion_handler.take() }.ok_or(CriticalError::MissingHandler)?; loop { let maxtime = if set.is_empty() { @@ -277,11 +273,6 @@ pub async fn worker( post_spawn_handler = h; } - if let Some(h) = working.borrow().completion_handler.take() { - trace!("completion handler updated"); - completion_handler = h; - } - debug!("running action handler"); let outcome = action.outcome.clone(); let err = action_handler @@ -296,15 +287,7 @@ pub async fn worker( let outcome = outcome.get().cloned().unwrap_or_default(); debug!(?outcome, "handler finished"); - let is_running = match process.as_mut().map(|p| p.is_running()).transpose() { - Err(err) => { - errors.send(err).await?; - false - } - Ok(Some(ir)) => ir, - Ok(None) => false, - }; - + let is_running = process.as_ref().map(|p| p.is_running()).unwrap_or(false); let outcome = outcome.resolve(is_running); debug!(?outcome, "outcome resolved"); @@ -315,6 +298,8 @@ pub async fn worker( &mut process, &mut pre_spawn_handler, &mut post_spawn_handler, + errors.clone(), + events_tx.clone(), ) .await; if let Err(err) = rerr { @@ -330,9 +315,11 @@ pub async fn worker( async fn apply_outcome( outcome: Outcome, working: WorkingData, - process: &mut Option, + process: &mut Option, pre_spawn_handler: &mut Box + Send>, post_spawn_handler: &mut Box + Send>, + errors: mpsc::Sender, + events: mpsc::Sender, ) -> Result<(), RuntimeError> { match (process.as_mut(), outcome) { (_, Outcome::DoNothing) => {} @@ -365,38 +352,31 @@ async fn apply_outcome( .map_err(|_| RuntimeError::HandlerLockHeld("pre-spawn"))? .into_inner(); - debug!(grouped=%working.grouped, ?command, "spawning command"); - let (proc, id) = if working.grouped { - let proc = command.group_spawn()?; - let id = proc.id().ok_or(RuntimeError::ProcessDeadOnArrival)?; - debug!(pgid=%id, "process group spawned"); - (Process::Grouped(proc), id) - } else { - let proc = command.spawn()?; - let id = proc.id().ok_or(RuntimeError::ProcessDeadOnArrival)?; - debug!(pid=%id, "process spawned"); - (Process::Ungrouped(proc), id) - }; + trace!("spawing supervisor for command"); + let sup = Supervisor::spawn( + errors.clone(), + events.clone(), + &mut command, + working.grouped, + )?; debug!("running post-spawn handler"); let post_spawn = PostSpawn { command: working.command.clone(), - id, + id: sup.id(), grouped: working.grouped, }; post_spawn_handler .handle(post_spawn) .map_err(|e| rte("action post-spawn", e))?; - *process = Some(proc); - - // TODO: post-stop hook (immediately after *process* ends, not when Stop is applied) + *process = Some(sup); } } (Some(p), Outcome::Signal(sig)) => { // TODO: windows - p.signal(sig)?; + p.signal(sig).await?; } (_, Outcome::Clear) => { @@ -410,6 +390,8 @@ async fn apply_outcome( process, pre_spawn_handler, post_spawn_handler, + errors, + events, ) .await?; } @@ -420,6 +402,8 @@ async fn apply_outcome( process, pre_spawn_handler, post_spawn_handler, + errors, + events, ) .await?; } @@ -431,6 +415,8 @@ async fn apply_outcome( process, pre_spawn_handler, post_spawn_handler, + errors.clone(), + events.clone(), ) .await?; apply_outcome( @@ -439,6 +425,8 @@ async fn apply_outcome( process, pre_spawn_handler, post_spawn_handler, + errors, + events, ) .await?; } diff --git a/lib/src/command.rs b/lib/src/command.rs index e7f56737..fc03b0b3 100644 --- a/lib/src/command.rs +++ b/lib/src/command.rs @@ -1,312 +1,14 @@ -//! Command construction and configuration thereof. +//! Command construction, configuration, and tracking. -use std::process::ExitStatus; +#[doc(inline)] +pub use process::Process; -use command_group::{AsyncGroupChild, Signal}; -use tokio::process::{Child, Command}; -use tracing::{debug, trace}; +#[doc(inline)] +pub use shell::Shell; -use crate::error::RuntimeError; +#[doc(inline)] +pub use supervisor::Supervisor; -/// Shell to use to run commands. -/// -/// `Cmd` and `Powershell` are special-cased because they have different calling -/// conventions. Also `Cmd` is only available in Windows, while `Powershell` is -/// also available on unices (provided the end-user has it installed, of course). -/// -/// See [`Config.cmd`] for the semantics of `None` vs the -/// other options. -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum Shell { - /// Use no shell, and execute the command directly. - None, - - /// Use the given string as a unix shell invocation. - /// - /// This means two things: - /// - the program is invoked with `-c` followed by the command, and - /// - the string will be split on space, and the resulting vec used as - /// execvp(3) arguments: first is the shell program, rest are additional - /// arguments (which come before the `-c` mentioned above). This is a very - /// simplistic approach deliberately: it will not support quoted - /// arguments, for example. Use [`Shell::None`] with a custom command vec - /// if you want that. - Unix(String), - - /// Use the Windows CMD.EXE shell. - /// - /// This is invoked with `/C` followed by the command. - #[cfg(windows)] - Cmd, - - /// Use Powershell, on Windows or elsewhere. - /// - /// This is invoked with `-Command` followed by the command. - /// - /// This is preferred over `Unix("pwsh")`, though that will also work - /// on unices due to Powershell supporting the `-c` short option. - Powershell, -} - -impl Default for Shell { - #[cfg(windows)] - fn default() -> Self { - Self::Powershell - } - - #[cfg(not(windows))] - fn default() -> Self { - Self::Unix("sh".into()) - } -} - -impl Shell { - /// Obtain a [`Command`] given a list of command parts. - /// - /// Behaves as described in the enum documentation. - /// - /// # Panics - /// - /// - Panics if `cmd` is empty. - /// - Panics if the string in the `Unix` variant is empty or only whitespace. - pub fn to_command(&self, cmd: &[String]) -> Command { - assert!(!cmd.is_empty(), "cmd was empty"); - trace!(shell=?self, ?cmd, "constructing command"); - - match self { - Shell::None => { - // UNWRAP: checked by assert - #[allow(clippy::unwrap_used)] - let (first, rest) = cmd.split_first().unwrap(); - let mut c = Command::new(first); - c.args(rest); - c - } - - #[cfg(windows)] - Shell::Cmd => { - let mut c = Command::new("cmd.exe"); - c.arg("/C").arg(cmd.join(" ")); - c - } - - Shell::Powershell if cfg!(windows) => { - let mut c = Command::new("powershell.exe"); - c.arg("-Command").arg(cmd.join(" ")); - c - } - - Shell::Powershell => { - let mut c = Command::new("pwsh"); - c.arg("-Command").arg(cmd.join(" ")); - c - } - - Shell::Unix(name) => { - assert!(!name.is_empty(), "shell program was empty"); - let sh = name.split_ascii_whitespace().collect::>(); - - // UNWRAP: checked by assert - #[allow(clippy::unwrap_used)] - let (shprog, shopts) = sh.split_first().unwrap(); - - let mut c = Command::new(shprog); - c.args(shopts); - c.arg("-c").arg(cmd.join(" ")); - c - } - } - } -} - -#[derive(Debug)] -pub enum Process { - None, - Grouped(AsyncGroupChild), - Ungrouped(Child), - Done(ExitStatus), -} - -impl Default for Process { - fn default() -> Self { - Process::None - } -} - -impl Process { - #[cfg(unix)] - pub fn signal(&mut self, sig: Signal) -> Result<(), RuntimeError> { - use command_group::UnixChildExt; - - match self { - Self::None | Self::Done(_) => Ok(()), - Self::Grouped(c) => { - debug!(signal=%sig, pgid=?c.id(), "sending signal to process group"); - c.signal(sig) - } - Self::Ungrouped(c) => { - debug!(signal=%sig, pid=?c.id(), "sending signal to process"); - c.signal(sig) - } - } - .map_err(RuntimeError::Process) - } - - pub async fn kill(&mut self) -> Result<(), RuntimeError> { - match self { - Self::None | Self::Done(_) => Ok(()), - Self::Grouped(c) => { - debug!(pgid=?c.id(), "killing process group"); - c.kill() - } - Self::Ungrouped(c) => { - debug!(pid=?c.id(), "killing process"); - c.kill().await - } - } - .map_err(RuntimeError::Process) - } - - pub fn is_running(&mut self) -> Result { - match self { - Self::None | Self::Done(_) => Ok(false), - Self::Grouped(c) => c.try_wait().map(|status| { - trace!("try-waiting on process group"); - if let Some(status) = status { - trace!(?status, "converting to ::Done"); - *self = Self::Done(status); - true - } else { - false - } - }), - Self::Ungrouped(c) => c.try_wait().map(|status| { - trace!("try-waiting on process"); - if let Some(status) = status { - trace!(?status, "converting to ::Done"); - *self = Self::Done(status); - true - } else { - false - } - }), - } - .map_err(RuntimeError::Process) - } - - pub async fn wait(&mut self) -> Result, RuntimeError> { - match self { - Self::None => Ok(None), - Self::Done(status) => Ok(Some(*status)), - Self::Grouped(c) => { - trace!("waiting on process group"); - let status = c.wait().await?; - trace!(?status, "converting to ::Done"); - *self = Self::Done(status); - Ok(Some(status)) - } - Self::Ungrouped(c) => { - trace!("waiting on process"); - let status = c.wait().await?; - trace!(?status, "converting to ::Done"); - *self = Self::Done(status); - Ok(Some(status)) - } - } - .map_err(RuntimeError::Process) - } -} - -#[cfg(test)] -mod test { - use super::Shell; - use command_group::AsyncCommandGroup; - - #[tokio::test] - #[cfg(unix)] - async fn unix_shell_default() -> Result<(), std::io::Error> { - assert!(Shell::default() - .to_command(&["echo".into(), "hi".into()]) - .group_status() - .await? - .success()); - Ok(()) - } - - #[tokio::test] - #[cfg(unix)] - async fn unix_shell_none() -> Result<(), std::io::Error> { - assert!(Shell::None - .to_command(&["echo".into(), "hi".into()]) - .group_status() - .await? - .success()); - Ok(()) - } - - #[tokio::test] - #[cfg(unix)] - async fn unix_shell_alternate() -> Result<(), std::io::Error> { - assert!(Shell::Unix("bash".into()) - .to_command(&["echo".into(), "hi".into()]) - .group_status() - .await? - .success()); - Ok(()) - } - - #[tokio::test] - #[cfg(unix)] - async fn unix_shell_alternate_shopts() -> Result<(), std::io::Error> { - assert!(Shell::Unix("bash -o errexit".into()) - .to_command(&["echo".into(), "hi".into()]) - .group_status() - .await? - .success()); - Ok(()) - } - - #[tokio::test] - #[cfg(windows)] - async fn windows_shell_default() -> Result<(), std::io::Error> { - assert!(Shell::default() - .to_command(&["echo".into(), "hi".into()]) - .group_status() - .await? - .success()); - Ok(()) - } - - #[tokio::test] - #[cfg(windows)] - async fn windows_shell_cmd() -> Result<(), std::io::Error> { - assert!(Shell::Cmd - .to_command(&["echo".into(), "hi".into()]) - .group_status() - .await? - .success()); - Ok(()) - } - - #[tokio::test] - #[cfg(windows)] - async fn windows_shell_powershell() -> Result<(), std::io::Error> { - assert!(Shell::Powershell - .to_command(&["echo".into(), "hi".into()]) - .group_status() - .await? - .success()); - Ok(()) - } - - #[tokio::test] - #[cfg(windows)] - async fn windows_shell_unix_style_powershell() -> Result<(), std::io::Error> { - assert!(Shell::Unix("powershell.exe".into()) - .to_command(&["echo".into(), "hi".into()]) - .group_status() - .await? - .success()); - Ok(()) - } -} +mod process; +mod shell; +mod supervisor; diff --git a/lib/src/command/process.rs b/lib/src/command/process.rs new file mode 100644 index 00000000..4122213d --- /dev/null +++ b/lib/src/command/process.rs @@ -0,0 +1,105 @@ +use std::process::ExitStatus; + +use command_group::{AsyncGroupChild, Signal}; +use tokio::process::Child; +use tracing::{debug, trace}; + +use crate::error::RuntimeError; + +#[derive(Debug)] +pub enum Process { + None, + Grouped(AsyncGroupChild), + Ungrouped(Child), + Done(ExitStatus), +} + +impl Default for Process { + fn default() -> Self { + Process::None + } +} + +impl Process { + #[cfg(unix)] + pub fn signal(&mut self, sig: Signal) -> Result<(), RuntimeError> { + use command_group::UnixChildExt; + + match self { + Self::None | Self::Done(_) => Ok(()), + Self::Grouped(c) => { + debug!(signal=%sig, pgid=?c.id(), "sending signal to process group"); + c.signal(sig) + } + Self::Ungrouped(c) => { + debug!(signal=%sig, pid=?c.id(), "sending signal to process"); + c.signal(sig) + } + } + .map_err(RuntimeError::Process) + } + + pub async fn kill(&mut self) -> Result<(), RuntimeError> { + match self { + Self::None | Self::Done(_) => Ok(()), + Self::Grouped(c) => { + debug!(pgid=?c.id(), "killing process group"); + c.kill() + } + Self::Ungrouped(c) => { + debug!(pid=?c.id(), "killing process"); + c.kill().await + } + } + .map_err(RuntimeError::Process) + } + + pub fn is_running(&mut self) -> Result { + match self { + Self::None | Self::Done(_) => Ok(false), + Self::Grouped(c) => c.try_wait().map(|status| { + trace!("try-waiting on process group"); + if let Some(status) = status { + trace!(?status, "converting to ::Done"); + *self = Self::Done(status); + true + } else { + false + } + }), + Self::Ungrouped(c) => c.try_wait().map(|status| { + trace!("try-waiting on process"); + if let Some(status) = status { + trace!(?status, "converting to ::Done"); + *self = Self::Done(status); + true + } else { + false + } + }), + } + .map_err(RuntimeError::Process) + } + + pub async fn wait(&mut self) -> Result, RuntimeError> { + match self { + Self::None => Ok(None), + Self::Done(status) => Ok(Some(*status)), + Self::Grouped(c) => { + trace!("waiting on process group"); + let status = c.wait().await?; + trace!(?status, "converting to ::Done"); + *self = Self::Done(status); + Ok(Some(status)) + } + Self::Ungrouped(c) => { + trace!("waiting on process"); + let status = c.wait().await?; + trace!(?status, "converting to ::Done"); + *self = Self::Done(status); + Ok(Some(status)) + } + } + .map_err(RuntimeError::Process) + } +} diff --git a/lib/src/command/shell.rs b/lib/src/command/shell.rs new file mode 100644 index 00000000..600f6e82 --- /dev/null +++ b/lib/src/command/shell.rs @@ -0,0 +1,207 @@ +use tokio::process::Command; +use tracing::trace; + +/// Shell to use to run commands. +/// +/// `Cmd` and `Powershell` are special-cased because they have different calling +/// conventions. Also `Cmd` is only available in Windows, while `Powershell` is +/// also available on unices (provided the end-user has it installed, of course). +/// +/// See [`Config.cmd`] for the semantics of `None` vs the +/// other options. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum Shell { + /// Use no shell, and execute the command directly. + None, + + /// Use the given string as a unix shell invocation. + /// + /// This means two things: + /// - the program is invoked with `-c` followed by the command, and + /// - the string will be split on space, and the resulting vec used as + /// execvp(3) arguments: first is the shell program, rest are additional + /// arguments (which come before the `-c` mentioned above). This is a very + /// simplistic approach deliberately: it will not support quoted + /// arguments, for example. Use [`Shell::None`] with a custom command vec + /// if you want that. + Unix(String), + + /// Use the Windows CMD.EXE shell. + /// + /// This is invoked with `/C` followed by the command. + #[cfg(windows)] + Cmd, + + /// Use Powershell, on Windows or elsewhere. + /// + /// This is invoked with `-Command` followed by the command. + /// + /// This is preferred over `Unix("pwsh")`, though that will also work + /// on unices due to Powershell supporting the `-c` short option. + Powershell, +} + +impl Default for Shell { + #[cfg(windows)] + fn default() -> Self { + Self::Powershell + } + + #[cfg(not(windows))] + fn default() -> Self { + Self::Unix("sh".into()) + } +} + +impl Shell { + /// Obtain a [`Command`] given a list of command parts. + /// + /// Behaves as described in the enum documentation. + /// + /// # Panics + /// + /// - Panics if `cmd` is empty. + /// - Panics if the string in the `Unix` variant is empty or only whitespace. + pub fn to_command(&self, cmd: &[String]) -> Command { + assert!(!cmd.is_empty(), "cmd was empty"); + trace!(shell=?self, ?cmd, "constructing command"); + + match self { + Shell::None => { + // UNWRAP: checked by assert + #[allow(clippy::unwrap_used)] + let (first, rest) = cmd.split_first().unwrap(); + let mut c = Command::new(first); + c.args(rest); + c + } + + #[cfg(windows)] + Shell::Cmd => { + let mut c = Command::new("cmd.exe"); + c.arg("/C").arg(cmd.join(" ")); + c + } + + Shell::Powershell if cfg!(windows) => { + let mut c = Command::new("powershell.exe"); + c.arg("-Command").arg(cmd.join(" ")); + c + } + + Shell::Powershell => { + let mut c = Command::new("pwsh"); + c.arg("-Command").arg(cmd.join(" ")); + c + } + + Shell::Unix(name) => { + assert!(!name.is_empty(), "shell program was empty"); + let sh = name.split_ascii_whitespace().collect::>(); + + // UNWRAP: checked by assert + #[allow(clippy::unwrap_used)] + let (shprog, shopts) = sh.split_first().unwrap(); + + let mut c = Command::new(shprog); + c.args(shopts); + c.arg("-c").arg(cmd.join(" ")); + c + } + } + } +} + +#[cfg(test)] +mod test { + use super::Shell; + use command_group::AsyncCommandGroup; + + #[tokio::test] + #[cfg(unix)] + async fn unix_shell_default() -> Result<(), std::io::Error> { + assert!(Shell::default() + .to_command(&["echo".into(), "hi".into()]) + .group_status() + .await? + .success()); + Ok(()) + } + + #[tokio::test] + #[cfg(unix)] + async fn unix_shell_none() -> Result<(), std::io::Error> { + assert!(Shell::None + .to_command(&["echo".into(), "hi".into()]) + .group_status() + .await? + .success()); + Ok(()) + } + + #[tokio::test] + #[cfg(unix)] + async fn unix_shell_alternate() -> Result<(), std::io::Error> { + assert!(Shell::Unix("bash".into()) + .to_command(&["echo".into(), "hi".into()]) + .group_status() + .await? + .success()); + Ok(()) + } + + #[tokio::test] + #[cfg(unix)] + async fn unix_shell_alternate_shopts() -> Result<(), std::io::Error> { + assert!(Shell::Unix("bash -o errexit".into()) + .to_command(&["echo".into(), "hi".into()]) + .group_status() + .await? + .success()); + Ok(()) + } + + #[tokio::test] + #[cfg(windows)] + async fn windows_shell_default() -> Result<(), std::io::Error> { + assert!(Shell::default() + .to_command(&["echo".into(), "hi".into()]) + .group_status() + .await? + .success()); + Ok(()) + } + + #[tokio::test] + #[cfg(windows)] + async fn windows_shell_cmd() -> Result<(), std::io::Error> { + assert!(Shell::Cmd + .to_command(&["echo".into(), "hi".into()]) + .group_status() + .await? + .success()); + Ok(()) + } + + #[tokio::test] + #[cfg(windows)] + async fn windows_shell_powershell() -> Result<(), std::io::Error> { + assert!(Shell::Powershell + .to_command(&["echo".into(), "hi".into()]) + .group_status() + .await? + .success()); + Ok(()) + } + + #[tokio::test] + #[cfg(windows)] + async fn windows_shell_unix_style_powershell() -> Result<(), std::io::Error> { + assert!(Shell::Unix("powershell.exe".into()) + .to_command(&["echo".into(), "hi".into()]) + .group_status() + .await? + .success()); + Ok(()) + } +} diff --git a/lib/src/command/supervisor.rs b/lib/src/command/supervisor.rs new file mode 100644 index 00000000..90048fea --- /dev/null +++ b/lib/src/command/supervisor.rs @@ -0,0 +1,183 @@ +use command_group::{AsyncCommandGroup, Signal}; +use tokio::{ + process::Command, + select, spawn, + sync::{ + mpsc::{self, Sender}, + watch, + }, + task::JoinHandle, +}; +use tracing::{debug, error, trace}; + +use crate::{ + error::RuntimeError, + event::{Event, Particle}, +}; + +use super::Process; + +#[derive(Clone, Copy, Debug)] +enum Intervention { + Kill, + #[cfg(unix)] + Signal(Signal), +} + +#[derive(Debug)] +pub struct Supervisor { + id: u32, + completion: watch::Receiver, + intervene: Sender, + handle: JoinHandle<()>, +} + +impl Supervisor { + pub fn spawn( + errors: Sender, + events: Sender, + command: &mut Command, + grouped: bool, + ) -> Result { + debug!(%grouped, ?command, "spawning command"); + let (process, id) = if grouped { + let proc = command.group_spawn()?; + let id = proc.id().ok_or(RuntimeError::ProcessDeadOnArrival)?; + debug!(pgid=%id, "process group spawned"); + (Process::Grouped(proc), id) + } else { + let proc = command.spawn()?; + let id = proc.id().ok_or(RuntimeError::ProcessDeadOnArrival)?; + debug!(pid=%id, "process spawned"); + (Process::Ungrouped(proc), id) + }; + + let (mark_done, completion) = watch::channel(false); + let (int_s, int_r) = mpsc::channel(8); + + let handle = spawn(async move { + let mut process = process; + let mut int = int_r; + + debug!(?process, "starting task to watch on process"); + + loop { + select! { + p = process.wait() => { + match p { + Ok(_) => break, // deal with it below + Err(err) => { + error!(%err, "while waiting on process"); + errors.send(err).await.ok(); + trace!("marking process as done"); + mark_done.send(true).ok(); + return; + } + } + }, + Some(int) = int.recv() => { + match int { + Intervention::Kill => { + if let Err(err) = process.kill().await { + error!(%err, "while killing process"); + errors.send(err).await.ok(); + trace!("continuing to watch command"); + } + } + #[cfg(unix)] + Intervention::Signal(sig) => { + if let Err(err) = process.signal(sig) { + error!(%err, "while sending signal to process"); + errors.send(err).await.ok(); + trace!("continuing to watch command"); + } + } + } + } + else => break, + } + } + + trace!("got out of loop, waiting once more"); + match process.wait().await { + Err(err) => { + error!(%err, "while waiting on process"); + errors.send(err).await.ok(); + } + Ok(status) => { + let event = Event { + particulars: vec![Particle::ProcessCompletion(status)], + metadata: Default::default(), + }; + + debug!(?event, "creating synthetic process completion event"); + if let Err(err) = events.send(event).await { + error!(%err, "while sending process completion event"); + errors + .send(RuntimeError::EventChannelSend { + ctx: "command supervisor", + err, + }) + .await + .ok(); + } + } + } + + trace!("marking process as done"); + mark_done.send(true).ok(); + }); + + Ok(Self { + id, + completion, + intervene: int_s, + handle, // TODO: is there anything useful to do with this? do we need to keep it? + }) + } + + pub fn id(&self) -> u32 { + self.id + } + + #[cfg(unix)] + pub async fn signal(&self, signal: Signal) -> Result<(), RuntimeError> { + trace!(?signal, "sending signal intervention"); + self.intervene + .send(Intervention::Signal(signal)) + .await + .map_err(|err| RuntimeError::InternalSupervisor(err.to_string())) + } + + pub async fn kill(&self) -> Result<(), RuntimeError> { + trace!("sending kill intervention"); + self.intervene + .send(Intervention::Kill) + .await + .map_err(|err| RuntimeError::InternalSupervisor(err.to_string())) + } + + pub fn is_running(&self) -> bool { + !*self.completion.borrow() + } + + pub async fn wait(&mut self) -> Result<(), RuntimeError> { + debug!("waiting on supervisor completion"); + + loop { + self.completion + .changed() + .await + .map_err(|err| RuntimeError::InternalSupervisor(err.to_string()))?; + + if *self.completion.borrow() { + break; + } else { + debug!("got completion change event, but it wasn't done (waiting more)"); + } + } + + debug!("supervisor completed"); + Ok(()) + } +} diff --git a/lib/src/error.rs b/lib/src/error.rs index 4e2af7c4..4f4517a2 100644 --- a/lib/src/error.rs +++ b/lib/src/error.rs @@ -108,6 +108,11 @@ pub enum RuntimeError { err: notify::Error, }, + /// Opaque internal error from a command supervisor. + #[error("internal: command supervisor: {0}")] + #[diagnostic(code(watchexec::runtime::internal_supervisor))] + InternalSupervisor(String), + /// Error received when an event cannot be sent to the event channel. #[error("cannot send event from {ctx}: {err}")] #[diagnostic(code(watchexec::runtime::event_channel_send))] diff --git a/lib/src/event.rs b/lib/src/event.rs index df2f3716..3cb12081 100644 --- a/lib/src/event.rs +++ b/lib/src/event.rs @@ -9,6 +9,7 @@ use std::{ collections::HashMap, path::{Path, PathBuf}, + process::ExitStatus, }; use crate::signal::Signal; @@ -20,7 +21,8 @@ pub struct Event { pub metadata: HashMap>, } -/// Something which can be used to filter an event. +// TODO: this really needs a better name (along with "particulars") +/// Something which can be used to filter or qualify an event. #[derive(Clone, Debug, Eq, PartialEq)] #[non_exhaustive] pub enum Particle { @@ -28,6 +30,7 @@ pub enum Particle { Source(Source), Process(u32), Signal(Signal), + ProcessCompletion(Option), } /// The general origin of the event. diff --git a/lib/src/watchexec.rs b/lib/src/watchexec.rs index e280077a..0567f8b8 100644 --- a/lib/src/watchexec.rs +++ b/lib/src/watchexec.rs @@ -81,7 +81,10 @@ impl Watchexec { }}; } - let action = subtask!(action, action::worker(ac_r, er_s.clone(), ev_r)); + let action = subtask!( + action, + action::worker(ac_r, er_s.clone(), ev_s.clone(), ev_r) + ); let fs = subtask!(fs, fs::worker(fs_r, er_s.clone(), ev_s.clone())); let signal = subtask!(signal, signal::worker(er_s.clone(), ev_s.clone()));