2022-01-28 14:37:01 +01:00
|
|
|
use std::{
|
|
|
|
sync::Arc,
|
|
|
|
time::{Duration, Instant},
|
|
|
|
};
|
|
|
|
|
2022-06-11 08:43:11 +02:00
|
|
|
use async_priority_channel as priority;
|
2022-01-28 14:37:01 +01:00
|
|
|
use tokio::{
|
|
|
|
sync::{
|
|
|
|
mpsc,
|
|
|
|
watch::{self},
|
|
|
|
},
|
|
|
|
time::timeout,
|
|
|
|
};
|
|
|
|
use tracing::{debug, trace};
|
|
|
|
|
|
|
|
use crate::{
|
|
|
|
error::{CriticalError, RuntimeError},
|
2022-06-11 08:43:11 +02:00
|
|
|
event::{Event, Priority},
|
2022-01-28 14:37:01 +01:00
|
|
|
handler::rte,
|
|
|
|
};
|
|
|
|
|
|
|
|
use super::{outcome_worker::OutcomeWorker, process_holder::ProcessHolder, Action, WorkingData};
|
|
|
|
|
|
|
|
/// The main worker of a Watchexec process.
|
|
|
|
///
|
|
|
|
/// This is the main loop of the process. It receives events from the event channel, filters them,
|
|
|
|
/// debounces them, obtains the desired outcome of an actioned event, calls the appropriate handlers
|
|
|
|
/// and schedules processes as needed.
|
|
|
|
pub async fn worker(
|
|
|
|
working: watch::Receiver<WorkingData>,
|
|
|
|
errors: mpsc::Sender<RuntimeError>,
|
2022-06-11 08:43:11 +02:00
|
|
|
events_tx: priority::Sender<Event, Priority>,
|
|
|
|
events: priority::Receiver<Event, Priority>,
|
2022-01-28 14:37:01 +01:00
|
|
|
) -> Result<(), CriticalError> {
|
|
|
|
let mut last = Instant::now();
|
|
|
|
let mut set = Vec::new();
|
|
|
|
let process = ProcessHolder::default();
|
2022-01-30 12:05:43 +01:00
|
|
|
let outcome_gen = OutcomeWorker::newgen();
|
2022-01-28 14:37:01 +01:00
|
|
|
|
|
|
|
loop {
|
2022-06-11 08:43:11 +02:00
|
|
|
if events.is_closed() {
|
|
|
|
trace!("events channel closed, stopping");
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
2022-01-28 14:37:01 +01:00
|
|
|
let maxtime = if set.is_empty() {
|
|
|
|
trace!("nothing in set, waiting forever for next event");
|
|
|
|
Duration::from_secs(u64::MAX)
|
|
|
|
} else {
|
|
|
|
working.borrow().throttle.saturating_sub(last.elapsed())
|
|
|
|
};
|
|
|
|
|
|
|
|
if maxtime.is_zero() {
|
|
|
|
if set.is_empty() {
|
|
|
|
trace!("out of throttle but nothing to do, resetting");
|
|
|
|
last = Instant::now();
|
|
|
|
continue;
|
|
|
|
}
|
2023-01-06 14:53:49 +01:00
|
|
|
|
|
|
|
trace!("out of throttle on recycle");
|
2022-01-28 14:37:01 +01:00
|
|
|
} else {
|
|
|
|
trace!(?maxtime, "waiting for event");
|
2022-06-11 08:43:11 +02:00
|
|
|
let maybe_event = timeout(maxtime, events.recv()).await;
|
|
|
|
if events.is_closed() {
|
|
|
|
trace!("events channel closed during timeout, stopping");
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
match maybe_event {
|
2022-01-28 14:37:01 +01:00
|
|
|
Err(_timeout) => {
|
|
|
|
trace!("timed out, cycling");
|
|
|
|
continue;
|
|
|
|
}
|
2022-06-11 08:43:11 +02:00
|
|
|
Ok(Err(_empty)) => break,
|
|
|
|
Ok(Ok((event, priority))) => {
|
|
|
|
trace!(?event, ?priority, "got event");
|
2022-01-28 14:37:01 +01:00
|
|
|
|
2022-06-11 08:43:11 +02:00
|
|
|
if priority == Priority::Urgent {
|
2023-03-03 16:34:15 +01:00
|
|
|
trace!("urgent event, by-passing filters");
|
2022-06-11 08:43:11 +02:00
|
|
|
} else if event.is_empty() {
|
2022-01-28 14:37:01 +01:00
|
|
|
trace!("empty event, by-passing filters");
|
|
|
|
} else {
|
2022-06-11 08:43:11 +02:00
|
|
|
let filtered = working.borrow().filterer.check_event(&event, priority);
|
2022-01-28 14:37:01 +01:00
|
|
|
match filtered {
|
|
|
|
Err(err) => {
|
|
|
|
trace!(%err, "filter errored on event");
|
|
|
|
errors.send(err).await?;
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
Ok(false) => {
|
|
|
|
trace!("filter rejected event");
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
Ok(true) => {
|
|
|
|
trace!("filter passed event");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if set.is_empty() {
|
|
|
|
trace!("event is the first, resetting throttle window");
|
|
|
|
last = Instant::now();
|
|
|
|
}
|
|
|
|
|
|
|
|
set.push(event);
|
|
|
|
|
|
|
|
let elapsed = last.elapsed();
|
|
|
|
if elapsed < working.borrow().throttle {
|
|
|
|
trace!(?elapsed, "still within throttle window, cycling");
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
trace!("out of throttle, starting action process");
|
|
|
|
last = Instant::now();
|
|
|
|
|
2023-01-06 14:53:49 +01:00
|
|
|
#[allow(clippy::iter_with_drain)]
|
2022-01-28 14:37:01 +01:00
|
|
|
let events = Arc::from(set.drain(..).collect::<Vec<_>>().into_boxed_slice());
|
|
|
|
let action = Action::new(Arc::clone(&events));
|
|
|
|
debug!(?action, "action constructed");
|
|
|
|
|
|
|
|
debug!("running action handler");
|
|
|
|
let action_handler = {
|
|
|
|
let wrk = working.borrow();
|
|
|
|
wrk.action_handler.clone()
|
|
|
|
};
|
|
|
|
|
|
|
|
let outcome = action.outcome.clone();
|
|
|
|
let err = action_handler
|
|
|
|
.call(action)
|
|
|
|
.await
|
2023-01-06 14:53:49 +01:00
|
|
|
.map_err(|e| rte("action worker", e.as_ref()));
|
2022-01-28 14:37:01 +01:00
|
|
|
if let Err(err) = err {
|
|
|
|
errors.send(err).await?;
|
|
|
|
debug!("action handler errored, skipping");
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
let outcome = outcome.get().cloned().unwrap_or_default();
|
|
|
|
debug!(?outcome, "handler finished");
|
|
|
|
|
|
|
|
let outcome = outcome.resolve(process.is_running().await);
|
|
|
|
debug!(?outcome, "outcome resolved");
|
|
|
|
|
|
|
|
OutcomeWorker::spawn(
|
|
|
|
outcome,
|
|
|
|
events,
|
|
|
|
working.clone(),
|
|
|
|
process.clone(),
|
2022-01-30 12:05:43 +01:00
|
|
|
outcome_gen.clone(),
|
2022-01-28 14:37:01 +01:00
|
|
|
errors.clone(),
|
|
|
|
events_tx.clone(),
|
|
|
|
);
|
2022-01-28 16:13:08 +01:00
|
|
|
debug!("action process done");
|
2022-01-28 14:37:01 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
debug!("action worker finished");
|
|
|
|
Ok(())
|
|
|
|
}
|