diff --git a/crates/cli/src/config/runtime.rs b/crates/cli/src/config/runtime.rs index 3ea8934..13333b2 100644 --- a/crates/cli/src/config/runtime.rs +++ b/crates/cli/src/config/runtime.rs @@ -9,8 +9,9 @@ use notify_rust::Notification; use tracing::debug; use watchexec::{ action::{Action, Outcome, PostSpawn, PreSpawn}, - command::Shell, + command::{Command, Shell}, config::RuntimeConfig, + error::RuntimeError, event::ProcessEnd, fs::Watcher, handler::SyncFnHandler, @@ -21,10 +22,7 @@ use watchexec::{ pub fn runtime(args: &ArgMatches) -> Result { let mut config = RuntimeConfig::default(); - config.command( - args.values_of("command") - .expect("(clap) Bug: command is not present") - ); + config.command(interpret_command_args(args)?); config.pathset(match args.values_of_os("paths") { Some(paths) => paths.map(|os| Path::new(os).to_owned()).collect(), @@ -48,22 +46,6 @@ pub fn runtime(args: &ArgMatches) -> Result { config.command_grouped(false); } - config.command_shell(if args.is_present("no-shell") { - Shell::None - } else if let Some(s) = args.value_of("shell") { - if s.eq_ignore_ascii_case("powershell") { - Shell::Powershell - } else if s.eq_ignore_ascii_case("none") { - Shell::None - } else if s.eq_ignore_ascii_case("cmd") { - cmd_shell(s.into()) - } else { - Shell::Unix(s.into()) - } - } else { - default_shell() - }); - let clear = args.is_present("clear"); let notif = args.is_present("notif"); let mut on_busy = args @@ -253,7 +235,7 @@ pub fn runtime(args: &ArgMatches) -> Result { if notif { Notification::new() .summary("Watchexec: change detected") - .body(&format!("Running `{}`", postspawn.command.join(" "))) + .body(&format!("Running {}", postspawn.command)) .show() .map(drop) .unwrap_or_else(|err| { @@ -267,6 +249,55 @@ pub fn runtime(args: &ArgMatches) -> Result { Ok(config) } +fn interpret_command_args(args: &ArgMatches) -> Result { + let mut cmd = args + .values_of("command") + .expect("(clap) Bug: command is not present") + .map(|s| s.to_string()) + .collect::>(); + + Ok(if args.is_present("no-shell") { + Command::Exec { + prog: cmd.remove(0), + args: cmd, + } + } else { + let (shell, shopts) = if let Some(s) = args.value_of("shell") { + if s.is_empty() { + return Err(RuntimeError::CommandShellEmptyShell).into_diagnostic(); + } else if s.eq_ignore_ascii_case("powershell") { + (Shell::Powershell, Vec::new()) + } else if s.eq_ignore_ascii_case("none") { + return Ok(Command::Exec { + prog: cmd.remove(0), + args: cmd, + }); + } else if s.eq_ignore_ascii_case("cmd") { + (cmd_shell(s.into()), Vec::new()) + } else { + let sh = s.split_ascii_whitespace().collect::>(); + + // UNWRAP: checked by first if branch + #[allow(clippy::unwrap_used)] + let (shprog, shopts) = sh.split_first().unwrap(); + + ( + Shell::Unix(shprog.to_string()), + shopts.iter().map(|s| s.to_string()).collect(), + ) + } + } else { + (default_shell(), Vec::new()) + }; + + Command::Shell { + shell, + args: shopts, + command: cmd.join(" "), + } + }) +} + // until 2.0, then Powershell #[cfg(windows)] fn default_shell() -> Shell { diff --git a/crates/lib/CHANGELOG.md b/crates/lib/CHANGELOG.md index 38e5a1d..5c6a7a6 100644 --- a/crates/lib/CHANGELOG.md +++ b/crates/lib/CHANGELOG.md @@ -10,6 +10,14 @@ First "stable" release of the library. - These five new crates live in the watchexec monorepo, rather than being completely separate like `command-group` and `clearscreen` - This makes the main library bit less likely to change as often as it did, so it was finally time to release 2.0.0! +- **Change: the Action worker now launches a set of Commands** + - A new type `Command` replaces and augments `Shell`, making explicit which style of calling will be used + - The action working data now takes a `Vec`, so multiple commands to be run as a set + - Commands in the set are run sequentially, with an error interrupting the sequence + - It is thus possible to run both "shelled" and "raw exec" commands in a set + - `PreSpawn` and `PostSpawn` handlers are run per Command, not per command set + - This new style should be preferred over sending command lines like `cmd1 && cmd2` + - **Change: the event queue is now a priority queue** - Shutting down the runtime is faster and more predictable. No more hanging after hitting Ctrl-C if there's tonnes of events coming in! - Signals sent to the main process have higher priority @@ -22,6 +30,7 @@ First "stable" release of the library. - Improvement: the main subtasks of the runtime are now aborted on error - Improvement: the event queue is explicitly closed when shutting down - Improvement: the action worker will check if the event queue is closed more often, to shutdown early +- Improvement: `kill_on_drop` is set on Commands, which will be a little more eager to terminate processes when we're done with them Other miscellaneous: diff --git a/crates/lib/examples/demo.rs b/crates/lib/examples/demo.rs index 0048793..8a36a4e 100644 --- a/crates/lib/examples/demo.rs +++ b/crates/lib/examples/demo.rs @@ -3,6 +3,7 @@ use std::time::Duration; use miette::{IntoDiagnostic, Result}; use watchexec::{ action::{Action, Outcome}, + command::Command, config::{InitConfig, RuntimeConfig}, error::ReconfigError, fs::Watcher, @@ -23,7 +24,10 @@ async fn main() -> Result<()> { let mut runtime = RuntimeConfig::default(); runtime.pathset(["src", "dontexist", "examples"]); - runtime.command(["date"]); + runtime.command(Command::Exec { + prog: "date".into(), + args: Vec::new(), + }); let wx = Watchexec::new(init, runtime.clone())?; let w = wx.clone(); diff --git a/crates/lib/src/action/outcome_worker.rs b/crates/lib/src/action/outcome_worker.rs index d48f48d..feb3817 100644 --- a/crates/lib/src/action/outcome_worker.rs +++ b/crates/lib/src/action/outcome_worker.rs @@ -16,10 +16,9 @@ use crate::{ command::Supervisor, error::RuntimeError, event::{Event, Priority}, - handler::rte, }; -use super::{process_holder::ProcessHolder, Outcome, PostSpawn, PreSpawn, WorkingData}; +use super::{process_holder::ProcessHolder, Outcome, WorkingData}; #[derive(Clone)] pub struct OutcomeWorker { @@ -119,50 +118,29 @@ impl OutcomeWorker { debug!(outcome=?o, "meaningless without a process, not doing anything"); } (_, Outcome::Start) => { - let (cmd, shell, grouped, pre_spawn_handler, post_spawn_handler) = { + let (cmds, grouped, pre_spawn_handler, post_spawn_handler) = { let wrk = self.working.borrow(); ( - wrk.command.clone(), - wrk.shell.clone(), + wrk.commands.clone(), wrk.grouped, wrk.pre_spawn_handler.clone(), wrk.post_spawn_handler.clone(), ) }; - if cmd.is_empty() { - warn!("tried to start a command without anything to run"); + if cmds.is_empty() { + warn!("tried to start commands without anything to run"); } else { - let command = shell.to_command(&cmd); - let (pre_spawn, command) = - PreSpawn::new(command, cmd.clone(), self.events.clone()); - - debug!("running pre-spawn handler"); - notry!(pre_spawn_handler.call(pre_spawn)) - .map_err(|e| rte("action pre-spawn", e))?; - - let mut command = Arc::try_unwrap(command) - .map_err(|_| RuntimeError::HandlerLockHeld("pre-spawn"))? - .into_inner(); - trace!("spawning supervisor for command"); let sup = Supervisor::spawn( self.errors_c.clone(), self.events_c.clone(), - &mut command, + cmds, grouped, + self.events.clone(), + pre_spawn_handler, + post_spawn_handler, )?; - - debug!("running post-spawn handler"); - let post_spawn = PostSpawn { - command: cmd.clone(), - events: self.events.clone(), - id: sup.id(), - grouped, - }; - notry!(post_spawn_handler.call(post_spawn)) - .map_err(|e| rte("action post-spawn", e))?; - notry!(self.process.replace(sup)); } } diff --git a/crates/lib/src/action/workingdata.rs b/crates/lib/src/action/workingdata.rs index 892f424..1a39856 100644 --- a/crates/lib/src/action/workingdata.rs +++ b/crates/lib/src/action/workingdata.rs @@ -6,11 +6,11 @@ use std::{ use once_cell::sync::OnceCell; use tokio::{ - process::Command, + process::Command as TokioCommand, sync::{Mutex, OwnedMutexGuard}, }; -use crate::{command::Shell, event::Event, filter::Filterer, handler::HandlerLock}; +use crate::{command::Command, event::Event, filter::Filterer, handler::HandlerLock}; use super::Outcome; @@ -47,8 +47,8 @@ pub struct WorkingData { /// A handler triggered before a command is spawned. /// /// This handler is called with the [`PreSpawn`] environment, which provides mutable access to - /// the [`Command`] which is about to be run. See the notes on the [`PreSpawn::command()`] - /// method for important information on what you can do with it. + /// the [`Command`](TokioCommand) which is about to be run. See the notes on the + /// [`PreSpawn::command()`] method for important information on what you can do with it. /// /// Returning an error from the handler will stop the action from processing further, and issue /// a [`RuntimeError`][crate::error::RuntimeError] to the error channel. @@ -64,13 +64,10 @@ pub struct WorkingData { /// issue a [`RuntimeError`][crate::error::RuntimeError] to the error channel. pub post_spawn_handler: HandlerLock, - /// Command to execute. + /// Commands to execute. /// - /// When `shell` is [`Shell::None`], this is expected to be in “execvp(3)” format: first - /// program, rest arguments. Otherwise, all elements will be joined together with a single space - /// and passed to the shell. More control can then be obtained by providing a 1-element vec, and - /// doing your own joining and/or escaping there. - pub command: Vec, + /// These will be run in order, and an error will stop early. + pub commands: Vec, /// Whether to use process groups (on Unix) or job control (on Windows) to run the command. /// @@ -81,11 +78,6 @@ pub struct WorkingData { /// meantime. pub grouped: bool, - /// The shell to use to run the command. - /// - /// See the [`Shell`] enum documentation for more details. - pub shell: Shell, - /// The filterer implementation to use when filtering events. /// /// The default is a no-op, which will always pass every event. @@ -96,8 +88,7 @@ impl fmt::Debug for WorkingData { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("WorkingData") .field("throttle", &self.throttle) - .field("shell", &self.shell) - .field("command", &self.command) + .field("commands", &self.commands) .field("grouped", &self.grouped) .field("filterer", &self.filterer) .finish_non_exhaustive() @@ -107,13 +98,11 @@ impl fmt::Debug for WorkingData { impl Default for WorkingData { fn default() -> Self { Self { - // set to 50ms here, but will remain 100ms on cli until 2022 throttle: Duration::from_millis(50), action_handler: Default::default(), pre_spawn_handler: Default::default(), post_spawn_handler: Default::default(), - command: Vec::new(), - shell: Shell::default(), + commands: Vec::new(), grouped: true, filterer: Arc::new(()), } @@ -165,28 +154,26 @@ impl Action { #[non_exhaustive] pub struct PreSpawn { /// The command which is about to be spawned. - /// - /// This is the final command, after the [`Shell`] has been applied. - pub command: Vec, + pub command: Command, /// The collected events which triggered the action this command issues from. pub events: Arc<[Event]>, - command_w: Weak>, + to_spawn_w: Weak>, } impl PreSpawn { - pub(super) fn new( + pub(crate) fn new( command: Command, - cmd: Vec, + to_spawn: TokioCommand, events: Arc<[Event]>, - ) -> (Self, Arc>) { - let arc = Arc::new(Mutex::new(command)); + ) -> (Self, Arc>) { + let arc = Arc::new(Mutex::new(to_spawn)); ( Self { - command: cmd, + command, events, - command_w: Arc::downgrade(&arc), + to_spawn_w: Arc::downgrade(&arc), }, arc.clone(), ) @@ -199,8 +186,8 @@ impl PreSpawn { /// documentation about handlers for more. /// /// This will always return `Some()` under normal circumstances. - pub async fn command(&self) -> Option> { - if let Some(arc) = self.command_w.upgrade() { + pub async fn command(&self) -> Option> { + if let Some(arc) = self.to_spawn_w.upgrade() { Some(arc.lock_owned().await) } else { None @@ -216,8 +203,8 @@ impl PreSpawn { #[derive(Clone, Debug)] #[non_exhaustive] pub struct PostSpawn { - /// The final command the process was spawned with. - pub command: Vec, + /// The command the process was spawned with. + pub command: Command, /// The collected events which triggered the action the command issues from. pub events: Arc<[Event]>, diff --git a/crates/lib/src/command.rs b/crates/lib/src/command.rs index fc03b0b..3a0c3dd 100644 --- a/crates/lib/src/command.rs +++ b/crates/lib/src/command.rs @@ -1,14 +1,156 @@ //! Command construction, configuration, and tracking. +use std::fmt; + +use tokio::process::Command as TokioCommand; +use tracing::trace; + +use crate::error::RuntimeError; + #[doc(inline)] pub use process::Process; -#[doc(inline)] -pub use shell::Shell; - #[doc(inline)] pub use supervisor::Supervisor; mod process; -mod shell; mod supervisor; + +#[cfg(test)] +mod tests; + +/// A command to execute. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub enum Command { + /// A raw command which will be executed as-is. + Exec { + /// The program to run. + prog: String, + + /// The arguments to pass. + args: Vec, + }, + + /// A shelled command line. + Shell { + /// The shell to run. + shell: Shell, + + /// Additional options or arguments to pass to the shell. + /// + /// These will be inserted before the `-c` (or equivalent) option immediately preceding the + /// command line string. + args: Vec, + + /// The command line to pass to the shell. + command: String, + }, +} + +/// Shell to use to run shelled commands. +/// +/// `Cmd` and `Powershell` are special-cased because they have different calling conventions. Also +/// `Cmd` is only available in Windows, while `Powershell` is also available on unices (provided the +/// end-user has it installed, of course). +/// +/// There is no default implemented: as consumer of this library you are encouraged to set your own +/// default as makes sense in your application / for your platform. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub enum Shell { + /// Use the given string as a unix shell invocation. + /// + /// This means two things: + /// - the program is invoked with `-c` followed by the command, and + /// - the string will be split on space, and the resulting vec used as execvp(3) arguments: + /// first is the shell program, rest are additional arguments (which come before the `-c` + /// mentioned above). This is a very simplistic approach deliberately: it will not support + /// quoted arguments, for example. Use [`Shell::None`] with a custom command vec for that. + Unix(String), + + /// Use the Windows CMD.EXE shell. + /// + /// This is invoked with `/C` followed by the command. + #[cfg(windows)] + Cmd, + + /// Use Powershell, on Windows or elsewhere. + /// + /// This is invoked with `-Command` followed by the command. + /// + /// This is preferred over `Unix("pwsh")`, though that will also work on unices due to + /// Powershell supporting the `-c` short option. + Powershell, +} + +impl Command { + /// Obtain a [`tokio::process::Command`] from a [`Command`]. + /// + /// Behaves as described in the [`Command`] and [`Shell`] documentation. + /// + /// # Errors + /// + /// - Errors if the `command` of a `Command::Shell` is empty. + /// - Errors if the `shell` of a `Shell::Unix(shell)` is empty. + pub fn to_spawnable(&self) -> Result { + trace!(cmd=?self, "constructing command"); + + match self { + Command::Exec { prog, args } => { + let mut c = TokioCommand::new(prog); + c.args(args); + Ok(c) + } + + Command::Shell { + shell, + args, + command, + } => { + let (shcmd, shcliopt) = match shell { + #[cfg(windows)] + Shell::Cmd => ("cmd.exe", "/C"), + + #[cfg(windows)] + Shell::Powershell => ("powershell.exe", "-Command"), + #[cfg(not(windows))] + Shell::Powershell => ("pwsh", "-c"), + + Shell::Unix(cmd) => { + if cmd.is_empty() { + return Err(RuntimeError::CommandShellEmptyShell); + } + + (cmd.as_str(), "-c") + } + }; + + if command.is_empty() { + return Err(RuntimeError::CommandShellEmptyCommand); + } + + let mut c = TokioCommand::new(shcmd); + c.args(args); + c.arg(shcliopt).arg(command); + Ok(c) + } + } + } +} + +impl fmt::Display for Command { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Command::Exec { prog, args } => { + write!(f, "{}", prog)?; + for arg in args { + write!(f, " {}", arg)?; + } + + Ok(()) + } + Command::Shell { command, .. } => { + write!(f, "{}", command) + } + } + } +} diff --git a/crates/lib/src/command/shell.rs b/crates/lib/src/command/shell.rs deleted file mode 100644 index 92a73d2..0000000 --- a/crates/lib/src/command/shell.rs +++ /dev/null @@ -1,201 +0,0 @@ -use tokio::process::Command; -use tracing::trace; - -/// Shell to use to run commands. -/// -/// `Cmd` and `Powershell` are special-cased because they have different calling conventions. Also -/// `Cmd` is only available in Windows, while `Powershell` is also available on unices (provided the -/// end-user has it installed, of course). -/// -/// See [`Config.cmd`] for the semantics of `None` vs the other options. -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum Shell { - /// Use no shell, and execute the command directly. - /// - /// This is the default, however as consumer of this library you are encouraged to set your own - /// default as makes sense in your application / for your platform. - None, - - /// Use the given string as a unix shell invocation. - /// - /// This means two things: - /// - the program is invoked with `-c` followed by the command, and - /// - the string will be split on space, and the resulting vec used as execvp(3) arguments: - /// first is the shell program, rest are additional arguments (which come before the `-c` - /// mentioned above). This is a very simplistic approach deliberately: it will not support - /// quoted arguments, for example. Use [`Shell::None`] with a custom command vec for that. - Unix(String), - - /// Use the Windows CMD.EXE shell. - /// - /// This is invoked with `/C` followed by the command. - #[cfg(windows)] - Cmd, - - /// Use Powershell, on Windows or elsewhere. - /// - /// This is invoked with `-Command` followed by the command. - /// - /// This is preferred over `Unix("pwsh")`, though that will also work on unices due to - /// Powershell supporting the `-c` short option. - Powershell, -} - -impl Default for Shell { - fn default() -> Self { - Self::None - } -} - -impl Shell { - /// Obtain a [`Command`] given a list of command parts. - /// - /// Behaves as described in the enum documentation. - /// - /// # Panics - /// - /// - Panics if `cmd` is empty. - /// - Panics if the string in the `Unix` variant is empty or only whitespace. - pub fn to_command(&self, cmd: &[String]) -> Command { - assert!(!cmd.is_empty(), "cmd was empty"); - trace!(shell=?self, ?cmd, "constructing command"); - - match self { - Shell::None => { - // UNWRAP: checked by assert - #[allow(clippy::unwrap_used)] - let (first, rest) = cmd.split_first().unwrap(); - let mut c = Command::new(first); - c.args(rest); - c - } - - #[cfg(windows)] - Shell::Cmd => { - let mut c = Command::new("cmd.exe"); - c.arg("/C").arg(cmd.join(" ")); - c - } - - Shell::Powershell if cfg!(windows) => { - let mut c = Command::new("powershell.exe"); - c.arg("-Command").arg(cmd.join(" ")); - c - } - - Shell::Powershell => { - let mut c = Command::new("pwsh"); - c.arg("-Command").arg(cmd.join(" ")); - c - } - - Shell::Unix(name) => { - assert!(!name.is_empty(), "shell program was empty"); - let sh = name.split_ascii_whitespace().collect::>(); - - // UNWRAP: checked by assert - #[allow(clippy::unwrap_used)] - let (shprog, shopts) = sh.split_first().unwrap(); - - let mut c = Command::new(shprog); - c.args(shopts); - c.arg("-c").arg(cmd.join(" ")); - c - } - } - } -} - -#[cfg(test)] -mod test { - use super::Shell; - use command_group::AsyncCommandGroup; - - #[tokio::test] - #[cfg(unix)] - async fn unix_shell_default() -> Result<(), std::io::Error> { - assert!(Shell::default() - .to_command(&["echo".into(), "hi".into()]) - .group_status() - .await? - .success()); - Ok(()) - } - - #[tokio::test] - #[cfg(unix)] - async fn unix_shell_none() -> Result<(), std::io::Error> { - assert!(Shell::None - .to_command(&["echo".into(), "hi".into()]) - .group_status() - .await? - .success()); - Ok(()) - } - - #[tokio::test] - #[cfg(unix)] - async fn unix_shell_alternate() -> Result<(), std::io::Error> { - assert!(Shell::Unix("bash".into()) - .to_command(&["echo".into(), "hi".into()]) - .group_status() - .await? - .success()); - Ok(()) - } - - #[tokio::test] - #[cfg(unix)] - async fn unix_shell_alternate_shopts() -> Result<(), std::io::Error> { - assert!(Shell::Unix("bash -o errexit".into()) - .to_command(&["echo".into(), "hi".into()]) - .group_status() - .await? - .success()); - Ok(()) - } - - #[tokio::test] - #[cfg(windows)] - async fn windows_shell_default() -> Result<(), std::io::Error> { - assert!(Shell::default() - .to_command(&["echo".into(), "hi".into()]) - .group_status() - .await? - .success()); - Ok(()) - } - - #[tokio::test] - #[cfg(windows)] - async fn windows_shell_cmd() -> Result<(), std::io::Error> { - assert!(Shell::Cmd - .to_command(&["echo".into(), "hi".into()]) - .group_status() - .await? - .success()); - Ok(()) - } - - #[tokio::test] - #[cfg(windows)] - async fn windows_shell_powershell() -> Result<(), std::io::Error> { - assert!(Shell::Powershell - .to_command(&["echo".into(), "hi".into()]) - .group_status() - .await? - .success()); - Ok(()) - } - - #[tokio::test] - #[cfg(windows)] - async fn windows_shell_unix_style_powershell() -> Result<(), std::io::Error> { - assert!(Shell::Unix("powershell.exe".into()) - .to_command(&["echo".into(), "hi".into()]) - .group_status() - .await? - .success()); - Ok(()) - } -} diff --git a/crates/lib/src/command/supervisor.rs b/crates/lib/src/command/supervisor.rs index 0c551f6..650a5ac 100644 --- a/crates/lib/src/command/supervisor.rs +++ b/crates/lib/src/command/supervisor.rs @@ -1,18 +1,22 @@ +use std::sync::Arc; + use async_priority_channel as priority; use command_group::AsyncCommandGroup; use tokio::{ - process::Command, select, spawn, sync::{ mpsc::{self, Sender}, watch, }, }; -use tracing::{debug, error, trace}; +use tracing::{debug, debug_span, error, trace, Span}; use crate::{ + action::{PostSpawn, PreSpawn}, + command::Command, error::RuntimeError, event::{Event, Priority, Source, Tag}, + handler::{rte, HandlerLock}, signal::process::SubSignal, }; @@ -24,136 +28,171 @@ enum Intervention { Signal(SubSignal), } -/// A task which supervises a process. +/// A task which supervises a sequence of processes. /// -/// This spawns a process from a [`Command`] and waits for it to complete while handling -/// interventions to it: orders to terminate it, or to send a signal to it. It also immediately -/// issues a [`Tag::ProcessCompletion`] event when the process completes. +/// This spawns processes from a vec of [`Command`]s in order and waits for each to complete while +/// handling interventions to itself: orders to terminate, or to send a signal to the current +/// process. It also immediately issues a [`Tag::ProcessCompletion`] event when the set completes. #[derive(Debug)] pub struct Supervisor { - id: u32, intervene: Sender, ongoing: watch::Receiver, } impl Supervisor { - /// Spawns the command, the supervision task, and returns a new control object. + /// Spawns the command set, the supervision task, and returns a new control object. pub fn spawn( errors: Sender, events: priority::Sender, - command: &mut Command, + mut commands: Vec, grouped: bool, + actioned_events: Arc<[Event]>, + pre_spawn_handler: HandlerLock, + post_spawn_handler: HandlerLock, ) -> Result { - debug!(%grouped, ?command, "spawning command"); - let (process, id) = if grouped { - let proc = command.group_spawn().map_err(|err| RuntimeError::IoError { - about: "spawning process group", - err, - })?; - let id = proc.id().ok_or(RuntimeError::ProcessDeadOnArrival)?; - debug!(pgid=%id, "process group spawned"); - (Process::Grouped(proc), id) - } else { - let proc = command.spawn().map_err(|err| RuntimeError::IoError { - about: "spawning process (ungrouped)", - err, - })?; - let id = proc.id().ok_or(RuntimeError::ProcessDeadOnArrival)?; - debug!(pid=%id, "process spawned"); - (Process::Ungrouped(proc), id) - }; + // get commands in reverse order so pop() returns the next to run + commands.reverse(); + let next = commands.pop().ok_or(RuntimeError::NoCommands)?; let (notify, waiter) = watch::channel(true); let (int_s, int_r) = mpsc::channel(8); spawn(async move { - let mut process = process; + let span = debug_span!("supervisor"); + + let mut next = next; + let mut commands = commands; let mut int = int_r; - debug!(?process, "starting task to watch on process"); - loop { - select! { - p = process.wait() => { - match p { - Ok(_) => break, // deal with it below - Err(err) => { - error!(%err, "while waiting on process"); - errors.send(err).await.ok(); - trace!("marking process as done"); - notify.send(false).unwrap_or_else(|e| trace!(%e, "error sending process complete")); - trace!("closing supervisor task early"); - return; - } - } - }, - Some(int) = int.recv() => { - match int { - Intervention::Kill => { - if let Err(err) = process.kill().await { - error!(%err, "while killing process"); - errors.send(err).await.ok(); - trace!("continuing to watch command"); + let (mut process, pid) = match spawn_process( + span.clone(), + next, + grouped, + actioned_events.clone(), + pre_spawn_handler.clone(), + post_spawn_handler.clone(), + ) + .await + { + Ok(pp) => pp, + Err(err) => { + let _enter = span.enter(); + error!(%err, "while spawning process"); + errors.send(err).await.ok(); + trace!("marking process as done"); + notify + .send(false) + .unwrap_or_else(|e| trace!(%e, "error sending process complete")); + trace!("closing supervisor task early"); + return; + } + }; + + span.in_scope(|| debug!(?process, ?pid, "spawned process")); + + loop { + select! { + p = process.wait() => { + match p { + Ok(_) => break, // deal with it below + Err(err) => { + let _enter = span.enter(); + error!(%err, "while waiting on process"); + errors.try_send(err).ok(); + trace!("marking process as done"); + notify.send(false).unwrap_or_else(|e| trace!(%e, "error sending process complete")); + trace!("closing supervisor task early"); + return; } } - #[cfg(unix)] - Intervention::Signal(sig) => { - if let Some(sig) = sig.to_nix() { - if let Err(err) = process.signal(sig) { - error!(%err, "while sending signal to process"); - errors.send(err).await.ok(); + }, + Some(int) = int.recv() => { + match int { + Intervention::Kill => { + if let Err(err) = process.kill().await { + let _enter = span.enter(); + error!(%err, "while killing process"); + errors.try_send(err).ok(); trace!("continuing to watch command"); } - } else { + } + #[cfg(unix)] + Intervention::Signal(sig) => { + let _enter = span.enter(); + if let Some(sig) = sig.to_nix() { + if let Err(err) = process.signal(sig) { + error!(%err, "while sending signal to process"); + errors.try_send(err).ok(); + trace!("continuing to watch command"); + } + } else { + let err = RuntimeError::UnsupportedSignal(sig); + error!(%err, "while sending signal to process"); + errors.try_send(err).ok(); + trace!("continuing to watch command"); + } + } + #[cfg(windows)] + Intervention::Signal(sig) => { + let _enter = span.enter(); + // https://github.com/watchexec/watchexec/issues/219 let err = RuntimeError::UnsupportedSignal(sig); error!(%err, "while sending signal to process"); - errors.send(err).await.ok(); + errors.try_send(err).ok(); trace!("continuing to watch command"); } } - #[cfg(windows)] - Intervention::Signal(sig) => { - // https://github.com/watchexec/watchexec/issues/219 - let err = RuntimeError::UnsupportedSignal(sig); - error!(%err, "while sending signal to process"); - errors.send(err).await.ok(); - trace!("continuing to watch command"); - } + } + else => break, + } + } + + span.in_scope(|| trace!("got out of loop, waiting once more")); + match process.wait().await { + Err(err) => { + let _enter = span.enter(); + error!(%err, "while waiting on process"); + errors.try_send(err).ok(); + } + Ok(status) => { + let event = span.in_scope(|| { + let event = Event { + tags: vec![ + Tag::Source(Source::Internal), + Tag::ProcessCompletion(status.map(|s| s.into())), + ], + metadata: Default::default(), + }; + + debug!(?event, "creating synthetic process completion event"); + event + }); + + if let Err(err) = events.send(event, Priority::Low).await { + let _enter = span.enter(); + error!(%err, "while sending process completion event"); + errors + .try_send(RuntimeError::EventChannelSend { + ctx: "command supervisor", + err, + }) + .ok(); } } - else => break, - } - } - - trace!("got out of loop, waiting once more"); - match process.wait().await { - Err(err) => { - error!(%err, "while waiting on process"); - errors.send(err).await.ok(); - } - Ok(status) => { - let event = Event { - tags: vec![ - Tag::Source(Source::Internal), - Tag::ProcessCompletion(status.map(|s| s.into())), - ], - metadata: Default::default(), - }; - - debug!(?event, "creating synthetic process completion event"); - if let Err(err) = events.send(event, Priority::Low).await { - error!(%err, "while sending process completion event"); - errors - .send(RuntimeError::EventChannelSend { - ctx: "command supervisor", - err, - }) - .await - .ok(); - } + } + + let _enter = span.enter(); + if let Some(cmd) = commands.pop() { + debug!(?cmd, "queuing up next command"); + next = cmd; + } else { + debug!("no more commands to supervise"); + break; } } + let _enter = span.enter(); trace!("marking process as done"); notify .send(false) @@ -162,21 +201,11 @@ impl Supervisor { }); Ok(Self { - id, ongoing: waiter, intervene: int_s, }) } - /// Get the PID of the process or process group. - /// - /// This always successfully returns a PID, even if the process has already exited, as the PID - /// is held as soon as the process spawns. Take care not to use this for process manipulation - /// once the process has exited, as the ID may have been reused already. - pub fn id(&self) -> u32 { - self.id - } - /// Issues a signal to the process. /// /// On Windows, this currently only supports [`SubSignal::ForceStop`]. @@ -239,3 +268,76 @@ impl Supervisor { Ok(()) } } + +async fn spawn_process( + span: Span, + command: Command, + grouped: bool, + actioned_events: Arc<[Event]>, + pre_spawn_handler: HandlerLock, + post_spawn_handler: HandlerLock, +) -> Result<(Process, u32), RuntimeError> { + let (pre_spawn, spawnable) = span.in_scope::<_, Result<_, RuntimeError>>(|| { + debug!(%grouped, ?command, "preparing command"); + let mut spawnable = command.to_spawnable()?; + spawnable.kill_on_drop(true); + + debug!("running pre-spawn handler"); + Ok(PreSpawn::new( + command.clone(), + spawnable, + actioned_events.clone(), + )) + })?; + + pre_spawn_handler + .call(pre_spawn) + .await + .map_err(|e| rte("action pre-spawn", e))?; + + let (proc, id, post_spawn) = span.in_scope::<_, Result<_, RuntimeError>>(|| { + let mut spawnable = Arc::try_unwrap(spawnable) + .map_err(|_| RuntimeError::HandlerLockHeld("pre-spawn"))? + .into_inner(); + + debug!(command=?spawnable, "spawning command"); + let (proc, id) = if grouped { + let proc = spawnable + .group_spawn() + .map_err(|err| RuntimeError::IoError { + about: "spawning process group", + err, + })?; + let id = proc.id().ok_or(RuntimeError::ProcessDeadOnArrival)?; + debug!(pgid=%id, "process group spawned"); + (Process::Grouped(proc), id) + } else { + let proc = spawnable.spawn().map_err(|err| RuntimeError::IoError { + about: "spawning process (ungrouped)", + err, + })?; + let id = proc.id().ok_or(RuntimeError::ProcessDeadOnArrival)?; + debug!(pid=%id, "process spawned"); + (Process::Ungrouped(proc), id) + }; + + debug!("running post-spawn handler"); + Ok(( + proc, + id, + PostSpawn { + command: command.clone(), + events: actioned_events.clone(), + id, + grouped, + }, + )) + })?; + + post_spawn_handler + .call(post_spawn) + .await + .map_err(|e| rte("action post-spawn", e))?; + + Ok((proc, id)) +} diff --git a/crates/lib/src/command/tests.rs b/crates/lib/src/command/tests.rs new file mode 100644 index 0000000..061b6d2 --- /dev/null +++ b/crates/lib/src/command/tests.rs @@ -0,0 +1,129 @@ + +use super::{Command, Shell}; +use command_group::AsyncCommandGroup; + +#[tokio::test] +#[cfg(unix)] +async fn unix_shell_none() -> Result<(), std::io::Error> { + assert!(Command::Exec { + prog: "echo".into(), + args: vec!["hi".into()] + } + .to_spawnable() + .unwrap() + .group_status() + .await? + .success()); + Ok(()) +} + +#[tokio::test] +#[cfg(unix)] +async fn unix_shell_sh() -> Result<(), std::io::Error> { + assert!(Command::Shell { + shell: Shell::Unix("sh".into()), + args: Vec::new(), + command: "echo hi".into() + } + .to_spawnable() + .unwrap() + .group_status() + .await? + .success()); + Ok(()) +} + +#[tokio::test] +#[cfg(unix)] +async fn unix_shell_alternate() -> Result<(), std::io::Error> { + assert!(Command::Shell { + shell: Shell::Unix("bash".into()), + args: Vec::new(), + command: "echo hi".into() + } + .to_spawnable() + .unwrap() + .group_status() + .await? + .success()); + Ok(()) +} + +#[tokio::test] +#[cfg(unix)] +async fn unix_shell_alternate_shopts() -> Result<(), std::io::Error> { + assert!(Command::Shell { + shell: Shell::Unix("bash".into()), + args: vec!["-o".into(), "errexit".into()], + command: "echo hi".into() + } + .to_spawnable() + .unwrap() + .group_status() + .await? + .success()); + Ok(()) +} + +#[tokio::test] +#[cfg(windows)] +async fn windows_shell_none() -> Result<(), std::io::Error> { + assert!(Command::Exec { + prog: "echo".into(), + args: vec!["hi".into()] + } + .to_spawnable() + .unwrap() + .group_status() + .await? + .success()); + Ok(()) +} + +#[tokio::test] +#[cfg(windows)] +async fn windows_shell_cmd() -> Result<(), std::io::Error> { + assert!(Command::Shell { + shell: Shell::Cmd, + args: Vec::new(), + command: "echo hi".into() + } + .to_spawnable() + .unwrap() + .group_status() + .await? + .success()); + Ok(()) +} + +#[tokio::test] +#[cfg(windows)] +async fn windows_shell_powershell() -> Result<(), std::io::Error> { + assert!(Command::Shell { + shell: Shell::Powershell, + args: Vec::new(), + command: "echo hi".into() + } + .to_spawnable() + .unwrap() + .group_status() + .await? + .success()); + Ok(()) +} + +#[tokio::test] +#[cfg(windows)] +async fn windows_shell_unix_style_powershell() -> Result<(), std::io::Error> { + assert!(Command::Shell { + shell: Shell::Unix("powershell.exe".into()), + args: Vec::new(), + command: "echo hi".into() + } + .to_spawnable() + .unwrap() + .group_status() + .await? + .success()); + Ok(()) +} diff --git a/crates/lib/src/config.rs b/crates/lib/src/config.rs index f894c33..8dd88e5 100644 --- a/crates/lib/src/config.rs +++ b/crates/lib/src/config.rs @@ -4,7 +4,7 @@ use std::{fmt, path::Path, sync::Arc, time::Duration}; use crate::{ action::{Action, PostSpawn, PreSpawn}, - command::Shell, + command::Command, filter::Filterer, fs::Watcher, handler::{Handler, HandlerLock}, @@ -61,25 +61,23 @@ impl RuntimeConfig { self } - /// Set the shell to use to invoke commands. - pub fn command_shell(&mut self, shell: Shell) -> &mut Self { - self.action.shell = shell; - self - } - /// Toggle whether to use process groups or not. pub fn command_grouped(&mut self, grouped: bool) -> &mut Self { self.action.grouped = grouped; self } - /// Set the command to run on action. - pub fn command(&mut self, command: I) -> &mut Self - where - I: IntoIterator, - S: AsRef, - { - self.action.command = command.into_iter().map(|c| c.as_ref().to_owned()).collect(); + /// Set a single command to run on action. + /// + /// This is a convenience for `.commands(vec![Command...])`. + pub fn command(&mut self, command: Command) -> &mut Self { + self.action.commands = vec![command]; + self + } + + /// Set the commands to run on action. + pub fn commands(&mut self, commands: impl Into>) -> &mut Self { + self.action.commands = commands.into(); self } diff --git a/crates/lib/src/error/runtime.rs b/crates/lib/src/error/runtime.rs index efe2d18..d2b3eb7 100644 --- a/crates/lib/src/error/runtime.rs +++ b/crates/lib/src/error/runtime.rs @@ -121,6 +121,27 @@ pub enum RuntimeError { #[diagnostic(code(watchexec::runtime::unsupported_signal))] UnsupportedSignal(SubSignal), + /// Error received when there are no commands to run. + /// + /// This is generally a programmer error and should be caught earlier. + #[error("no commands to run")] + #[diagnostic(code(watchexec::runtime::no_commands))] + NoCommands, + + /// Error received when trying to render a [`Command::Shell`](crate::command::Command) that has no `command` + /// + /// This is generally a programmer error and should be caught earlier. + #[error("empty shelled command")] + #[diagnostic(code(watchexec::runtime::command_shell::empty_command))] + CommandShellEmptyCommand, + + /// Error received when trying to render a [`Shell::Unix`](crate::command::Shell) with an empty shell + /// + /// This is generally a programmer error and should be caught earlier. + #[error("empty shell program")] + #[diagnostic(code(watchexec::runtime::command_shell::empty_shell))] + CommandShellEmptyShell, + /// Error received when clearing the screen. #[error("clear screen: {0}")] #[diagnostic(code(watchexec::runtime::clearscreen))] @@ -128,6 +149,7 @@ pub enum RuntimeError { /// Error received from the [`ignore-files`](ignore_files) crate. #[error("ignore files: {0}")] + #[diagnostic(code(watchexec::runtime::ignore_files))] IgnoreFiles( #[diagnostic_source] #[from] diff --git a/crates/lib/src/fs.rs b/crates/lib/src/fs.rs index f19f75a..8d4d087 100644 --- a/crates/lib/src/fs.rs +++ b/crates/lib/src/fs.rs @@ -9,7 +9,7 @@ use std::{ }; use async_priority_channel as priority; -use notify::{Watcher as _, poll::PollWatcherConfig}; +use notify::{poll::PollWatcherConfig, Watcher as _}; use tokio::sync::{mpsc, watch}; use tracing::{debug, error, trace, warn};