diff --git a/lib/src/action.rs b/lib/src/action.rs index d263a40..e828df0 100644 --- a/lib/src/action.rs +++ b/lib/src/action.rs @@ -1,28 +1,53 @@ //! Processor responsible for receiving events, filtering them, and scheduling actions in response. -use std::time::{Duration, Instant}; +use std::{ + fmt, + sync::Arc, + time::{Duration, Instant}, +}; -use tokio::{sync::{mpsc, watch}, time::timeout}; +use atomic_take::AtomicTake; +use tokio::{ + sync::{mpsc, watch}, + time::timeout, +}; +use tracing::{debug, trace}; use crate::{ error::{CriticalError, RuntimeError}, event::Event, + handler::{rte, Handler}, }; -#[derive(Clone, Debug)] +#[derive(Clone)] #[non_exhaustive] pub struct WorkingData { pub throttle: Duration, + pub action_handler: Arc + Send>>>, +} + +impl fmt::Debug for WorkingData { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WorkingData") + .field("throttle", &self.throttle) + .finish_non_exhaustive() + } } impl Default for WorkingData { fn default() -> Self { Self { throttle: Duration::from_millis(100), + action_handler: Arc::new(AtomicTake::new(Box::new(()) as _)), } } } +#[derive(Clone, Debug)] +pub struct Action { + pub events: Vec, +} + pub async fn worker( working: watch::Receiver, errors: mpsc::Sender, @@ -30,11 +55,13 @@ pub async fn worker( ) -> Result<(), CriticalError> { let mut last = Instant::now(); let mut set = Vec::new(); + let mut handler = + { working.borrow().action_handler.take() }.ok_or(CriticalError::MissingHandler)?; loop { let maxtime = working.borrow().throttle; match timeout(maxtime, events.recv()).await { - Err(_timeout) => {}, + Err(_timeout) => {} Ok(None) => break, Ok(Some(event)) => { set.push(event); @@ -46,7 +73,21 @@ pub async fn worker( } last = Instant::now(); - set.drain(..); // TODO: do action with the set + + let action = Action { + events: set.drain(..).collect(), + }; + debug!(?action, "action constructed"); + + if let Some(h) = working.borrow().action_handler.take() { + trace!("action handler updated"); + handler = h; + } + + let err = handler.handle(action).map_err(|e| rte("action worker", e)); + if let Err(err) = err { + errors.send(err).await?; + } } Ok(()) } diff --git a/lib/src/error.rs b/lib/src/error.rs index d3a247d..4150c70 100644 --- a/lib/src/error.rs +++ b/lib/src/error.rs @@ -33,6 +33,13 @@ pub enum CriticalError { #[error("main task join: {0}")] #[diagnostic(code(watchexec::critical::main_task_join))] MainTaskJoin(#[source] JoinError), + + /// Error received when a handler is missing on initialisation. + /// + /// This is a critical bug and unlikely to be recoverable in any way. + #[error("internal: missing handler on init")] + #[diagnostic(code(watchexec::critical::internal::missing_handler))] + MissingHandler, } /// Errors which _may_ be recoverable, transient, or only affect a part of the operation, and should @@ -112,15 +119,6 @@ pub enum RuntimeError { Handler { ctx: &'static str, err: String }, } -impl RuntimeError { - pub(crate) fn from_handler(ctx: &'static str, err: impl std::error::Error) -> Self { - Self::Handler { - ctx, - err: err.to_string(), - } - } -} - /// Errors occurring from reconfigs. #[derive(Debug, Diagnostic, Error)] #[non_exhaustive] diff --git a/lib/src/handler.rs b/lib/src/handler.rs index 7d8ff68..a3ec39f 100644 --- a/lib/src/handler.rs +++ b/lib/src/handler.rs @@ -88,12 +88,21 @@ use std::{error::Error, future::Future, io::Write, marker::PhantomData}; use tokio::runtime::Handle; +use crate::error::RuntimeError; + /// A callable that can be used to hook into watchexec. pub trait Handler { /// Call the handler with the given data. fn handle(&mut self, _data: T) -> Result<(), Box>; } +pub(crate) fn rte(ctx: &'static str, err: Box) -> RuntimeError { + RuntimeError::Handler { + ctx, + err: err.to_string(), + } +} + /// Wrapper for [`Handler`]s that are non-future [`FnMut`]s. /// /// Construct using [`Into::into`]: diff --git a/lib/src/watchexec.rs b/lib/src/watchexec.rs index 6586bf1..9ce5626 100644 --- a/lib/src/watchexec.rs +++ b/lib/src/watchexec.rs @@ -19,7 +19,7 @@ use crate::{ config::{InitConfig, RuntimeConfig}, error::{CriticalError, ReconfigError, RuntimeError}, fs, - handler::Handler, + handler::{rte, Handler}, signal, }; @@ -130,10 +130,7 @@ async fn error_hook( if let Err(err) = handler.handle(err) { error!(%err, "error while handling error"); handler - .handle(RuntimeError::Handler { - ctx: "error hook", - err: err.to_string(), - }) + .handle(rte("error hook", err)) .unwrap_or_else(|err| { error!(%err, "error while handling error of handling error"); });