Implement most of process handling

This commit is contained in:
Félix Saparelli 2021-08-23 00:28:20 +12:00
parent f150c26b22
commit 3066ee5913
No known key found for this signature in database
GPG Key ID: B948C4BAE44FC474
5 changed files with 205 additions and 8 deletions

13
Cargo.lock generated
View File

@ -80,6 +80,17 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "async-recursion"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7d78656ba01f1b93024b7c3a0467f1608e4be67d725749fdcd7d2c7678fd7a2"
dependencies = [
"proc-macro2",
"quote 1.0.9",
"syn 1.0.73",
]
[[package]]
name = "async-trait"
version = "0.1.51"
@ -2073,7 +2084,9 @@ checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
name = "watchexec"
version = "1.17.1"
dependencies = [
"async-recursion",
"atomic-take",
"clearscreen",
"color-eyre",
"command-group",
"derive_builder",

View File

@ -24,6 +24,8 @@ futures = "0.3.16"
derive_builder = "0.10.2"
atomic-take = "1.0.0"
once_cell = "1.8.0"
clearscreen = "1.0.6"
async-recursion = "0.3.2"
[dependencies.command-group]
version = "1.0.5"

View File

@ -7,16 +7,16 @@ use std::{
};
use atomic_take::AtomicTake;
use command_group::Signal;
use command_group::{AsyncCommandGroup, Signal};
use once_cell::sync::OnceCell;
use tokio::{
sync::{mpsc, watch},
time::timeout,
};
use tracing::{debug, trace};
use tracing::{debug, trace, warn};
use crate::{
command::Shell,
command::{Process, Shell},
error::{CriticalError, RuntimeError},
event::Event,
handler::{rte, Handler},
@ -41,6 +41,9 @@ impl fmt::Debug for WorkingData {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WorkingData")
.field("throttle", &self.throttle)
.field("shell", &self.shell)
.field("command", &self.command)
.field("grouped", &self.grouped)
.finish_non_exhaustive()
}
}
@ -53,6 +56,7 @@ impl Default for WorkingData {
action_handler: Arc::new(AtomicTake::new(Box::new(()) as _)),
shell: Shell::default(),
command: Vec::new(),
grouped: true,
}
}
}
@ -92,9 +96,9 @@ pub enum Outcome {
/// If the command isn't running, start it.
Start,
/// Wait for command completion, then start a new one.
Queue,
// TODO
// /// Wait for command completion, then start a new one.
// Queue,
/// Send this signal to the command.
Signal(Signal),
@ -142,6 +146,7 @@ pub async fn worker(
let mut set = Vec::new();
let mut handler =
{ working.borrow().action_handler.take() }.ok_or(CriticalError::MissingHandler)?;
let mut process: Option<Process> = None;
loop {
let maxtime = working.borrow().throttle.saturating_sub(last.elapsed());
@ -188,12 +193,83 @@ pub async fn worker(
let outcome = outcome.get().cloned().unwrap_or_default();
debug!(?outcome, "handler finished");
let is_running = todo!();
let is_running = match process.as_mut().map(|p| p.is_running()).transpose() {
Err(err) => {
errors.send(err).await?;
false
}
Ok(Some(ir)) => ir,
Ok(None) => false,
};
let outcome = outcome.resolve(is_running);
debug!(?outcome, "outcome resolved");
let w = working.borrow().clone();
let rerr = apply_outcome(outcome, w, &mut process).await;
if let Err(err) = rerr {
errors.send(err).await?;
}
}
debug!("action worker finished");
Ok(())
}
#[async_recursion::async_recursion]
async fn apply_outcome(
outcome: Outcome,
working: WorkingData,
process: &mut Option<Process>,
) -> Result<(), RuntimeError> {
match (process.as_mut(), outcome) {
(_, Outcome::DoNothing) => {}
(Some(p), Outcome::Stop) => {
p.kill().await?;
p.wait().await?;
}
(p @ None, o @ Outcome::Stop)
| (p @ Some(_), o @ Outcome::Start)
| (p @ None, o @ Outcome::Signal(_)) => {
warn!(is_running=?p.is_some(), outcome=?o, "outcome does not apply to process state");
}
(None, Outcome::Start) => {
let mut command = working.shell.to_command(&working.command);
// TODO: pre-spawn hook
let proc = if working.grouped {
Process::Grouped(command.group_spawn()?)
} else {
Process::Ungrouped(command.spawn()?)
};
// TODO: post-spawn hook
*process = Some(proc);
}
(Some(p), Outcome::Signal(sig)) => {
// TODO: windows
p.signal(sig)?;
}
(_, Outcome::Clear) => {
clearscreen::clear()?;
}
(Some(_), Outcome::IfRunning(then, _)) => {
apply_outcome(*then, working, process).await?;
}
(None, Outcome::IfRunning(_, otherwise)) => {
apply_outcome(*otherwise, working, process).await?;
}
(_, Outcome::Both(one, two)) => {
apply_outcome(*one, working.clone(), process).await?;
apply_outcome(*two, working, process).await?;
}
}
Ok(())
}

View File

@ -1,6 +1,12 @@
//! Command construction and configuration thereof.
use tokio::process::Command;
use std::process::ExitStatus;
use command_group::{AsyncGroupChild, Signal};
use tokio::process::{Child, Command};
use tracing::debug;
use crate::error::RuntimeError;
/// Shell to use to run commands.
///
@ -112,6 +118,96 @@ impl Shell {
}
}
#[derive(Debug)]
pub enum Process {
None,
Grouped(AsyncGroupChild),
Ungrouped(Child),
Done(ExitStatus),
}
impl Default for Process {
fn default() -> Self {
Process::None
}
}
impl Process {
#[cfg(unix)]
pub fn signal(&mut self, sig: Signal) -> Result<(), RuntimeError> {
use command_group::UnixChildExt;
match self {
Self::None | Self::Done(_) => Ok(()),
Self::Grouped(c) => {
debug!(signal=%sig, pgid=?c.id(), "sending signal to process group");
c.signal(sig)
}
Self::Ungrouped(c) => {
debug!(signal=%sig, pid=?c.id(), "sending signal to process");
c.signal(sig)
}
}
.map_err(RuntimeError::Process)
}
pub async fn kill(&mut self) -> Result<(), RuntimeError> {
match self {
Self::None | Self::Done(_) => Ok(()),
Self::Grouped(c) => {
debug!(pgid=?c.id(), "killing process group");
c.kill()
}
Self::Ungrouped(c) => {
debug!(pid=?c.id(), "killing process");
c.kill().await
}
}
.map_err(RuntimeError::Process)
}
pub fn is_running(&mut self) -> Result<bool, RuntimeError> {
match self {
Self::None | Self::Done(_) => Ok(false),
Self::Grouped(c) => c.try_wait().map(|status| {
if let Some(s) = status {
*self = Self::Done(s);
true
} else {
false
}
}),
Self::Ungrouped(c) => c.try_wait().map(|status| {
if let Some(s) = status {
*self = Self::Done(s);
true
} else {
false
}
}),
}
.map_err(RuntimeError::Process)
}
pub async fn wait(&mut self) -> Result<Option<ExitStatus>, RuntimeError> {
match self {
Self::None => Ok(None),
Self::Done(status) => Ok(Some(*status)),
Self::Grouped(c) => {
let status = c.wait().await?;
*self = Self::Done(status);
Ok(Some(status))
}
Self::Ungrouped(c) => {
let status = c.wait().await?;
*self = Self::Done(status);
Ok(Some(status))
}
}
.map_err(RuntimeError::Process)
}
}
#[cfg(test)]
mod test {
use super::Shell;

View File

@ -117,6 +117,16 @@ pub enum RuntimeError {
#[error("handler error while {ctx}: {err}")]
#[diagnostic(code(watchexec::runtime::handler))]
Handler { ctx: &'static str, err: String },
/// Error received when operating on a process.
#[error("when operating on process: {0}")]
#[diagnostic(code(watchexec::runtime::process))]
Process(#[source] std::io::Error),
/// Error received when clearing the screen.
#[error("clear screen: {0}")]
#[diagnostic(code(watchexec::runtime::clearscreen))]
Clearscreen(#[from] clearscreen::Error),
}
/// Errors occurring from reconfigs.