From 3066ee5913de3532f01f3fe467e83fe3f0e01f52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fe=CC=81lix=20Saparelli?= Date: Mon, 23 Aug 2021 00:28:20 +1200 Subject: [PATCH] Implement most of process handling --- Cargo.lock | 13 ++++++ lib/Cargo.toml | 2 + lib/src/action.rs | 90 ++++++++++++++++++++++++++++++++++++++---- lib/src/command.rs | 98 +++++++++++++++++++++++++++++++++++++++++++++- lib/src/error.rs | 10 +++++ 5 files changed, 205 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bf670f4..3bc4ae2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -80,6 +80,17 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "async-recursion" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7d78656ba01f1b93024b7c3a0467f1608e4be67d725749fdcd7d2c7678fd7a2" +dependencies = [ + "proc-macro2", + "quote 1.0.9", + "syn 1.0.73", +] + [[package]] name = "async-trait" version = "0.1.51" @@ -2073,7 +2084,9 @@ checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" name = "watchexec" version = "1.17.1" dependencies = [ + "async-recursion", "atomic-take", + "clearscreen", "color-eyre", "command-group", "derive_builder", diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 53cf035..f38a3a4 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -24,6 +24,8 @@ futures = "0.3.16" derive_builder = "0.10.2" atomic-take = "1.0.0" once_cell = "1.8.0" +clearscreen = "1.0.6" +async-recursion = "0.3.2" [dependencies.command-group] version = "1.0.5" diff --git a/lib/src/action.rs b/lib/src/action.rs index 2e255c8..45de966 100644 --- a/lib/src/action.rs +++ b/lib/src/action.rs @@ -7,16 +7,16 @@ use std::{ }; use atomic_take::AtomicTake; -use command_group::Signal; +use command_group::{AsyncCommandGroup, Signal}; use once_cell::sync::OnceCell; use tokio::{ sync::{mpsc, watch}, time::timeout, }; -use tracing::{debug, trace}; +use tracing::{debug, trace, warn}; use crate::{ - command::Shell, + command::{Process, Shell}, error::{CriticalError, RuntimeError}, event::Event, handler::{rte, Handler}, @@ -41,6 +41,9 @@ impl fmt::Debug for WorkingData { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("WorkingData") .field("throttle", &self.throttle) + .field("shell", &self.shell) + .field("command", &self.command) + .field("grouped", &self.grouped) .finish_non_exhaustive() } } @@ -53,6 +56,7 @@ impl Default for WorkingData { action_handler: Arc::new(AtomicTake::new(Box::new(()) as _)), shell: Shell::default(), command: Vec::new(), + grouped: true, } } } @@ -92,9 +96,9 @@ pub enum Outcome { /// If the command isn't running, start it. Start, - /// Wait for command completion, then start a new one. - Queue, - + // TODO + // /// Wait for command completion, then start a new one. + // Queue, /// Send this signal to the command. Signal(Signal), @@ -142,6 +146,7 @@ pub async fn worker( let mut set = Vec::new(); let mut handler = { working.borrow().action_handler.take() }.ok_or(CriticalError::MissingHandler)?; + let mut process: Option = None; loop { let maxtime = working.borrow().throttle.saturating_sub(last.elapsed()); @@ -188,12 +193,83 @@ pub async fn worker( let outcome = outcome.get().cloned().unwrap_or_default(); debug!(?outcome, "handler finished"); - let is_running = todo!(); + let is_running = match process.as_mut().map(|p| p.is_running()).transpose() { + Err(err) => { + errors.send(err).await?; + false + } + Ok(Some(ir)) => ir, + Ok(None) => false, + }; let outcome = outcome.resolve(is_running); debug!(?outcome, "outcome resolved"); + + let w = working.borrow().clone(); + let rerr = apply_outcome(outcome, w, &mut process).await; + if let Err(err) = rerr { + errors.send(err).await?; + } } debug!("action worker finished"); Ok(()) } + +#[async_recursion::async_recursion] +async fn apply_outcome( + outcome: Outcome, + working: WorkingData, + process: &mut Option, +) -> Result<(), RuntimeError> { + match (process.as_mut(), outcome) { + (_, Outcome::DoNothing) => {} + (Some(p), Outcome::Stop) => { + p.kill().await?; + p.wait().await?; + } + (p @ None, o @ Outcome::Stop) + | (p @ Some(_), o @ Outcome::Start) + | (p @ None, o @ Outcome::Signal(_)) => { + warn!(is_running=?p.is_some(), outcome=?o, "outcome does not apply to process state"); + } + (None, Outcome::Start) => { + let mut command = working.shell.to_command(&working.command); + + // TODO: pre-spawn hook + + let proc = if working.grouped { + Process::Grouped(command.group_spawn()?) + } else { + Process::Ungrouped(command.spawn()?) + }; + + // TODO: post-spawn hook + + *process = Some(proc); + } + + (Some(p), Outcome::Signal(sig)) => { + // TODO: windows + p.signal(sig)?; + } + + (_, Outcome::Clear) => { + clearscreen::clear()?; + } + + (Some(_), Outcome::IfRunning(then, _)) => { + apply_outcome(*then, working, process).await?; + } + (None, Outcome::IfRunning(_, otherwise)) => { + apply_outcome(*otherwise, working, process).await?; + } + + (_, Outcome::Both(one, two)) => { + apply_outcome(*one, working.clone(), process).await?; + apply_outcome(*two, working, process).await?; + } + } + + Ok(()) +} diff --git a/lib/src/command.rs b/lib/src/command.rs index 8cc7bf5..b64805e 100644 --- a/lib/src/command.rs +++ b/lib/src/command.rs @@ -1,6 +1,12 @@ //! Command construction and configuration thereof. -use tokio::process::Command; +use std::process::ExitStatus; + +use command_group::{AsyncGroupChild, Signal}; +use tokio::process::{Child, Command}; +use tracing::debug; + +use crate::error::RuntimeError; /// Shell to use to run commands. /// @@ -112,6 +118,96 @@ impl Shell { } } +#[derive(Debug)] +pub enum Process { + None, + Grouped(AsyncGroupChild), + Ungrouped(Child), + Done(ExitStatus), +} + +impl Default for Process { + fn default() -> Self { + Process::None + } +} + +impl Process { + #[cfg(unix)] + pub fn signal(&mut self, sig: Signal) -> Result<(), RuntimeError> { + use command_group::UnixChildExt; + + match self { + Self::None | Self::Done(_) => Ok(()), + Self::Grouped(c) => { + debug!(signal=%sig, pgid=?c.id(), "sending signal to process group"); + c.signal(sig) + } + Self::Ungrouped(c) => { + debug!(signal=%sig, pid=?c.id(), "sending signal to process"); + c.signal(sig) + } + } + .map_err(RuntimeError::Process) + } + + pub async fn kill(&mut self) -> Result<(), RuntimeError> { + match self { + Self::None | Self::Done(_) => Ok(()), + Self::Grouped(c) => { + debug!(pgid=?c.id(), "killing process group"); + c.kill() + } + Self::Ungrouped(c) => { + debug!(pid=?c.id(), "killing process"); + c.kill().await + } + } + .map_err(RuntimeError::Process) + } + + pub fn is_running(&mut self) -> Result { + match self { + Self::None | Self::Done(_) => Ok(false), + Self::Grouped(c) => c.try_wait().map(|status| { + if let Some(s) = status { + *self = Self::Done(s); + true + } else { + false + } + }), + Self::Ungrouped(c) => c.try_wait().map(|status| { + if let Some(s) = status { + *self = Self::Done(s); + true + } else { + false + } + }), + } + .map_err(RuntimeError::Process) + } + + pub async fn wait(&mut self) -> Result, RuntimeError> { + match self { + Self::None => Ok(None), + Self::Done(status) => Ok(Some(*status)), + Self::Grouped(c) => { + let status = c.wait().await?; + *self = Self::Done(status); + Ok(Some(status)) + } + Self::Ungrouped(c) => { + let status = c.wait().await?; + *self = Self::Done(status); + Ok(Some(status)) + } + } + .map_err(RuntimeError::Process) + } +} + #[cfg(test)] mod test { use super::Shell; diff --git a/lib/src/error.rs b/lib/src/error.rs index 4150c70..f6e8b42 100644 --- a/lib/src/error.rs +++ b/lib/src/error.rs @@ -117,6 +117,16 @@ pub enum RuntimeError { #[error("handler error while {ctx}: {err}")] #[diagnostic(code(watchexec::runtime::handler))] Handler { ctx: &'static str, err: String }, + + /// Error received when operating on a process. + #[error("when operating on process: {0}")] + #[diagnostic(code(watchexec::runtime::process))] + Process(#[source] std::io::Error), + + /// Error received when clearing the screen. + #[error("clear screen: {0}")] + #[diagnostic(code(watchexec::runtime::clearscreen))] + Clearscreen(#[from] clearscreen::Error), } /// Errors occurring from reconfigs.