// Source: watchexec/lib/src/command/supervisor.rs (214 lines, 5.3 KiB, Rust)

use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use command_group::{AsyncCommandGroup, Signal};
use tokio::{
process::Command,
select, spawn,
sync::{
mpsc::{self, Sender},
2021-09-02 23:25:06 +02:00
oneshot,
},
task::JoinHandle,
};
use tracing::{debug, error, trace};
use crate::{
error::RuntimeError,
2021-09-13 09:51:07 +02:00
event::{Event, Source, Tag},
};
use super::Process;
/// A request sent from a [`Supervisor`] handle to its background task.
///
/// The task applies each intervention to the process it is watching.
#[derive(Clone, Copy, Debug)]
enum Intervention {
/// Ask the task to call `kill()` on the process.
Kill,
/// Ask the task to call `signal()` on the process with this signal (Unix builds only).
#[cfg(unix)]
Signal(Signal),
}
#[derive(Debug)]
pub struct Supervisor {
id: u32,
intervene: Sender<Intervention>,
handle: JoinHandle<()>,
2021-09-02 23:25:06 +02:00
// why this and not a watch::channel? two reasons:
// 1. I tried the watch and ran into some race conditions???
// 2. This way it's typed-enforced that I send only once
waiter: Option<oneshot::Receiver<()>>,
ongoing: Arc<AtomicBool>,
}
impl Supervisor {
pub fn spawn(
errors: Sender<RuntimeError>,
events: Sender<Event>,
command: &mut Command,
grouped: bool,
) -> Result<Self, RuntimeError> {
debug!(%grouped, ?command, "spawning command");
let (process, id) = if grouped {
let proc = command.group_spawn()?;
let id = proc.id().ok_or(RuntimeError::ProcessDeadOnArrival)?;
debug!(pgid=%id, "process group spawned");
(Process::Grouped(proc), id)
} else {
let proc = command.spawn()?;
let id = proc.id().ok_or(RuntimeError::ProcessDeadOnArrival)?;
debug!(pid=%id, "process spawned");
(Process::Ungrouped(proc), id)
};
2021-09-02 23:25:06 +02:00
let ongoing = Arc::new(AtomicBool::new(true));
let (notify, waiter) = oneshot::channel();
let (int_s, int_r) = mpsc::channel(8);
2021-09-02 23:25:06 +02:00
let going = ongoing.clone();
let handle = spawn(async move {
let mut process = process;
let mut int = int_r;
debug!(?process, "starting task to watch on process");
loop {
select! {
p = process.wait() => {
match p {
Ok(_) => break, // deal with it below
Err(err) => {
error!(%err, "while waiting on process");
errors.send(err).await.ok();
2021-09-02 23:25:06 +02:00
trace!("marking process as done");
going.store(false, Ordering::SeqCst);
trace!("closing supervisor task early");
notify.send(()).ok();
return;
}
}
},
Some(int) = int.recv() => {
match int {
Intervention::Kill => {
if let Err(err) = process.kill().await {
error!(%err, "while killing process");
errors.send(err).await.ok();
trace!("continuing to watch command");
}
}
#[cfg(unix)]
Intervention::Signal(sig) => {
if let Err(err) = process.signal(sig) {
error!(%err, "while sending signal to process");
errors.send(err).await.ok();
trace!("continuing to watch command");
}
}
}
}
else => break,
}
}
trace!("got out of loop, waiting once more");
match process.wait().await {
Err(err) => {
error!(%err, "while waiting on process");
errors.send(err).await.ok();
}
Ok(status) => {
let event = Event {
2021-09-13 09:34:40 +02:00
tags: vec![
Tag::Source(Source::Internal),
Tag::ProcessCompletion(status),
2021-09-02 21:57:45 +02:00
],
metadata: Default::default(),
};
debug!(?event, "creating synthetic process completion event");
if let Err(err) = events.send(event).await {
error!(%err, "while sending process completion event");
errors
.send(RuntimeError::EventChannelSend {
ctx: "command supervisor",
err,
})
.await
.ok();
}
}
}
2021-09-02 23:25:06 +02:00
trace!("marking process as done");
going.store(false, Ordering::SeqCst);
trace!("closing supervisor task");
notify.send(()).ok();
});
Ok(Self {
id,
2021-09-02 23:25:06 +02:00
waiter: Some(waiter),
ongoing,
intervene: int_s,
handle, // TODO: is there anything useful to do with this? do we need to keep it?
})
}
pub fn id(&self) -> u32 {
self.id
}
#[cfg(unix)]
pub async fn signal(&self, signal: Signal) {
trace!(?signal, "sending signal intervention");
self.intervene.send(Intervention::Signal(signal)).await.ok();
// only errors on channel closed, and that only happens if the process is dead
}
pub async fn kill(&self) {
trace!("sending kill intervention");
self.intervene.send(Intervention::Kill).await.ok();
// only errors on channel closed, and that only happens if the process is dead
}
pub fn is_running(&self) -> bool {
2021-09-02 23:25:06 +02:00
let ongoing = self.ongoing.load(Ordering::SeqCst);
trace!(?ongoing, "supervisor state");
ongoing
}
pub async fn wait(&mut self) -> Result<(), RuntimeError> {
2021-09-02 23:25:06 +02:00
if !self.ongoing.load(Ordering::SeqCst) {
trace!("supervisor already completed");
return Ok(());
}
2021-09-02 23:25:06 +02:00
if let Some(waiter) = self.waiter.take() {
debug!("waiting on supervisor completion");
waiter
.await
.map_err(|err| RuntimeError::InternalSupervisor(err.to_string()))?;
2021-09-02 23:25:06 +02:00
debug!("supervisor completed");
if self.ongoing.swap(false, Ordering::SeqCst) {
#[cfg(debug_assertions)]
panic!("oneshot completed but ongoing was true, this should never happen");
#[cfg(not(debug_assertions))]
tracing::warn!("oneshot completed but ongoing was true, this should never happen");
}
2021-09-02 23:25:06 +02:00
} else {
#[cfg(debug_assertions)]
panic!("waiter is None but ongoing was true, this should never happen");
2021-09-29 12:43:39 +02:00
#[cfg(not(debug_assertions))]
{
self.ongoing.store(false, Ordering::SeqCst);
tracing::warn!("waiter is None but ongoing was true, this should never happen");
}
}
Ok(())
}
}