watchexec/lib/src/command/supervisor.rs

267 lines
7.6 KiB
Rust
Raw Normal View History

2021-09-02 23:25:06 +02:00
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
2021-10-16 15:24:36 +02:00
use command_group::AsyncCommandGroup;
use tokio::{
process::Command,
select, spawn,
sync::{
mpsc::{self, Sender},
2021-09-02 23:25:06 +02:00
oneshot,
},
};
use tracing::{debug, error, trace};
use crate::{
error::RuntimeError,
2021-09-13 09:51:07 +02:00
event::{Event, Source, Tag},
2021-10-16 15:24:36 +02:00
signal::process::SubSignal,
};
use super::Process;
#[derive(Clone, Copy, Debug)]
enum Intervention {
Kill,
2021-10-16 15:24:36 +02:00
Signal(SubSignal),
}
2021-10-16 15:24:36 +02:00
/// A task which supervises a process.
///
/// This spawns a process from a [`Command`] and waits for it to complete while handling
/// interventions to it: orders to terminate it, or to send a signal to it. It also immediately
/// issues a [`Tag::ProcessCompletion`] event when the process completes.
#[derive(Debug)]
pub struct Supervisor {
id: u32,
intervene: Sender<Intervention>,
2021-12-09 12:53:18 +01:00
2021-09-02 23:25:06 +02:00
// why this and not a watch::channel? two reasons:
// 1. I tried the watch and ran into some race conditions???
// 2. This way it's typed-enforced that I send only once
waiter: Option<oneshot::Receiver<()>>,
ongoing: Arc<AtomicBool>,
}
impl Supervisor {
2021-10-16 15:24:36 +02:00
/// Spawns the command, the supervision task, and returns a new control object.
pub fn spawn(
errors: Sender<RuntimeError>,
events: Sender<Event>,
command: &mut Command,
grouped: bool,
) -> Result<Self, RuntimeError> {
debug!(%grouped, ?command, "spawning command");
let (process, id) = if grouped {
let proc = command.group_spawn().map_err(|err| RuntimeError::IoError {
2022-01-25 06:52:16 +01:00
about: "spawning process group",
err,
})?;
let id = proc.id().ok_or(RuntimeError::ProcessDeadOnArrival)?;
debug!(pgid=%id, "process group spawned");
(Process::Grouped(proc), id)
} else {
let proc = command.spawn().map_err(|err| RuntimeError::IoError {
about: "spawning process (ungrouped)",
err,
})?;
let id = proc.id().ok_or(RuntimeError::ProcessDeadOnArrival)?;
debug!(pid=%id, "process spawned");
(Process::Ungrouped(proc), id)
};
2021-09-02 23:25:06 +02:00
let ongoing = Arc::new(AtomicBool::new(true));
let (notify, waiter) = oneshot::channel();
let (int_s, int_r) = mpsc::channel(8);
2021-09-02 23:25:06 +02:00
let going = ongoing.clone();
2022-01-28 16:13:08 +01:00
spawn(async move {
let mut process = process;
let mut int = int_r;
debug!(?process, "starting task to watch on process");
loop {
select! {
p = process.wait() => {
match p {
Ok(_) => break, // deal with it below
Err(err) => {
error!(%err, "while waiting on process");
errors.send(err).await.ok();
2021-09-02 23:25:06 +02:00
trace!("marking process as done");
going.store(false, Ordering::SeqCst);
trace!("closing supervisor task early");
notify.send(()).ok();
return;
}
}
},
Some(int) = int.recv() => {
match int {
Intervention::Kill => {
if let Err(err) = process.kill().await {
error!(%err, "while killing process");
errors.send(err).await.ok();
trace!("continuing to watch command");
}
}
#[cfg(unix)]
Intervention::Signal(sig) => {
2021-10-16 15:24:36 +02:00
if let Some(sig) = sig.to_nix() {
if let Err(err) = process.signal(sig) {
error!(%err, "while sending signal to process");
errors.send(err).await.ok();
trace!("continuing to watch command");
}
} else {
let err = RuntimeError::UnsupportedSignal(sig);
error!(%err, "while sending signal to process");
errors.send(err).await.ok();
trace!("continuing to watch command");
}
}
2021-10-16 15:24:36 +02:00
#[cfg(windows)]
Intervention::Signal(sig) => {
// https://github.com/watchexec/watchexec/issues/219
let err = RuntimeError::UnsupportedSignal(sig);
error!(%err, "while sending signal to process");
errors.send(err).await.ok();
trace!("continuing to watch command");
}
}
}
else => break,
}
}
trace!("got out of loop, waiting once more");
match process.wait().await {
Err(err) => {
error!(%err, "while waiting on process");
errors.send(err).await.ok();
}
Ok(status) => {
let event = Event {
2021-09-13 09:34:40 +02:00
tags: vec![
Tag::Source(Source::Internal),
Tag::ProcessCompletion(status.map(|s| s.into())),
2021-09-02 21:57:45 +02:00
],
metadata: Default::default(),
};
debug!(?event, "creating synthetic process completion event");
if let Err(err) = events.send(event).await {
error!(%err, "while sending process completion event");
errors
.send(RuntimeError::EventChannelSend {
ctx: "command supervisor",
err,
})
.await
.ok();
}
}
}
2021-09-02 23:25:06 +02:00
trace!("marking process as done");
going.store(false, Ordering::SeqCst);
trace!("closing supervisor task");
notify.send(()).ok();
});
Ok(Self {
id,
2021-09-02 23:25:06 +02:00
waiter: Some(waiter),
ongoing,
intervene: int_s,
})
}
2021-10-16 15:24:36 +02:00
/// Get the PID of the process or process group.
///
/// This always successfully returns a PID, even if the process has already exited, as the PID
/// is held as soon as the process spawns. Take care not to use this for process manipulation
/// once the process has exited, as the ID may have been reused already.
pub fn id(&self) -> u32 {
self.id
}
2021-10-16 15:24:36 +02:00
/// Sends a signal to the process.
///
/// On Windows, only [`SubSignal::ForceStop`] is currently supported; any other signal is
/// silently ignored.
///
/// Although this is async, it completes as soon as the intervention has been queued for the
/// supervision task, not when the signal has actually been delivered.
pub async fn signal(&self, signal: SubSignal) {
    // Work out which intervention (if any) corresponds to the requested signal.
    let intervention = if cfg!(windows) {
        // anything but a force stop: https://github.com/watchexec/watchexec/issues/219
        matches!(signal, SubSignal::ForceStop).then(|| Intervention::Kill)
    } else {
        trace!(?signal, "sending signal intervention");
        Some(Intervention::Signal(signal))
    };

    if let Some(intervention) = intervention {
        // only errors on channel closed, and that only happens if the process is dead
        let _ = self.intervene.send(intervention).await;
    }
}
2021-10-16 15:24:36 +02:00
/// Kills the process.
///
/// Although this is async, it completes as soon as the kill intervention has been queued for
/// the supervision task, not when the process has actually been stopped.
pub async fn kill(&self) {
    trace!("sending kill intervention");

    // only errors on channel closed, and that only happens if the process is dead
    let _ = self.intervene.send(Intervention::Kill).await;
}
2021-10-16 15:24:36 +02:00
/// Returns true if the supervisor is still running.
///
/// This is almost always equivalent to whether the _process_ is still running, but may not be
/// 100% in sync.
pub fn is_running(&self) -> bool {
2021-09-02 23:25:06 +02:00
let ongoing = self.ongoing.load(Ordering::SeqCst);
trace!(?ongoing, "supervisor state");
ongoing
}
2021-10-16 15:24:36 +02:00
/// Returns only when the supervisor completes.
///
/// This is almost always equivalent to waiting for the _process_ to complete, but may not be
/// 100% in sync.
pub async fn wait(&mut self) -> Result<(), RuntimeError> {
2021-09-02 23:25:06 +02:00
if !self.ongoing.load(Ordering::SeqCst) {
trace!("supervisor already completed");
return Ok(());
}
2021-09-02 23:25:06 +02:00
if let Some(waiter) = self.waiter.take() {
debug!("waiting on supervisor completion");
waiter
.await
.map_err(|err| RuntimeError::InternalSupervisor(err.to_string()))?;
2021-09-02 23:25:06 +02:00
debug!("supervisor completed");
if self.ongoing.swap(false, Ordering::SeqCst) {
#[cfg(debug_assertions)]
panic!("oneshot completed but ongoing was true, this should never happen");
#[cfg(not(debug_assertions))]
tracing::warn!("oneshot completed but ongoing was true, this should never happen");
}
2021-09-02 23:25:06 +02:00
} else {
#[cfg(debug_assertions)]
panic!("waiter is None but ongoing was true, this should never happen");
2021-09-29 12:43:39 +02:00
#[cfg(not(debug_assertions))]
{
self.ongoing.store(false, Ordering::SeqCst);
tracing::warn!("waiter is None but ongoing was true, this should never happen");
}
}
Ok(())
}
}