2022-06-16 17:36:08 +02:00
|
|
|
use std::sync::Arc;
|
|
|
|
|
2022-06-11 08:43:11 +02:00
|
|
|
use async_priority_channel as priority;
|
2021-10-16 15:24:36 +02:00
|
|
|
use command_group::AsyncCommandGroup;
|
2021-09-02 19:22:15 +02:00
|
|
|
use tokio::{
|
|
|
|
select, spawn,
|
|
|
|
sync::{
|
|
|
|
mpsc::{self, Sender},
|
2022-01-30 12:00:51 +01:00
|
|
|
watch,
|
2021-09-02 19:22:15 +02:00
|
|
|
},
|
|
|
|
};
|
2022-06-16 17:36:08 +02:00
|
|
|
use tracing::{debug, debug_span, error, trace, Span};
|
2021-09-02 19:22:15 +02:00
|
|
|
|
|
|
|
use crate::{
|
2022-06-16 17:36:08 +02:00
|
|
|
action::{PostSpawn, PreSpawn},
|
|
|
|
command::Command,
|
2021-09-02 19:22:15 +02:00
|
|
|
error::RuntimeError,
|
2022-06-11 08:43:11 +02:00
|
|
|
event::{Event, Priority, Source, Tag},
|
2022-06-16 17:36:08 +02:00
|
|
|
handler::{rte, HandlerLock},
|
2021-10-16 15:24:36 +02:00
|
|
|
signal::process::SubSignal,
|
2021-09-02 19:22:15 +02:00
|
|
|
};
|
|
|
|
|
|
|
|
use super::Process;
|
|
|
|
|
|
|
|
#[derive(Clone, Copy, Debug)]
|
|
|
|
enum Intervention {
|
|
|
|
Kill,
|
2021-10-16 15:24:36 +02:00
|
|
|
Signal(SubSignal),
|
2021-09-02 19:22:15 +02:00
|
|
|
}
|
|
|
|
|
2022-06-16 17:36:08 +02:00
|
|
|
/// A task which supervises a sequence of processes.
|
2021-10-16 15:24:36 +02:00
|
|
|
///
|
2022-06-16 17:36:08 +02:00
|
|
|
/// This spawns processes from a vec of [`Command`]s in order and waits for each to complete while
|
|
|
|
/// handling interventions to itself: orders to terminate, or to send a signal to the current
|
|
|
|
/// process. It also immediately issues a [`Tag::ProcessCompletion`] event when the set completes.
|
2021-09-02 19:22:15 +02:00
|
|
|
#[derive(Debug)]
|
|
|
|
pub struct Supervisor {
|
|
|
|
intervene: Sender<Intervention>,
|
2022-01-30 12:00:51 +01:00
|
|
|
ongoing: watch::Receiver<bool>,
|
2021-09-02 19:22:15 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Supervisor {
|
2022-06-16 17:36:08 +02:00
|
|
|
/// Spawns the command set, the supervision task, and returns a new control object.
|
2021-09-02 19:22:15 +02:00
|
|
|
pub fn spawn(
|
|
|
|
errors: Sender<RuntimeError>,
|
2022-06-11 08:43:11 +02:00
|
|
|
events: priority::Sender<Event, Priority>,
|
2022-06-16 17:36:08 +02:00
|
|
|
mut commands: Vec<Command>,
|
2021-09-02 19:22:15 +02:00
|
|
|
grouped: bool,
|
2022-06-16 17:36:08 +02:00
|
|
|
actioned_events: Arc<[Event]>,
|
|
|
|
pre_spawn_handler: HandlerLock<PreSpawn>,
|
|
|
|
post_spawn_handler: HandlerLock<PostSpawn>,
|
2021-09-02 19:22:15 +02:00
|
|
|
) -> Result<Self, RuntimeError> {
|
2022-06-16 17:36:08 +02:00
|
|
|
// get commands in reverse order so pop() returns the next to run
|
|
|
|
commands.reverse();
|
|
|
|
let next = commands.pop().ok_or(RuntimeError::NoCommands)?;
|
2021-09-02 19:22:15 +02:00
|
|
|
|
2022-01-30 12:00:51 +01:00
|
|
|
let (notify, waiter) = watch::channel(true);
|
2021-09-02 19:22:15 +02:00
|
|
|
let (int_s, int_r) = mpsc::channel(8);
|
|
|
|
|
2022-01-28 16:13:08 +01:00
|
|
|
spawn(async move {
|
2022-06-16 17:36:08 +02:00
|
|
|
let span = debug_span!("supervisor");
|
2021-09-02 19:22:15 +02:00
|
|
|
|
2022-06-16 17:36:08 +02:00
|
|
|
let mut next = next;
|
|
|
|
let mut commands = commands;
|
|
|
|
let mut int = int_r;
|
2021-09-02 19:22:15 +02:00
|
|
|
|
|
|
|
loop {
|
2022-06-16 17:36:08 +02:00
|
|
|
let (mut process, pid) = match spawn_process(
|
|
|
|
span.clone(),
|
|
|
|
next,
|
|
|
|
grouped,
|
|
|
|
actioned_events.clone(),
|
|
|
|
pre_spawn_handler.clone(),
|
|
|
|
post_spawn_handler.clone(),
|
|
|
|
)
|
|
|
|
.await
|
|
|
|
{
|
|
|
|
Ok(pp) => pp,
|
|
|
|
Err(err) => {
|
|
|
|
let _enter = span.enter();
|
|
|
|
error!(%err, "while spawning process");
|
|
|
|
errors.send(err).await.ok();
|
|
|
|
trace!("marking process as done");
|
|
|
|
notify
|
|
|
|
.send(false)
|
|
|
|
.unwrap_or_else(|e| trace!(%e, "error sending process complete"));
|
|
|
|
trace!("closing supervisor task early");
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
span.in_scope(|| debug!(?process, ?pid, "spawned process"));
|
|
|
|
|
|
|
|
loop {
|
|
|
|
select! {
|
|
|
|
p = process.wait() => {
|
|
|
|
match p {
|
|
|
|
Ok(_) => break, // deal with it below
|
|
|
|
Err(err) => {
|
|
|
|
let _enter = span.enter();
|
|
|
|
error!(%err, "while waiting on process");
|
|
|
|
errors.try_send(err).ok();
|
|
|
|
trace!("marking process as done");
|
|
|
|
notify.send(false).unwrap_or_else(|e| trace!(%e, "error sending process complete"));
|
|
|
|
trace!("closing supervisor task early");
|
|
|
|
return;
|
2021-09-02 19:22:15 +02:00
|
|
|
}
|
|
|
|
}
|
2022-06-16 17:36:08 +02:00
|
|
|
},
|
|
|
|
Some(int) = int.recv() => {
|
|
|
|
match int {
|
|
|
|
Intervention::Kill => {
|
|
|
|
if let Err(err) = process.kill().await {
|
|
|
|
let _enter = span.enter();
|
|
|
|
error!(%err, "while killing process");
|
|
|
|
errors.try_send(err).ok();
|
|
|
|
trace!("continuing to watch command");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
#[cfg(unix)]
|
|
|
|
Intervention::Signal(sig) => {
|
|
|
|
let _enter = span.enter();
|
|
|
|
if let Some(sig) = sig.to_nix() {
|
|
|
|
if let Err(err) = process.signal(sig) {
|
|
|
|
error!(%err, "while sending signal to process");
|
|
|
|
errors.try_send(err).ok();
|
|
|
|
trace!("continuing to watch command");
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
let err = RuntimeError::UnsupportedSignal(sig);
|
2021-10-16 15:24:36 +02:00
|
|
|
error!(%err, "while sending signal to process");
|
2022-06-16 17:36:08 +02:00
|
|
|
errors.try_send(err).ok();
|
2021-10-16 15:24:36 +02:00
|
|
|
trace!("continuing to watch command");
|
|
|
|
}
|
2022-06-16 17:36:08 +02:00
|
|
|
}
|
|
|
|
#[cfg(windows)]
|
|
|
|
Intervention::Signal(sig) => {
|
|
|
|
let _enter = span.enter();
|
|
|
|
// https://github.com/watchexec/watchexec/issues/219
|
2021-10-16 15:24:36 +02:00
|
|
|
let err = RuntimeError::UnsupportedSignal(sig);
|
2021-09-02 19:22:15 +02:00
|
|
|
error!(%err, "while sending signal to process");
|
2022-06-16 17:36:08 +02:00
|
|
|
errors.try_send(err).ok();
|
2021-09-02 19:22:15 +02:00
|
|
|
trace!("continuing to watch command");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2022-06-16 17:36:08 +02:00
|
|
|
else => break,
|
2021-09-02 19:22:15 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-06-16 17:36:08 +02:00
|
|
|
span.in_scope(|| trace!("got out of loop, waiting once more"));
|
|
|
|
match process.wait().await {
|
|
|
|
Err(err) => {
|
|
|
|
let _enter = span.enter();
|
|
|
|
error!(%err, "while waiting on process");
|
|
|
|
errors.try_send(err).ok();
|
|
|
|
}
|
|
|
|
Ok(status) => {
|
|
|
|
let event = span.in_scope(|| {
|
|
|
|
let event = Event {
|
|
|
|
tags: vec![
|
|
|
|
Tag::Source(Source::Internal),
|
2023-01-06 14:53:49 +01:00
|
|
|
Tag::ProcessCompletion(status.map(Into::into)),
|
2022-06-16 17:36:08 +02:00
|
|
|
],
|
|
|
|
metadata: Default::default(),
|
|
|
|
};
|
|
|
|
|
|
|
|
debug!(?event, "creating synthetic process completion event");
|
|
|
|
event
|
|
|
|
});
|
|
|
|
|
|
|
|
if let Err(err) = events.send(event, Priority::Low).await {
|
|
|
|
let _enter = span.enter();
|
|
|
|
error!(%err, "while sending process completion event");
|
|
|
|
errors
|
|
|
|
.try_send(RuntimeError::EventChannelSend {
|
|
|
|
ctx: "command supervisor",
|
|
|
|
err,
|
|
|
|
})
|
|
|
|
.ok();
|
|
|
|
}
|
2021-09-02 19:22:15 +02:00
|
|
|
}
|
|
|
|
}
|
2022-06-16 17:36:08 +02:00
|
|
|
|
|
|
|
let _enter = span.enter();
|
|
|
|
if let Some(cmd) = commands.pop() {
|
|
|
|
debug!(?cmd, "queuing up next command");
|
|
|
|
next = cmd;
|
|
|
|
} else {
|
|
|
|
debug!("no more commands to supervise");
|
|
|
|
break;
|
|
|
|
}
|
2021-09-02 19:22:15 +02:00
|
|
|
}
|
|
|
|
|
2022-06-16 17:36:08 +02:00
|
|
|
let _enter = span.enter();
|
2021-09-02 23:25:06 +02:00
|
|
|
trace!("marking process as done");
|
2022-01-30 12:00:51 +01:00
|
|
|
notify
|
|
|
|
.send(false)
|
|
|
|
.unwrap_or_else(|e| trace!(%e, "error sending process complete"));
|
2021-09-02 23:25:06 +02:00
|
|
|
trace!("closing supervisor task");
|
2021-09-02 19:22:15 +02:00
|
|
|
});
|
|
|
|
|
|
|
|
Ok(Self {
|
2022-01-30 12:00:51 +01:00
|
|
|
ongoing: waiter,
|
2021-09-02 19:22:15 +02:00
|
|
|
intervene: int_s,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2021-10-16 15:24:36 +02:00
|
|
|
/// Issues a signal to the process.
|
|
|
|
///
|
|
|
|
/// On Windows, this currently only supports [`SubSignal::ForceStop`].
|
|
|
|
///
|
|
|
|
/// While this is async, it returns once the signal intervention has been sent internally, not
|
|
|
|
/// when the signal has been delivered.
|
|
|
|
pub async fn signal(&self, signal: SubSignal) {
|
|
|
|
if cfg!(windows) {
|
2023-01-06 14:53:49 +01:00
|
|
|
if signal == SubSignal::ForceStop {
|
2021-10-16 15:24:36 +02:00
|
|
|
self.intervene.send(Intervention::Kill).await.ok();
|
|
|
|
}
|
|
|
|
// else: https://github.com/watchexec/watchexec/issues/219
|
|
|
|
} else {
|
|
|
|
trace!(?signal, "sending signal intervention");
|
|
|
|
self.intervene.send(Intervention::Signal(signal)).await.ok();
|
|
|
|
}
|
2021-09-02 19:43:26 +02:00
|
|
|
// only errors on channel closed, and that only happens if the process is dead
|
2021-09-02 19:22:15 +02:00
|
|
|
}
|
|
|
|
|
2021-10-16 15:24:36 +02:00
|
|
|
/// Stops the process.
|
|
|
|
///
|
|
|
|
/// While this is async, it returns once the signal intervention has been sent internally, not
|
|
|
|
/// when the signal has been delivered.
|
2021-09-02 19:43:26 +02:00
|
|
|
pub async fn kill(&self) {
|
2021-09-02 19:22:15 +02:00
|
|
|
trace!("sending kill intervention");
|
2021-09-02 19:43:26 +02:00
|
|
|
self.intervene.send(Intervention::Kill).await.ok();
|
|
|
|
// only errors on channel closed, and that only happens if the process is dead
|
2021-09-02 19:22:15 +02:00
|
|
|
}
|
|
|
|
|
2021-10-16 15:24:36 +02:00
|
|
|
/// Returns true if the supervisor is still running.
|
|
|
|
///
|
|
|
|
/// This is almost always equivalent to whether the _process_ is still running, but may not be
|
|
|
|
/// 100% in sync.
|
2021-09-02 19:22:15 +02:00
|
|
|
pub fn is_running(&self) -> bool {
|
2022-01-30 12:00:51 +01:00
|
|
|
let ongoing = *self.ongoing.borrow();
|
2021-09-02 23:25:06 +02:00
|
|
|
trace!(?ongoing, "supervisor state");
|
|
|
|
ongoing
|
2021-09-02 19:22:15 +02:00
|
|
|
}
|
|
|
|
|
2021-10-16 15:24:36 +02:00
|
|
|
/// Returns only when the supervisor completes.
|
|
|
|
///
|
|
|
|
/// This is almost always equivalent to waiting for the _process_ to complete, but may not be
|
|
|
|
/// 100% in sync.
|
2022-01-30 12:00:51 +01:00
|
|
|
pub async fn wait(&self) -> Result<(), RuntimeError> {
|
|
|
|
if !*self.ongoing.borrow() {
|
2022-01-30 10:29:33 +01:00
|
|
|
trace!("supervisor already completed (pre)");
|
2021-09-02 23:25:06 +02:00
|
|
|
return Ok(());
|
|
|
|
}
|
2021-09-02 19:22:15 +02:00
|
|
|
|
2022-01-30 12:00:51 +01:00
|
|
|
debug!("waiting on supervisor completion");
|
|
|
|
let mut ongoing = self.ongoing.clone();
|
|
|
|
// never completes if ongoing is marked false in between the previous check and now!
|
|
|
|
// TODO: select with something that sleeps a bit and rechecks the ongoing
|
|
|
|
ongoing
|
|
|
|
.changed()
|
|
|
|
.await
|
|
|
|
.map_err(|err| RuntimeError::InternalSupervisor(err.to_string()))?;
|
|
|
|
debug!("supervisor completed");
|
2021-09-02 19:22:15 +02:00
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
2022-06-16 17:36:08 +02:00
|
|
|
|
|
|
|
async fn spawn_process(
|
|
|
|
span: Span,
|
|
|
|
command: Command,
|
|
|
|
grouped: bool,
|
|
|
|
actioned_events: Arc<[Event]>,
|
|
|
|
pre_spawn_handler: HandlerLock<PreSpawn>,
|
|
|
|
post_spawn_handler: HandlerLock<PostSpawn>,
|
|
|
|
) -> Result<(Process, u32), RuntimeError> {
|
|
|
|
let (pre_spawn, spawnable) = span.in_scope::<_, Result<_, RuntimeError>>(|| {
|
|
|
|
debug!(%grouped, ?command, "preparing command");
|
|
|
|
let mut spawnable = command.to_spawnable()?;
|
|
|
|
spawnable.kill_on_drop(true);
|
|
|
|
|
|
|
|
debug!("running pre-spawn handler");
|
|
|
|
Ok(PreSpawn::new(
|
|
|
|
command.clone(),
|
|
|
|
spawnable,
|
|
|
|
actioned_events.clone(),
|
|
|
|
))
|
|
|
|
})?;
|
|
|
|
|
|
|
|
pre_spawn_handler
|
|
|
|
.call(pre_spawn)
|
|
|
|
.await
|
2023-01-06 14:53:49 +01:00
|
|
|
.map_err(|e| rte("action pre-spawn", e.as_ref()))?;
|
2022-06-16 17:36:08 +02:00
|
|
|
|
|
|
|
let (proc, id, post_spawn) = span.in_scope::<_, Result<_, RuntimeError>>(|| {
|
|
|
|
let mut spawnable = Arc::try_unwrap(spawnable)
|
|
|
|
.map_err(|_| RuntimeError::HandlerLockHeld("pre-spawn"))?
|
|
|
|
.into_inner();
|
|
|
|
|
|
|
|
debug!(command=?spawnable, "spawning command");
|
|
|
|
let (proc, id) = if grouped {
|
|
|
|
let proc = spawnable
|
|
|
|
.group_spawn()
|
|
|
|
.map_err(|err| RuntimeError::IoError {
|
|
|
|
about: "spawning process group",
|
|
|
|
err,
|
|
|
|
})?;
|
|
|
|
let id = proc.id().ok_or(RuntimeError::ProcessDeadOnArrival)?;
|
|
|
|
debug!(pgid=%id, "process group spawned");
|
|
|
|
(Process::Grouped(proc), id)
|
|
|
|
} else {
|
|
|
|
let proc = spawnable.spawn().map_err(|err| RuntimeError::IoError {
|
|
|
|
about: "spawning process (ungrouped)",
|
|
|
|
err,
|
|
|
|
})?;
|
|
|
|
let id = proc.id().ok_or(RuntimeError::ProcessDeadOnArrival)?;
|
|
|
|
debug!(pid=%id, "process spawned");
|
|
|
|
(Process::Ungrouped(proc), id)
|
|
|
|
};
|
|
|
|
|
|
|
|
debug!("running post-spawn handler");
|
|
|
|
Ok((
|
|
|
|
proc,
|
|
|
|
id,
|
|
|
|
PostSpawn {
|
|
|
|
command: command.clone(),
|
|
|
|
events: actioned_events.clone(),
|
|
|
|
id,
|
|
|
|
grouped,
|
|
|
|
},
|
|
|
|
))
|
|
|
|
})?;
|
|
|
|
|
|
|
|
post_spawn_handler
|
|
|
|
.call(post_spawn)
|
|
|
|
.await
|
2023-01-06 14:53:49 +01:00
|
|
|
.map_err(|e| rte("action post-spawn", e.as_ref()))?;
|
2022-06-16 17:36:08 +02:00
|
|
|
|
|
|
|
Ok((proc, id))
|
|
|
|
}
|