watchexec/lib/src/watchexec.rs

185 lines
4.7 KiB
Rust
Raw Normal View History

use std::{
fmt,
mem::{replace, take},
sync::Arc,
};
2021-08-18 15:12:50 +02:00
use atomic_take::AtomicTake;
2021-08-18 15:12:50 +02:00
use futures::FutureExt;
use tokio::{
spawn,
sync::{mpsc, watch, Notify},
task::{JoinError, JoinHandle},
try_join,
};
2021-08-21 16:48:00 +02:00
use tracing::{debug, error, trace};
2021-08-18 15:12:50 +02:00
use crate::{
2021-08-20 18:43:55 +02:00
action,
config::{InitConfig, RuntimeConfig},
error::{CriticalError, ReconfigError, RuntimeError},
event::Event,
fs,
2021-08-21 19:58:03 +02:00
handler::{rte, Handler},
signal,
2021-08-18 15:12:50 +02:00
};
pub struct Watchexec {
handle: Arc<AtomicTake<JoinHandle<Result<(), CriticalError>>>>,
2021-08-18 15:12:50 +02:00
start_lock: Arc<Notify>,
2021-08-20 18:43:55 +02:00
action_watch: watch::Sender<action::WorkingData>,
2021-08-18 15:12:50 +02:00
fs_watch: watch::Sender<fs::WorkingData>,
event_input: mpsc::Sender<Event>,
2021-08-18 15:12:50 +02:00
}
impl fmt::Debug for Watchexec {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Watchexec").finish_non_exhaustive()
}
}
2021-08-18 15:12:50 +02:00
impl Watchexec {
/// TODO
///
/// Returns an [`Arc`] for convenience; use [`try_unwrap`][Arc::try_unwrap()] to get the value
/// directly if needed.
pub fn new(
mut init: InitConfig,
mut runtime: RuntimeConfig,
) -> Result<Arc<Self>, CriticalError> {
2021-08-22 16:30:56 +02:00
debug!(?init, ?runtime, pid=%std::process::id(), "initialising");
2021-08-21 16:48:00 +02:00
let (ev_s, ev_r) = mpsc::channel(init.event_channel_size);
let (ac_s, ac_r) = watch::channel(take(&mut runtime.action));
2021-08-22 17:11:58 +02:00
let (fs_s, fs_r) = watch::channel(fs::WorkingData::default());
let event_input = ev_s.clone();
2021-08-22 17:11:58 +02:00
// TODO: figure out how to do this (aka start the fs work) after the main task start lock
trace!("sending initial config to fs worker");
fs_s.send(take(&mut runtime.fs))
.expect("cannot send to just-created fs watch (bug)");
2021-08-18 15:12:50 +02:00
2021-08-21 16:48:00 +02:00
trace!("creating main task");
2021-08-18 15:12:50 +02:00
let notify = Arc::new(Notify::new());
let start_lock = notify.clone();
let handle = spawn(async move {
2021-08-21 16:48:00 +02:00
trace!("waiting for start lock");
2021-08-18 15:12:50 +02:00
notify.notified().await;
2021-08-21 16:48:00 +02:00
debug!("starting main task");
2021-08-18 15:12:50 +02:00
let (er_s, er_r) = mpsc::channel(init.error_channel_size);
let eh = replace(&mut init.error_handler, Box::new(()) as _);
2021-08-18 15:12:50 +02:00
macro_rules! subtask {
2021-08-21 16:48:00 +02:00
($name:ident, $task:expr) => {{
debug!(subtask=%stringify!($name), "spawning subtask");
2021-08-18 15:12:50 +02:00
spawn($task).then(|jr| async { flatten(jr) })
2021-08-21 16:48:00 +02:00
}};
2021-08-18 15:12:50 +02:00
}
let action = subtask!(
action,
action::worker(ac_r, er_s.clone(), ev_s.clone(), ev_r)
);
2021-08-21 16:48:00 +02:00
let fs = subtask!(fs, fs::worker(fs_r, er_s.clone(), ev_s.clone()));
let signal = subtask!(signal, signal::worker(er_s.clone(), ev_s.clone()));
2021-08-18 15:12:50 +02:00
2021-08-21 16:48:00 +02:00
let error_hook = subtask!(error_hook, error_hook(er_r, eh));
try_join!(action, error_hook, fs, signal)
.map(drop)
.or_else(|e| {
if matches!(e, CriticalError::Exit) {
trace!("got graceful exit request via critical error, erasing the error");
Ok(())
} else {
Err(e)
}
})
2021-08-22 17:36:10 +02:00
.map(|_| {
debug!("main task graceful exit");
})
2021-08-18 15:12:50 +02:00
});
2021-08-21 16:48:00 +02:00
trace!("done with setup");
Ok(Arc::new(Self {
handle: Arc::new(AtomicTake::new(handle)),
2021-08-18 15:12:50 +02:00
start_lock,
2021-08-20 18:43:55 +02:00
action_watch: ac_s,
2021-08-18 15:12:50 +02:00
fs_watch: fs_s,
event_input,
}))
2021-08-18 15:12:50 +02:00
}
2021-08-22 14:31:39 +02:00
pub fn reconfigure(&self, config: RuntimeConfig) -> Result<(), ReconfigError> {
2021-08-21 16:48:00 +02:00
debug!(?config, "reconfiguring");
2021-08-20 18:43:55 +02:00
self.action_watch.send(config.action)?;
2021-08-18 15:12:50 +02:00
self.fs_watch.send(config.fs)?;
Ok(())
}
/// Inputs an [`Event`] directly.
///
/// This can be useful for testing, for custom event sources, or for one-off action triggers
/// (for example, on start).
///
/// Hint: use [`Event::default()`] to send an empty event (which won't be filtered).
pub async fn send_event(&self, event: Event) -> Result<(), CriticalError> {
self.event_input.send(event).await?;
Ok(())
}
/// Start watchexec and obtain the handle to its main task.
///
/// This must only be called once.
///
/// # Panics
/// Panics if called twice.
pub fn main(&self) -> JoinHandle<Result<(), CriticalError>> {
2021-08-21 16:48:00 +02:00
trace!("notifying start lock");
2021-08-18 15:12:50 +02:00
self.start_lock.notify_one();
2021-08-21 16:48:00 +02:00
debug!("handing over main task handle");
self.handle
.take()
.expect("Watchexec::main was called twice")
2021-08-18 15:12:50 +02:00
}
}
#[inline]
fn flatten(join_res: Result<Result<(), CriticalError>, JoinError>) -> Result<(), CriticalError> {
join_res
.map_err(CriticalError::MainTaskJoin)
.and_then(|x| x)
}
async fn error_hook(
mut errors: mpsc::Receiver<RuntimeError>,
mut handler: Box<dyn Handler<RuntimeError> + Send>,
) -> Result<(), CriticalError> {
while let Some(err) = errors.recv().await {
2021-08-22 16:32:08 +02:00
if matches!(err, RuntimeError::Exit) {
trace!("got graceful exit request via runtime error, upgrading to crit");
return Err(CriticalError::Exit);
}
2021-08-21 16:48:00 +02:00
error!(%err, "runtime error");
if let Err(err) = handler.handle(err) {
error!(%err, "error while handling error");
handler
2021-08-21 19:58:03 +02:00
.handle(rte("error hook", err))
2021-08-21 16:48:00 +02:00
.unwrap_or_else(|err| {
error!(%err, "error while handling error of handling error");
});
}
}
Ok(())
}