2021-08-21 10:46:44 +02:00
|
|
|
use std::{
|
|
|
|
fmt,
|
|
|
|
mem::{replace, take},
|
|
|
|
sync::Arc,
|
|
|
|
};
|
2021-08-18 15:12:50 +02:00
|
|
|
|
2021-08-19 16:55:34 +02:00
|
|
|
use atomic_take::AtomicTake;
|
2021-08-18 15:12:50 +02:00
|
|
|
use futures::FutureExt;
|
|
|
|
use tokio::{
|
|
|
|
spawn,
|
|
|
|
sync::{mpsc, watch, Notify},
|
|
|
|
task::{JoinError, JoinHandle},
|
|
|
|
try_join,
|
|
|
|
};
|
2021-08-21 16:48:00 +02:00
|
|
|
use tracing::{debug, error, trace};
|
2021-08-18 15:12:50 +02:00
|
|
|
|
|
|
|
use crate::{
|
2021-08-20 18:43:55 +02:00
|
|
|
action,
|
2021-08-21 10:46:44 +02:00
|
|
|
config::{InitConfig, RuntimeConfig},
|
|
|
|
error::{CriticalError, ReconfigError, RuntimeError},
|
2021-08-24 12:20:44 +02:00
|
|
|
event::Event,
|
2021-08-21 10:46:44 +02:00
|
|
|
fs,
|
2021-08-21 19:58:03 +02:00
|
|
|
handler::{rte, Handler},
|
2021-08-21 10:46:44 +02:00
|
|
|
signal,
|
2021-08-18 15:12:50 +02:00
|
|
|
};
|
|
|
|
|
|
|
|
pub struct Watchexec {
|
2021-08-19 16:55:34 +02:00
|
|
|
handle: Arc<AtomicTake<JoinHandle<Result<(), CriticalError>>>>,
|
2021-08-18 15:12:50 +02:00
|
|
|
start_lock: Arc<Notify>,
|
2021-08-21 10:46:44 +02:00
|
|
|
|
2021-08-20 18:43:55 +02:00
|
|
|
action_watch: watch::Sender<action::WorkingData>,
|
2021-08-18 15:12:50 +02:00
|
|
|
fs_watch: watch::Sender<fs::WorkingData>,
|
2021-08-24 12:20:44 +02:00
|
|
|
|
|
|
|
event_input: mpsc::Sender<Event>,
|
2021-08-18 15:12:50 +02:00
|
|
|
}
|
|
|
|
|
2021-08-21 10:46:44 +02:00
|
|
|
impl fmt::Debug for Watchexec {
|
|
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
|
f.debug_struct("Watchexec").finish_non_exhaustive()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-18 15:12:50 +02:00
|
|
|
impl Watchexec {
|
2021-08-19 16:55:34 +02:00
|
|
|
/// TODO
|
|
|
|
///
|
|
|
|
/// Returns an [`Arc`] for convenience; use [`try_unwrap`][Arc::try_unwrap()] to get the value
|
|
|
|
/// directly if needed.
|
2021-08-21 10:46:44 +02:00
|
|
|
pub fn new(
|
|
|
|
mut init: InitConfig,
|
|
|
|
mut runtime: RuntimeConfig,
|
|
|
|
) -> Result<Arc<Self>, CriticalError> {
|
2021-08-22 16:30:56 +02:00
|
|
|
debug!(?init, ?runtime, pid=%std::process::id(), "initialising");
|
2021-08-21 16:48:00 +02:00
|
|
|
|
2021-08-24 12:20:44 +02:00
|
|
|
let (ev_s, ev_r) = mpsc::channel(init.event_channel_size);
|
2021-08-21 10:46:44 +02:00
|
|
|
let (ac_s, ac_r) = watch::channel(take(&mut runtime.action));
|
2021-08-22 17:11:58 +02:00
|
|
|
let (fs_s, fs_r) = watch::channel(fs::WorkingData::default());
|
|
|
|
|
2021-08-24 12:20:44 +02:00
|
|
|
let event_input = ev_s.clone();
|
|
|
|
|
2021-08-22 17:11:58 +02:00
|
|
|
// TODO: figure out how to do this (aka start the fs work) after the main task start lock
|
|
|
|
trace!("sending initial config to fs worker");
|
|
|
|
fs_s.send(take(&mut runtime.fs))
|
|
|
|
.expect("cannot send to just-created fs watch (bug)");
|
2021-08-18 15:12:50 +02:00
|
|
|
|
2021-08-21 16:48:00 +02:00
|
|
|
trace!("creating main task");
|
2021-08-18 15:12:50 +02:00
|
|
|
let notify = Arc::new(Notify::new());
|
|
|
|
let start_lock = notify.clone();
|
|
|
|
let handle = spawn(async move {
|
2021-08-21 16:48:00 +02:00
|
|
|
trace!("waiting for start lock");
|
2021-08-18 15:12:50 +02:00
|
|
|
notify.notified().await;
|
2021-08-21 16:48:00 +02:00
|
|
|
debug!("starting main task");
|
2021-08-18 15:12:50 +02:00
|
|
|
|
2021-08-21 10:46:44 +02:00
|
|
|
let (er_s, er_r) = mpsc::channel(init.error_channel_size);
|
|
|
|
|
|
|
|
let eh = replace(&mut init.error_handler, Box::new(()) as _);
|
2021-08-18 15:12:50 +02:00
|
|
|
|
|
|
|
macro_rules! subtask {
|
2021-08-21 16:48:00 +02:00
|
|
|
($name:ident, $task:expr) => {{
|
|
|
|
debug!(subtask=%stringify!($name), "spawning subtask");
|
2021-08-18 15:12:50 +02:00
|
|
|
spawn($task).then(|jr| async { flatten(jr) })
|
2021-08-21 16:48:00 +02:00
|
|
|
}};
|
2021-08-18 15:12:50 +02:00
|
|
|
}
|
|
|
|
|
2021-09-02 19:22:15 +02:00
|
|
|
let action = subtask!(
|
|
|
|
action,
|
|
|
|
action::worker(ac_r, er_s.clone(), ev_s.clone(), ev_r)
|
|
|
|
);
|
2021-08-21 16:48:00 +02:00
|
|
|
let fs = subtask!(fs, fs::worker(fs_r, er_s.clone(), ev_s.clone()));
|
|
|
|
let signal = subtask!(signal, signal::worker(er_s.clone(), ev_s.clone()));
|
2021-08-18 15:12:50 +02:00
|
|
|
|
2021-08-21 16:48:00 +02:00
|
|
|
let error_hook = subtask!(error_hook, error_hook(er_r, eh));
|
2021-08-21 10:46:44 +02:00
|
|
|
|
2021-08-22 16:36:58 +02:00
|
|
|
try_join!(action, error_hook, fs, signal)
|
|
|
|
.map(drop)
|
|
|
|
.or_else(|e| {
|
|
|
|
if matches!(e, CriticalError::Exit) {
|
|
|
|
trace!("got graceful exit request via critical error, erasing the error");
|
|
|
|
Ok(())
|
|
|
|
} else {
|
|
|
|
Err(e)
|
|
|
|
}
|
|
|
|
})
|
2021-08-22 17:36:10 +02:00
|
|
|
.map(|_| {
|
|
|
|
debug!("main task graceful exit");
|
|
|
|
})
|
2021-08-18 15:12:50 +02:00
|
|
|
});
|
|
|
|
|
2021-08-21 16:48:00 +02:00
|
|
|
trace!("done with setup");
|
2021-08-19 16:55:34 +02:00
|
|
|
Ok(Arc::new(Self {
|
|
|
|
handle: Arc::new(AtomicTake::new(handle)),
|
2021-08-18 15:12:50 +02:00
|
|
|
start_lock,
|
2021-08-21 10:46:44 +02:00
|
|
|
|
2021-08-20 18:43:55 +02:00
|
|
|
action_watch: ac_s,
|
2021-08-18 15:12:50 +02:00
|
|
|
fs_watch: fs_s,
|
2021-08-24 12:20:44 +02:00
|
|
|
|
|
|
|
event_input,
|
2021-08-19 16:55:34 +02:00
|
|
|
}))
|
2021-08-18 15:12:50 +02:00
|
|
|
}
|
|
|
|
|
2021-08-22 14:31:39 +02:00
|
|
|
pub fn reconfigure(&self, config: RuntimeConfig) -> Result<(), ReconfigError> {
|
2021-08-21 16:48:00 +02:00
|
|
|
debug!(?config, "reconfiguring");
|
2021-08-20 18:43:55 +02:00
|
|
|
self.action_watch.send(config.action)?;
|
2021-08-18 15:12:50 +02:00
|
|
|
self.fs_watch.send(config.fs)?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2021-08-24 12:20:44 +02:00
|
|
|
/// Inputs an [`Event`] directly.
|
|
|
|
///
|
|
|
|
/// This can be useful for testing, for custom event sources, or for one-off action triggers
|
|
|
|
/// (for example, on start).
|
|
|
|
///
|
|
|
|
/// Hint: use [`Event::default()`] to send an empty event (which won't be filtered).
|
|
|
|
pub async fn send_event(&self, event: Event) -> Result<(), CriticalError> {
|
|
|
|
self.event_input.send(event).await?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2021-08-19 16:55:34 +02:00
|
|
|
/// Start watchexec and obtain the handle to its main task.
|
|
|
|
///
|
|
|
|
/// This must only be called once.
|
|
|
|
///
|
|
|
|
/// # Panics
|
|
|
|
/// Panics if called twice.
|
|
|
|
pub fn main(&self) -> JoinHandle<Result<(), CriticalError>> {
|
2021-08-21 16:48:00 +02:00
|
|
|
trace!("notifying start lock");
|
2021-08-18 15:12:50 +02:00
|
|
|
self.start_lock.notify_one();
|
2021-08-21 16:48:00 +02:00
|
|
|
|
|
|
|
debug!("handing over main task handle");
|
2021-08-19 16:55:34 +02:00
|
|
|
self.handle
|
|
|
|
.take()
|
|
|
|
.expect("Watchexec::main was called twice")
|
2021-08-18 15:12:50 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
fn flatten(join_res: Result<Result<(), CriticalError>, JoinError>) -> Result<(), CriticalError> {
|
|
|
|
join_res
|
|
|
|
.map_err(CriticalError::MainTaskJoin)
|
|
|
|
.and_then(|x| x)
|
|
|
|
}
|
2021-08-21 10:46:44 +02:00
|
|
|
|
|
|
|
async fn error_hook(
|
|
|
|
mut errors: mpsc::Receiver<RuntimeError>,
|
|
|
|
mut handler: Box<dyn Handler<RuntimeError> + Send>,
|
|
|
|
) -> Result<(), CriticalError> {
|
|
|
|
while let Some(err) = errors.recv().await {
|
2021-08-22 16:32:08 +02:00
|
|
|
if matches!(err, RuntimeError::Exit) {
|
|
|
|
trace!("got graceful exit request via runtime error, upgrading to crit");
|
|
|
|
return Err(CriticalError::Exit);
|
|
|
|
}
|
|
|
|
|
2021-08-21 16:48:00 +02:00
|
|
|
error!(%err, "runtime error");
|
|
|
|
if let Err(err) = handler.handle(err) {
|
|
|
|
error!(%err, "error while handling error");
|
2021-08-21 10:46:44 +02:00
|
|
|
handler
|
2021-08-21 19:58:03 +02:00
|
|
|
.handle(rte("error hook", err))
|
2021-08-21 16:48:00 +02:00
|
|
|
.unwrap_or_else(|err| {
|
|
|
|
error!(%err, "error while handling error of handling error");
|
|
|
|
});
|
2021-08-21 10:46:44 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|