Add action worker

This commit is contained in:
Félix Saparelli 2021-08-21 04:43:55 +12:00
parent dcde429787
commit 0f37e42243
No known key found for this signature in database
GPG Key ID: B948C4BAE44FC474
5 changed files with 54 additions and 2 deletions

32
lib/src/action.rs Normal file
View File

@ -0,0 +1,32 @@
//! Processor responsible for receiving events, filtering them, and scheduling actions in response.
use std::time::Duration;
use tokio::sync::{mpsc, watch};
use crate::{
error::{CriticalError, RuntimeError},
event::Event,
};
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct WorkingData {
pub throttle: Duration,
}
impl Default for WorkingData {
fn default() -> Self {
Self {
throttle: Duration::from_millis(100),
}
}
}
pub async fn worker(
mut working: watch::Receiver<WorkingData>,
errors: mpsc::Sender<RuntimeError>,
events: mpsc::Receiver<Event>,
) -> Result<(), CriticalError> {
Ok(())
}

View File

@ -16,6 +16,13 @@ pub struct Config {
#[builder(default)]
pub fs: crate::fs::WorkingData,
/// Working data for the action processing.
///
/// This is the task responsible for scheduling the actions in response to events, applying the
/// filtering, etc.
#[builder(default)]
pub action: crate::action::WorkingData,
/// Internal: the buffer size of the channel which carries runtime errors.
///
/// The default (64) is usually fine. If you expect a much larger throughput of runtime errors,

View File

@ -10,6 +10,7 @@ use tokio::{
};
use crate::{
action,
event::Event,
fs::{self, Watcher},
};
@ -124,7 +125,12 @@ impl RuntimeError {
#[derive(Debug, Diagnostic, Error)]
#[non_exhaustive]
pub enum ReconfigError {
/// Error received when the fs watcher internal state cannot be updated.
/// Error received when the action processor cannot be updated.
#[error("reconfig: action watch: {0}")]
#[diagnostic(code(watchexec::reconfig::action_watch))]
ActionWatch(#[from] watch::error::SendError<action::WorkingData>),
/// Error received when the fs event source cannot be updated.
#[error("reconfig: fs watch: {0}")]
#[diagnostic(code(watchexec::reconfig::fs_watch))]
FsWatch(#[from] watch::error::SendError<fs::WorkingData>),

View File

@ -53,6 +53,7 @@
#![forbid(unsafe_code)]
// the toolkit to make your own
pub mod action;
pub mod command;
pub mod error;
pub mod event;

View File

@ -10,6 +10,7 @@ use tokio::{
};
use crate::{
action,
config::Config,
error::{CriticalError, ReconfigError},
fs, signal,
@ -19,6 +20,7 @@ use crate::{
pub struct Watchexec {
handle: Arc<AtomicTake<JoinHandle<Result<(), CriticalError>>>>,
start_lock: Arc<Notify>,
action_watch: watch::Sender<action::WorkingData>,
fs_watch: watch::Sender<fs::WorkingData>,
}
@ -29,6 +31,7 @@ impl Watchexec {
/// directly if needed.
pub fn new(mut config: Config) -> Result<Arc<Self>, CriticalError> {
let (fs_s, fs_r) = watch::channel(take(&mut config.fs));
let (ac_s, ac_r) = watch::channel(take(&mut config.action));
let notify = Arc::new(Notify::new());
let start_lock = notify.clone();
@ -44,20 +47,23 @@ impl Watchexec {
};
}
let action = subtask!(action::worker(ac_r, er_s.clone(), ev_r));
let fs = subtask!(fs::worker(fs_r, er_s.clone(), ev_s.clone()));
let signal = subtask!(signal::worker(er_s.clone(), ev_s.clone()));
try_join!(fs, signal).map(drop)
try_join!(action, fs, signal).map(drop)
});
Ok(Arc::new(Self {
handle: Arc::new(AtomicTake::new(handle)),
start_lock,
action_watch: ac_s,
fs_watch: fs_s,
}))
}
pub fn reconfig(&self, config: Config) -> Result<(), ReconfigError> {
self.action_watch.send(config.action)?;
self.fs_watch.send(config.fs)?;
Ok(())
}