Add process supervisor to watch command to completion

Also change the concept of a completion handler to instead sending a
synthetic "process completed" event down the same path as usual.

That makes handling completion the job of the action handler, but also
means it's immediately possible to launch a process or do an action in
response to the process completing. Win win!
This commit is contained in:
Félix Saparelli 2021-09-03 05:22:15 +12:00
parent 0f247e9e5c
commit 8e4994abca
No known key found for this signature in database
GPG key ID: B948C4BAE44FC474
9 changed files with 560 additions and 351 deletions

View file

@ -4,7 +4,15 @@ use std::{
use clap::ArgMatches; use clap::ArgMatches;
use color_eyre::eyre::{eyre, Result}; use color_eyre::eyre::{eyre, Result};
use watchexec::{action::{Action, Outcome, Signal}, command::Shell, config::{InitConfig, RuntimeConfig}, event::Event, fs::Watcher, handler::PrintDisplay, signal::Signal as InputSignal}; use watchexec::{
action::{Action, Outcome, Signal},
command::Shell,
config::{InitConfig, RuntimeConfig},
event::Event,
fs::Watcher,
handler::PrintDisplay,
signal::Signal as InputSignal,
};
pub fn new(args: &ArgMatches<'static>) -> Result<(InitConfig, RuntimeConfig)> { pub fn new(args: &ArgMatches<'static>) -> Result<(InitConfig, RuntimeConfig)> {
Ok((init(args)?, runtime(args)?)) Ok((init(args)?, runtime(args)?))
@ -89,7 +97,12 @@ fn runtime(args: &ArgMatches<'static>) -> Result<RuntimeConfig> {
if print_events { if print_events {
for (n, event) in action.events.iter().enumerate() { for (n, event) in action.events.iter().enumerate() {
for path in event.paths() { for path in event.paths() {
eprintln!("[EVENT {}] Path: {} -- {:?}", n, path.display(), event.metadata); eprintln!(
"[EVENT {}] Path: {} -- {:?}",
n,
path.display(),
event.metadata
);
} }
for signal in event.signals() { for signal in event.signals() {

View file

@ -7,7 +7,6 @@ use std::{
}; };
use atomic_take::AtomicTake; use atomic_take::AtomicTake;
use command_group::AsyncCommandGroup;
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use tokio::{ use tokio::{
process::Command, process::Command,
@ -17,7 +16,7 @@ use tokio::{
use tracing::{debug, trace, warn}; use tracing::{debug, trace, warn};
use crate::{ use crate::{
command::{Process, Shell}, command::{Shell, Supervisor},
error::{CriticalError, RuntimeError}, error::{CriticalError, RuntimeError},
event::Event, event::Event,
handler::{rte, Handler}, handler::{rte, Handler},
@ -33,7 +32,6 @@ pub struct WorkingData {
pub action_handler: Arc<AtomicTake<Box<dyn Handler<Action> + Send>>>, pub action_handler: Arc<AtomicTake<Box<dyn Handler<Action> + Send>>>,
pub pre_spawn_handler: Arc<AtomicTake<Box<dyn Handler<PreSpawn> + Send>>>, pub pre_spawn_handler: Arc<AtomicTake<Box<dyn Handler<PreSpawn> + Send>>>,
pub post_spawn_handler: Arc<AtomicTake<Box<dyn Handler<PostSpawn> + Send>>>, pub post_spawn_handler: Arc<AtomicTake<Box<dyn Handler<PostSpawn> + Send>>>,
pub completion_handler: Arc<AtomicTake<Box<dyn Handler<Action> + Send>>>,
pub shell: Shell, pub shell: Shell,
@ -62,7 +60,6 @@ impl Default for WorkingData {
action_handler: Arc::new(AtomicTake::new(Box::new(()) as _)), action_handler: Arc::new(AtomicTake::new(Box::new(()) as _)),
pre_spawn_handler: Arc::new(AtomicTake::new(Box::new(()) as _)), pre_spawn_handler: Arc::new(AtomicTake::new(Box::new(()) as _)),
post_spawn_handler: Arc::new(AtomicTake::new(Box::new(()) as _)), post_spawn_handler: Arc::new(AtomicTake::new(Box::new(()) as _)),
completion_handler: Arc::new(AtomicTake::new(Box::new(()) as _)),
shell: Shell::default(), shell: Shell::default(),
command: Vec::new(), command: Vec::new(),
grouped: true, grouped: true,
@ -198,11 +195,12 @@ impl Outcome {
pub async fn worker( pub async fn worker(
working: watch::Receiver<WorkingData>, working: watch::Receiver<WorkingData>,
errors: mpsc::Sender<RuntimeError>, errors: mpsc::Sender<RuntimeError>,
events_tx: mpsc::Sender<Event>,
mut events: mpsc::Receiver<Event>, mut events: mpsc::Receiver<Event>,
) -> Result<(), CriticalError> { ) -> Result<(), CriticalError> {
let mut last = Instant::now(); let mut last = Instant::now();
let mut set = Vec::new(); let mut set = Vec::new();
let mut process: Option<Process> = None; let mut process: Option<Supervisor> = None;
let mut action_handler = let mut action_handler =
{ working.borrow().action_handler.take() }.ok_or(CriticalError::MissingHandler)?; { working.borrow().action_handler.take() }.ok_or(CriticalError::MissingHandler)?;
@ -210,8 +208,6 @@ pub async fn worker(
{ working.borrow().pre_spawn_handler.take() }.ok_or(CriticalError::MissingHandler)?; { working.borrow().pre_spawn_handler.take() }.ok_or(CriticalError::MissingHandler)?;
let mut post_spawn_handler = let mut post_spawn_handler =
{ working.borrow().post_spawn_handler.take() }.ok_or(CriticalError::MissingHandler)?; { working.borrow().post_spawn_handler.take() }.ok_or(CriticalError::MissingHandler)?;
let mut completion_handler =
{ working.borrow().completion_handler.take() }.ok_or(CriticalError::MissingHandler)?;
loop { loop {
let maxtime = if set.is_empty() { let maxtime = if set.is_empty() {
@ -277,11 +273,6 @@ pub async fn worker(
post_spawn_handler = h; post_spawn_handler = h;
} }
if let Some(h) = working.borrow().completion_handler.take() {
trace!("completion handler updated");
completion_handler = h;
}
debug!("running action handler"); debug!("running action handler");
let outcome = action.outcome.clone(); let outcome = action.outcome.clone();
let err = action_handler let err = action_handler
@ -296,15 +287,7 @@ pub async fn worker(
let outcome = outcome.get().cloned().unwrap_or_default(); let outcome = outcome.get().cloned().unwrap_or_default();
debug!(?outcome, "handler finished"); debug!(?outcome, "handler finished");
let is_running = match process.as_mut().map(|p| p.is_running()).transpose() { let is_running = process.as_ref().map(|p| p.is_running()).unwrap_or(false);
Err(err) => {
errors.send(err).await?;
false
}
Ok(Some(ir)) => ir,
Ok(None) => false,
};
let outcome = outcome.resolve(is_running); let outcome = outcome.resolve(is_running);
debug!(?outcome, "outcome resolved"); debug!(?outcome, "outcome resolved");
@ -315,6 +298,8 @@ pub async fn worker(
&mut process, &mut process,
&mut pre_spawn_handler, &mut pre_spawn_handler,
&mut post_spawn_handler, &mut post_spawn_handler,
errors.clone(),
events_tx.clone(),
) )
.await; .await;
if let Err(err) = rerr { if let Err(err) = rerr {
@ -330,9 +315,11 @@ pub async fn worker(
async fn apply_outcome( async fn apply_outcome(
outcome: Outcome, outcome: Outcome,
working: WorkingData, working: WorkingData,
process: &mut Option<Process>, process: &mut Option<Supervisor>,
pre_spawn_handler: &mut Box<dyn Handler<PreSpawn> + Send>, pre_spawn_handler: &mut Box<dyn Handler<PreSpawn> + Send>,
post_spawn_handler: &mut Box<dyn Handler<PostSpawn> + Send>, post_spawn_handler: &mut Box<dyn Handler<PostSpawn> + Send>,
errors: mpsc::Sender<RuntimeError>,
events: mpsc::Sender<Event>,
) -> Result<(), RuntimeError> { ) -> Result<(), RuntimeError> {
match (process.as_mut(), outcome) { match (process.as_mut(), outcome) {
(_, Outcome::DoNothing) => {} (_, Outcome::DoNothing) => {}
@ -365,38 +352,31 @@ async fn apply_outcome(
.map_err(|_| RuntimeError::HandlerLockHeld("pre-spawn"))? .map_err(|_| RuntimeError::HandlerLockHeld("pre-spawn"))?
.into_inner(); .into_inner();
debug!(grouped=%working.grouped, ?command, "spawning command"); trace!("spawing supervisor for command");
let (proc, id) = if working.grouped { let sup = Supervisor::spawn(
let proc = command.group_spawn()?; errors.clone(),
let id = proc.id().ok_or(RuntimeError::ProcessDeadOnArrival)?; events.clone(),
debug!(pgid=%id, "process group spawned"); &mut command,
(Process::Grouped(proc), id) working.grouped,
} else { )?;
let proc = command.spawn()?;
let id = proc.id().ok_or(RuntimeError::ProcessDeadOnArrival)?;
debug!(pid=%id, "process spawned");
(Process::Ungrouped(proc), id)
};
debug!("running post-spawn handler"); debug!("running post-spawn handler");
let post_spawn = PostSpawn { let post_spawn = PostSpawn {
command: working.command.clone(), command: working.command.clone(),
id, id: sup.id(),
grouped: working.grouped, grouped: working.grouped,
}; };
post_spawn_handler post_spawn_handler
.handle(post_spawn) .handle(post_spawn)
.map_err(|e| rte("action post-spawn", e))?; .map_err(|e| rte("action post-spawn", e))?;
*process = Some(proc); *process = Some(sup);
// TODO: post-stop hook (immediately after *process* ends, not when Stop is applied)
} }
} }
(Some(p), Outcome::Signal(sig)) => { (Some(p), Outcome::Signal(sig)) => {
// TODO: windows // TODO: windows
p.signal(sig)?; p.signal(sig).await?;
} }
(_, Outcome::Clear) => { (_, Outcome::Clear) => {
@ -410,6 +390,8 @@ async fn apply_outcome(
process, process,
pre_spawn_handler, pre_spawn_handler,
post_spawn_handler, post_spawn_handler,
errors,
events,
) )
.await?; .await?;
} }
@ -420,6 +402,8 @@ async fn apply_outcome(
process, process,
pre_spawn_handler, pre_spawn_handler,
post_spawn_handler, post_spawn_handler,
errors,
events,
) )
.await?; .await?;
} }
@ -431,6 +415,8 @@ async fn apply_outcome(
process, process,
pre_spawn_handler, pre_spawn_handler,
post_spawn_handler, post_spawn_handler,
errors.clone(),
events.clone(),
) )
.await?; .await?;
apply_outcome( apply_outcome(
@ -439,6 +425,8 @@ async fn apply_outcome(
process, process,
pre_spawn_handler, pre_spawn_handler,
post_spawn_handler, post_spawn_handler,
errors,
events,
) )
.await?; .await?;
} }

View file

@ -1,312 +1,14 @@
//! Command construction and configuration thereof. //! Command construction, configuration, and tracking.
use std::process::ExitStatus; #[doc(inline)]
pub use process::Process;
use command_group::{AsyncGroupChild, Signal}; #[doc(inline)]
use tokio::process::{Child, Command}; pub use shell::Shell;
use tracing::{debug, trace};
use crate::error::RuntimeError; #[doc(inline)]
pub use supervisor::Supervisor;
/// Shell to use to run commands. mod process;
/// mod shell;
/// `Cmd` and `Powershell` are special-cased because they have different calling mod supervisor;
/// conventions. Also `Cmd` is only available in Windows, while `Powershell` is
/// also available on unices (provided the end-user has it installed, of course).
///
/// See [`Config.cmd`] for the semantics of `None` vs the
/// other options.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Shell {
/// Use no shell, and execute the command directly.
None,
/// Use the given string as a unix shell invocation.
///
/// This means two things:
/// - the program is invoked with `-c` followed by the command, and
/// - the string will be split on space, and the resulting vec used as
/// execvp(3) arguments: first is the shell program, rest are additional
/// arguments (which come before the `-c` mentioned above). This is a very
/// simplistic approach deliberately: it will not support quoted
/// arguments, for example. Use [`Shell::None`] with a custom command vec
/// if you want that.
Unix(String),
/// Use the Windows CMD.EXE shell.
///
/// This is invoked with `/C` followed by the command.
#[cfg(windows)]
Cmd,
/// Use Powershell, on Windows or elsewhere.
///
/// This is invoked with `-Command` followed by the command.
///
/// This is preferred over `Unix("pwsh")`, though that will also work
/// on unices due to Powershell supporting the `-c` short option.
Powershell,
}
impl Default for Shell {
#[cfg(windows)]
fn default() -> Self {
Self::Powershell
}
#[cfg(not(windows))]
fn default() -> Self {
Self::Unix("sh".into())
}
}
impl Shell {
/// Obtain a [`Command`] given a list of command parts.
///
/// Behaves as described in the enum documentation.
///
/// # Panics
///
/// - Panics if `cmd` is empty.
/// - Panics if the string in the `Unix` variant is empty or only whitespace.
pub fn to_command(&self, cmd: &[String]) -> Command {
assert!(!cmd.is_empty(), "cmd was empty");
trace!(shell=?self, ?cmd, "constructing command");
match self {
Shell::None => {
// UNWRAP: checked by assert
#[allow(clippy::unwrap_used)]
let (first, rest) = cmd.split_first().unwrap();
let mut c = Command::new(first);
c.args(rest);
c
}
#[cfg(windows)]
Shell::Cmd => {
let mut c = Command::new("cmd.exe");
c.arg("/C").arg(cmd.join(" "));
c
}
Shell::Powershell if cfg!(windows) => {
let mut c = Command::new("powershell.exe");
c.arg("-Command").arg(cmd.join(" "));
c
}
Shell::Powershell => {
let mut c = Command::new("pwsh");
c.arg("-Command").arg(cmd.join(" "));
c
}
Shell::Unix(name) => {
assert!(!name.is_empty(), "shell program was empty");
let sh = name.split_ascii_whitespace().collect::<Vec<_>>();
// UNWRAP: checked by assert
#[allow(clippy::unwrap_used)]
let (shprog, shopts) = sh.split_first().unwrap();
let mut c = Command::new(shprog);
c.args(shopts);
c.arg("-c").arg(cmd.join(" "));
c
}
}
}
}
#[derive(Debug)]
pub enum Process {
None,
Grouped(AsyncGroupChild),
Ungrouped(Child),
Done(ExitStatus),
}
impl Default for Process {
fn default() -> Self {
Process::None
}
}
impl Process {
#[cfg(unix)]
pub fn signal(&mut self, sig: Signal) -> Result<(), RuntimeError> {
use command_group::UnixChildExt;
match self {
Self::None | Self::Done(_) => Ok(()),
Self::Grouped(c) => {
debug!(signal=%sig, pgid=?c.id(), "sending signal to process group");
c.signal(sig)
}
Self::Ungrouped(c) => {
debug!(signal=%sig, pid=?c.id(), "sending signal to process");
c.signal(sig)
}
}
.map_err(RuntimeError::Process)
}
pub async fn kill(&mut self) -> Result<(), RuntimeError> {
match self {
Self::None | Self::Done(_) => Ok(()),
Self::Grouped(c) => {
debug!(pgid=?c.id(), "killing process group");
c.kill()
}
Self::Ungrouped(c) => {
debug!(pid=?c.id(), "killing process");
c.kill().await
}
}
.map_err(RuntimeError::Process)
}
pub fn is_running(&mut self) -> Result<bool, RuntimeError> {
match self {
Self::None | Self::Done(_) => Ok(false),
Self::Grouped(c) => c.try_wait().map(|status| {
trace!("try-waiting on process group");
if let Some(status) = status {
trace!(?status, "converting to ::Done");
*self = Self::Done(status);
true
} else {
false
}
}),
Self::Ungrouped(c) => c.try_wait().map(|status| {
trace!("try-waiting on process");
if let Some(status) = status {
trace!(?status, "converting to ::Done");
*self = Self::Done(status);
true
} else {
false
}
}),
}
.map_err(RuntimeError::Process)
}
pub async fn wait(&mut self) -> Result<Option<ExitStatus>, RuntimeError> {
match self {
Self::None => Ok(None),
Self::Done(status) => Ok(Some(*status)),
Self::Grouped(c) => {
trace!("waiting on process group");
let status = c.wait().await?;
trace!(?status, "converting to ::Done");
*self = Self::Done(status);
Ok(Some(status))
}
Self::Ungrouped(c) => {
trace!("waiting on process");
let status = c.wait().await?;
trace!(?status, "converting to ::Done");
*self = Self::Done(status);
Ok(Some(status))
}
}
.map_err(RuntimeError::Process)
}
}
#[cfg(test)]
mod test {
use super::Shell;
use command_group::AsyncCommandGroup;
#[tokio::test]
#[cfg(unix)]
async fn unix_shell_default() -> Result<(), std::io::Error> {
assert!(Shell::default()
.to_command(&["echo".into(), "hi".into()])
.group_status()
.await?
.success());
Ok(())
}
#[tokio::test]
#[cfg(unix)]
async fn unix_shell_none() -> Result<(), std::io::Error> {
assert!(Shell::None
.to_command(&["echo".into(), "hi".into()])
.group_status()
.await?
.success());
Ok(())
}
#[tokio::test]
#[cfg(unix)]
async fn unix_shell_alternate() -> Result<(), std::io::Error> {
assert!(Shell::Unix("bash".into())
.to_command(&["echo".into(), "hi".into()])
.group_status()
.await?
.success());
Ok(())
}
#[tokio::test]
#[cfg(unix)]
async fn unix_shell_alternate_shopts() -> Result<(), std::io::Error> {
assert!(Shell::Unix("bash -o errexit".into())
.to_command(&["echo".into(), "hi".into()])
.group_status()
.await?
.success());
Ok(())
}
#[tokio::test]
#[cfg(windows)]
async fn windows_shell_default() -> Result<(), std::io::Error> {
assert!(Shell::default()
.to_command(&["echo".into(), "hi".into()])
.group_status()
.await?
.success());
Ok(())
}
#[tokio::test]
#[cfg(windows)]
async fn windows_shell_cmd() -> Result<(), std::io::Error> {
assert!(Shell::Cmd
.to_command(&["echo".into(), "hi".into()])
.group_status()
.await?
.success());
Ok(())
}
#[tokio::test]
#[cfg(windows)]
async fn windows_shell_powershell() -> Result<(), std::io::Error> {
assert!(Shell::Powershell
.to_command(&["echo".into(), "hi".into()])
.group_status()
.await?
.success());
Ok(())
}
#[tokio::test]
#[cfg(windows)]
async fn windows_shell_unix_style_powershell() -> Result<(), std::io::Error> {
assert!(Shell::Unix("powershell.exe".into())
.to_command(&["echo".into(), "hi".into()])
.group_status()
.await?
.success());
Ok(())
}
}

105
lib/src/command/process.rs Normal file
View file

@ -0,0 +1,105 @@
use std::process::ExitStatus;
use command_group::{AsyncGroupChild, Signal};
use tokio::process::Child;
use tracing::{debug, trace};
use crate::error::RuntimeError;
#[derive(Debug)]
pub enum Process {
None,
Grouped(AsyncGroupChild),
Ungrouped(Child),
Done(ExitStatus),
}
impl Default for Process {
fn default() -> Self {
Process::None
}
}
impl Process {
#[cfg(unix)]
pub fn signal(&mut self, sig: Signal) -> Result<(), RuntimeError> {
use command_group::UnixChildExt;
match self {
Self::None | Self::Done(_) => Ok(()),
Self::Grouped(c) => {
debug!(signal=%sig, pgid=?c.id(), "sending signal to process group");
c.signal(sig)
}
Self::Ungrouped(c) => {
debug!(signal=%sig, pid=?c.id(), "sending signal to process");
c.signal(sig)
}
}
.map_err(RuntimeError::Process)
}
pub async fn kill(&mut self) -> Result<(), RuntimeError> {
match self {
Self::None | Self::Done(_) => Ok(()),
Self::Grouped(c) => {
debug!(pgid=?c.id(), "killing process group");
c.kill()
}
Self::Ungrouped(c) => {
debug!(pid=?c.id(), "killing process");
c.kill().await
}
}
.map_err(RuntimeError::Process)
}
pub fn is_running(&mut self) -> Result<bool, RuntimeError> {
match self {
Self::None | Self::Done(_) => Ok(false),
Self::Grouped(c) => c.try_wait().map(|status| {
trace!("try-waiting on process group");
if let Some(status) = status {
trace!(?status, "converting to ::Done");
*self = Self::Done(status);
true
} else {
false
}
}),
Self::Ungrouped(c) => c.try_wait().map(|status| {
trace!("try-waiting on process");
if let Some(status) = status {
trace!(?status, "converting to ::Done");
*self = Self::Done(status);
true
} else {
false
}
}),
}
.map_err(RuntimeError::Process)
}
pub async fn wait(&mut self) -> Result<Option<ExitStatus>, RuntimeError> {
match self {
Self::None => Ok(None),
Self::Done(status) => Ok(Some(*status)),
Self::Grouped(c) => {
trace!("waiting on process group");
let status = c.wait().await?;
trace!(?status, "converting to ::Done");
*self = Self::Done(status);
Ok(Some(status))
}
Self::Ungrouped(c) => {
trace!("waiting on process");
let status = c.wait().await?;
trace!(?status, "converting to ::Done");
*self = Self::Done(status);
Ok(Some(status))
}
}
.map_err(RuntimeError::Process)
}
}

207
lib/src/command/shell.rs Normal file
View file

@ -0,0 +1,207 @@
use tokio::process::Command;
use tracing::trace;
/// Shell to use to run commands.
///
/// `Cmd` and `Powershell` are special-cased because they have different calling
/// conventions. Also `Cmd` is only available in Windows, while `Powershell` is
/// also available on unices (provided the end-user has it installed, of course).
///
/// See [`Config.cmd`] for the semantics of `None` vs the
/// other options.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Shell {
/// Use no shell, and execute the command directly.
None,
/// Use the given string as a unix shell invocation.
///
/// This means two things:
/// - the program is invoked with `-c` followed by the command, and
/// - the string will be split on space, and the resulting vec used as
/// execvp(3) arguments: first is the shell program, rest are additional
/// arguments (which come before the `-c` mentioned above). This is a very
/// simplistic approach deliberately: it will not support quoted
/// arguments, for example. Use [`Shell::None`] with a custom command vec
/// if you want that.
Unix(String),
/// Use the Windows CMD.EXE shell.
///
/// This is invoked with `/C` followed by the command.
#[cfg(windows)]
Cmd,
/// Use Powershell, on Windows or elsewhere.
///
/// This is invoked with `-Command` followed by the command.
///
/// This is preferred over `Unix("pwsh")`, though that will also work
/// on unices due to Powershell supporting the `-c` short option.
Powershell,
}
impl Default for Shell {
#[cfg(windows)]
fn default() -> Self {
Self::Powershell
}
#[cfg(not(windows))]
fn default() -> Self {
Self::Unix("sh".into())
}
}
impl Shell {
/// Obtain a [`Command`] given a list of command parts.
///
/// Behaves as described in the enum documentation.
///
/// # Panics
///
/// - Panics if `cmd` is empty.
/// - Panics if the string in the `Unix` variant is empty or only whitespace.
pub fn to_command(&self, cmd: &[String]) -> Command {
assert!(!cmd.is_empty(), "cmd was empty");
trace!(shell=?self, ?cmd, "constructing command");
match self {
Shell::None => {
// UNWRAP: checked by assert
#[allow(clippy::unwrap_used)]
let (first, rest) = cmd.split_first().unwrap();
let mut c = Command::new(first);
c.args(rest);
c
}
#[cfg(windows)]
Shell::Cmd => {
let mut c = Command::new("cmd.exe");
c.arg("/C").arg(cmd.join(" "));
c
}
Shell::Powershell if cfg!(windows) => {
let mut c = Command::new("powershell.exe");
c.arg("-Command").arg(cmd.join(" "));
c
}
Shell::Powershell => {
let mut c = Command::new("pwsh");
c.arg("-Command").arg(cmd.join(" "));
c
}
Shell::Unix(name) => {
assert!(!name.is_empty(), "shell program was empty");
let sh = name.split_ascii_whitespace().collect::<Vec<_>>();
// UNWRAP: checked by assert
#[allow(clippy::unwrap_used)]
let (shprog, shopts) = sh.split_first().unwrap();
let mut c = Command::new(shprog);
c.args(shopts);
c.arg("-c").arg(cmd.join(" "));
c
}
}
}
}
#[cfg(test)]
mod test {
use super::Shell;
use command_group::AsyncCommandGroup;
#[tokio::test]
#[cfg(unix)]
async fn unix_shell_default() -> Result<(), std::io::Error> {
assert!(Shell::default()
.to_command(&["echo".into(), "hi".into()])
.group_status()
.await?
.success());
Ok(())
}
#[tokio::test]
#[cfg(unix)]
async fn unix_shell_none() -> Result<(), std::io::Error> {
assert!(Shell::None
.to_command(&["echo".into(), "hi".into()])
.group_status()
.await?
.success());
Ok(())
}
#[tokio::test]
#[cfg(unix)]
async fn unix_shell_alternate() -> Result<(), std::io::Error> {
assert!(Shell::Unix("bash".into())
.to_command(&["echo".into(), "hi".into()])
.group_status()
.await?
.success());
Ok(())
}
#[tokio::test]
#[cfg(unix)]
async fn unix_shell_alternate_shopts() -> Result<(), std::io::Error> {
assert!(Shell::Unix("bash -o errexit".into())
.to_command(&["echo".into(), "hi".into()])
.group_status()
.await?
.success());
Ok(())
}
#[tokio::test]
#[cfg(windows)]
async fn windows_shell_default() -> Result<(), std::io::Error> {
assert!(Shell::default()
.to_command(&["echo".into(), "hi".into()])
.group_status()
.await?
.success());
Ok(())
}
#[tokio::test]
#[cfg(windows)]
async fn windows_shell_cmd() -> Result<(), std::io::Error> {
assert!(Shell::Cmd
.to_command(&["echo".into(), "hi".into()])
.group_status()
.await?
.success());
Ok(())
}
#[tokio::test]
#[cfg(windows)]
async fn windows_shell_powershell() -> Result<(), std::io::Error> {
assert!(Shell::Powershell
.to_command(&["echo".into(), "hi".into()])
.group_status()
.await?
.success());
Ok(())
}
#[tokio::test]
#[cfg(windows)]
async fn windows_shell_unix_style_powershell() -> Result<(), std::io::Error> {
assert!(Shell::Unix("powershell.exe".into())
.to_command(&["echo".into(), "hi".into()])
.group_status()
.await?
.success());
Ok(())
}
}

View file

@ -0,0 +1,183 @@
use command_group::{AsyncCommandGroup, Signal};
use tokio::{
process::Command,
select, spawn,
sync::{
mpsc::{self, Sender},
watch,
},
task::JoinHandle,
};
use tracing::{debug, error, trace};
use crate::{
error::RuntimeError,
event::{Event, Particle},
};
use super::Process;
#[derive(Clone, Copy, Debug)]
enum Intervention {
Kill,
#[cfg(unix)]
Signal(Signal),
}
#[derive(Debug)]
pub struct Supervisor {
id: u32,
completion: watch::Receiver<bool>,
intervene: Sender<Intervention>,
handle: JoinHandle<()>,
}
impl Supervisor {
pub fn spawn(
errors: Sender<RuntimeError>,
events: Sender<Event>,
command: &mut Command,
grouped: bool,
) -> Result<Self, RuntimeError> {
debug!(%grouped, ?command, "spawning command");
let (process, id) = if grouped {
let proc = command.group_spawn()?;
let id = proc.id().ok_or(RuntimeError::ProcessDeadOnArrival)?;
debug!(pgid=%id, "process group spawned");
(Process::Grouped(proc), id)
} else {
let proc = command.spawn()?;
let id = proc.id().ok_or(RuntimeError::ProcessDeadOnArrival)?;
debug!(pid=%id, "process spawned");
(Process::Ungrouped(proc), id)
};
let (mark_done, completion) = watch::channel(false);
let (int_s, int_r) = mpsc::channel(8);
let handle = spawn(async move {
let mut process = process;
let mut int = int_r;
debug!(?process, "starting task to watch on process");
loop {
select! {
p = process.wait() => {
match p {
Ok(_) => break, // deal with it below
Err(err) => {
error!(%err, "while waiting on process");
errors.send(err).await.ok();
trace!("marking process as done");
mark_done.send(true).ok();
return;
}
}
},
Some(int) = int.recv() => {
match int {
Intervention::Kill => {
if let Err(err) = process.kill().await {
error!(%err, "while killing process");
errors.send(err).await.ok();
trace!("continuing to watch command");
}
}
#[cfg(unix)]
Intervention::Signal(sig) => {
if let Err(err) = process.signal(sig) {
error!(%err, "while sending signal to process");
errors.send(err).await.ok();
trace!("continuing to watch command");
}
}
}
}
else => break,
}
}
trace!("got out of loop, waiting once more");
match process.wait().await {
Err(err) => {
error!(%err, "while waiting on process");
errors.send(err).await.ok();
}
Ok(status) => {
let event = Event {
particulars: vec![Particle::ProcessCompletion(status)],
metadata: Default::default(),
};
debug!(?event, "creating synthetic process completion event");
if let Err(err) = events.send(event).await {
error!(%err, "while sending process completion event");
errors
.send(RuntimeError::EventChannelSend {
ctx: "command supervisor",
err,
})
.await
.ok();
}
}
}
trace!("marking process as done");
mark_done.send(true).ok();
});
Ok(Self {
id,
completion,
intervene: int_s,
handle, // TODO: is there anything useful to do with this? do we need to keep it?
})
}
pub fn id(&self) -> u32 {
self.id
}
#[cfg(unix)]
pub async fn signal(&self, signal: Signal) -> Result<(), RuntimeError> {
trace!(?signal, "sending signal intervention");
self.intervene
.send(Intervention::Signal(signal))
.await
.map_err(|err| RuntimeError::InternalSupervisor(err.to_string()))
}
pub async fn kill(&self) -> Result<(), RuntimeError> {
trace!("sending kill intervention");
self.intervene
.send(Intervention::Kill)
.await
.map_err(|err| RuntimeError::InternalSupervisor(err.to_string()))
}
pub fn is_running(&self) -> bool {
!*self.completion.borrow()
}
pub async fn wait(&mut self) -> Result<(), RuntimeError> {
debug!("waiting on supervisor completion");
loop {
self.completion
.changed()
.await
.map_err(|err| RuntimeError::InternalSupervisor(err.to_string()))?;
if *self.completion.borrow() {
break;
} else {
debug!("got completion change event, but it wasn't done (waiting more)");
}
}
debug!("supervisor completed");
Ok(())
}
}

View file

@ -108,6 +108,11 @@ pub enum RuntimeError {
err: notify::Error, err: notify::Error,
}, },
/// Opaque internal error from a command supervisor.
#[error("internal: command supervisor: {0}")]
#[diagnostic(code(watchexec::runtime::internal_supervisor))]
InternalSupervisor(String),
/// Error received when an event cannot be sent to the event channel. /// Error received when an event cannot be sent to the event channel.
#[error("cannot send event from {ctx}: {err}")] #[error("cannot send event from {ctx}: {err}")]
#[diagnostic(code(watchexec::runtime::event_channel_send))] #[diagnostic(code(watchexec::runtime::event_channel_send))]

View file

@ -9,6 +9,7 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
path::{Path, PathBuf}, path::{Path, PathBuf},
process::ExitStatus,
}; };
use crate::signal::Signal; use crate::signal::Signal;
@ -20,7 +21,8 @@ pub struct Event {
pub metadata: HashMap<String, Vec<String>>, pub metadata: HashMap<String, Vec<String>>,
} }
/// Something which can be used to filter an event. // TODO: this really needs a better name (along with "particulars")
/// Something which can be used to filter or qualify an event.
#[derive(Clone, Debug, Eq, PartialEq)] #[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive] #[non_exhaustive]
pub enum Particle { pub enum Particle {
@ -28,6 +30,7 @@ pub enum Particle {
Source(Source), Source(Source),
Process(u32), Process(u32),
Signal(Signal), Signal(Signal),
ProcessCompletion(Option<ExitStatus>),
} }
/// The general origin of the event. /// The general origin of the event.

View file

@ -81,7 +81,10 @@ impl Watchexec {
}}; }};
} }
let action = subtask!(action, action::worker(ac_r, er_s.clone(), ev_r)); let action = subtask!(
action,
action::worker(ac_r, er_s.clone(), ev_s.clone(), ev_r)
);
let fs = subtask!(fs, fs::worker(fs_r, er_s.clone(), ev_s.clone())); let fs = subtask!(fs, fs::worker(fs_r, er_s.clone(), ev_s.clone()));
let signal = subtask!(signal, signal::worker(er_s.clone(), ev_s.clone())); let signal = subtask!(signal, signal::worker(er_s.clone(), ev_s.clone()));