use std::sync::Arc; use async_priority_channel as priority; use command_group::AsyncCommandGroup; use tokio::{ select, spawn, sync::{ mpsc::{self, Sender}, watch, }, }; use tracing::{debug, debug_span, error, trace, Span}; use crate::{ action::{PostSpawn, PreSpawn}, command::Command, error::RuntimeError, event::{Event, Priority, Source, Tag}, handler::{rte, HandlerLock}, signal::process::SubSignal, }; use super::Process; #[derive(Clone, Copy, Debug)] enum Intervention { Kill, Signal(SubSignal), } /// A task which supervises a sequence of processes. /// /// This spawns processes from a vec of [`Command`]s in order and waits for each to complete while /// handling interventions to itself: orders to terminate, or to send a signal to the current /// process. It also immediately issues a [`Tag::ProcessCompletion`] event when the set completes. #[derive(Debug)] pub struct Supervisor { intervene: Sender, ongoing: watch::Receiver, } impl Supervisor { /// Spawns the command set, the supervision task, and returns a new control object. pub fn spawn( errors: Sender, events: priority::Sender, mut commands: Vec, grouped: bool, actioned_events: Arc<[Event]>, pre_spawn_handler: HandlerLock, post_spawn_handler: HandlerLock, ) -> Result { // get commands in reverse order so pop() returns the next to run commands.reverse(); let next = commands.pop().ok_or(RuntimeError::NoCommands)?; let (notify, waiter) = watch::channel(true); let (int_s, int_r) = mpsc::channel(8); spawn(async move { let span = debug_span!("supervisor"); let mut next = next; let mut commands = commands; let mut int = int_r; loop { let (mut process, pid) = match spawn_process( span.clone(), next, grouped, actioned_events.clone(), pre_spawn_handler.clone(), post_spawn_handler.clone(), ) .await { Ok(pp) => pp, Err(err) => { let _enter = span.enter(); error!(%err, "while spawning process"); errors.send(err).await.ok(); trace!("marking process as done"); notify .send(false) .unwrap_or_else(|e| trace!(%e, "error sending process complete")); trace!("closing supervisor task early"); return; } }; span.in_scope(|| debug!(?process, ?pid, "spawned process")); loop { select! { p = process.wait() => { match p { Ok(_) => break, // deal with it below Err(err) => { let _enter = span.enter(); error!(%err, "while waiting on process"); errors.try_send(err).ok(); trace!("marking process as done"); notify.send(false).unwrap_or_else(|e| trace!(%e, "error sending process complete")); trace!("closing supervisor task early"); return; } } }, Some(int) = int.recv() => { match int { Intervention::Kill => { if let Err(err) = process.kill().await { let _enter = span.enter(); error!(%err, "while killing process"); errors.try_send(err).ok(); trace!("continuing to watch command"); } } #[cfg(unix)] Intervention::Signal(sig) => { let _enter = span.enter(); if let Some(sig) = sig.to_nix() { if let Err(err) = process.signal(sig) { error!(%err, "while sending signal to process"); errors.try_send(err).ok(); trace!("continuing to watch command"); } } else { let err = RuntimeError::UnsupportedSignal(sig); error!(%err, "while sending signal to process"); errors.try_send(err).ok(); trace!("continuing to watch command"); } } #[cfg(windows)] Intervention::Signal(sig) => { let _enter = span.enter(); // let err = RuntimeError::UnsupportedSignal(sig); error!(%err, "while sending signal to process"); errors.try_send(err).ok(); trace!("continuing to watch command"); } } } else => break, } } span.in_scope(|| trace!("got out of loop, waiting once more")); match process.wait().await { Err(err) => { let _enter = span.enter(); error!(%err, "while waiting on process"); errors.try_send(err).ok(); } Ok(status) => { let event = span.in_scope(|| { let event = Event { tags: vec![ Tag::Source(Source::Internal), Tag::ProcessCompletion(|s| s.into())), ], metadata: Default::default(), }; debug!(?event, "creating synthetic process completion event"); event }); if let Err(err) = events.send(event, Priority::Low).await { let _enter = span.enter(); error!(%err, "while sending process completion event"); errors .try_send(RuntimeError::EventChannelSend { ctx: "command supervisor", err, }) .ok(); } } } let _enter = span.enter(); if let Some(cmd) = commands.pop() { debug!(?cmd, "queuing up next command"); next = cmd; } else { debug!("no more commands to supervise"); break; } } let _enter = span.enter(); trace!("marking process as done"); notify .send(false) .unwrap_or_else(|e| trace!(%e, "error sending process complete")); trace!("closing supervisor task"); }); Ok(Self { ongoing: waiter, intervene: int_s, }) } /// Issues a signal to the process. /// /// On Windows, this currently only supports [`SubSignal::ForceStop`]. /// /// While this is async, it returns once the signal intervention has been sent internally, not /// when the signal has been delivered. pub async fn signal(&self, signal: SubSignal) { if cfg!(windows) { if let SubSignal::ForceStop = signal { self.intervene.send(Intervention::Kill).await.ok(); } // else: } else { trace!(?signal, "sending signal intervention"); self.intervene.send(Intervention::Signal(signal)).await.ok(); } // only errors on channel closed, and that only happens if the process is dead } /// Stops the process. /// /// While this is async, it returns once the signal intervention has been sent internally, not /// when the signal has been delivered. pub async fn kill(&self) { trace!("sending kill intervention"); self.intervene.send(Intervention::Kill).await.ok(); // only errors on channel closed, and that only happens if the process is dead } /// Returns true if the supervisor is still running. /// /// This is almost always equivalent to whether the _process_ is still running, but may not be /// 100% in sync. pub fn is_running(&self) -> bool { let ongoing = *self.ongoing.borrow(); trace!(?ongoing, "supervisor state"); ongoing } /// Returns only when the supervisor completes. /// /// This is almost always equivalent to waiting for the _process_ to complete, but may not be /// 100% in sync. pub async fn wait(&self) -> Result<(), RuntimeError> { if !*self.ongoing.borrow() { trace!("supervisor already completed (pre)"); return Ok(()); } debug!("waiting on supervisor completion"); let mut ongoing = self.ongoing.clone(); // never completes if ongoing is marked false in between the previous check and now! // TODO: select with something that sleeps a bit and rechecks the ongoing ongoing .changed() .await .map_err(|err| RuntimeError::InternalSupervisor(err.to_string()))?; debug!("supervisor completed"); Ok(()) } } async fn spawn_process( span: Span, command: Command, grouped: bool, actioned_events: Arc<[Event]>, pre_spawn_handler: HandlerLock, post_spawn_handler: HandlerLock, ) -> Result<(Process, u32), RuntimeError> { let (pre_spawn, spawnable) = span.in_scope::<_, Result<_, RuntimeError>>(|| { debug!(%grouped, ?command, "preparing command"); let mut spawnable = command.to_spawnable()?; spawnable.kill_on_drop(true); debug!("running pre-spawn handler"); Ok(PreSpawn::new( command.clone(), spawnable, actioned_events.clone(), )) })?; pre_spawn_handler .call(pre_spawn) .await .map_err(|e| rte("action pre-spawn", e))?; let (proc, id, post_spawn) = span.in_scope::<_, Result<_, RuntimeError>>(|| { let mut spawnable = Arc::try_unwrap(spawnable) .map_err(|_| RuntimeError::HandlerLockHeld("pre-spawn"))? .into_inner(); debug!(command=?spawnable, "spawning command"); let (proc, id) = if grouped { let proc = spawnable .group_spawn() .map_err(|err| RuntimeError::IoError { about: "spawning process group", err, })?; let id =; debug!(pgid=%id, "process group spawned"); (Process::Grouped(proc), id) } else { let proc = spawnable.spawn().map_err(|err| RuntimeError::IoError { about: "spawning process (ungrouped)", err, })?; let id =; debug!(pid=%id, "process spawned"); (Process::Ungrouped(proc), id) }; debug!("running post-spawn handler"); Ok(( proc, id, PostSpawn { command: command.clone(), events: actioned_events.clone(), id, grouped, }, )) })?; post_spawn_handler .call(post_spawn) .await .map_err(|e| rte("action post-spawn", e))?; Ok((proc, id)) }