Only run one outcome worker at a time

This commit is contained in:
Félix Saparelli 2022-01-31 00:05:43 +13:00
parent 2a9ee4de0b
commit d7a305fc4c
No known key found for this signature in database
GPG Key ID: B948C4BAE44FC474
2 changed files with 67 additions and 30 deletions

View File

@ -1,6 +1,10 @@
use std::sync::Arc;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use clearscreen::ClearScreen;
use futures::Future;
use tokio::{
spawn,
sync::{mpsc, watch::Receiver},
@ -16,57 +20,92 @@ pub struct OutcomeWorker {
events: Arc<[Event]>,
working: Receiver<WorkingData>,
process: ProcessHolder,
gen: usize,
gencheck: Arc<AtomicUsize>,
errors_c: mpsc::Sender<RuntimeError>,
events_c: mpsc::Sender<Event>,
}
impl OutcomeWorker {
pub fn newgen() -> Arc<AtomicUsize> {
Default::default()
}
pub fn spawn(
outcome: Outcome,
events: Arc<[Event]>,
working: Receiver<WorkingData>,
process: ProcessHolder,
gencheck: Arc<AtomicUsize>,
errors_c: mpsc::Sender<RuntimeError>,
events_c: mpsc::Sender<Event>,
) {
let gen = gencheck.fetch_add(1, Ordering::SeqCst).wrapping_add(1);
let this = Self {
events,
working,
process,
gen,
gencheck,
errors_c,
events_c,
};
debug!(?outcome, "spawning outcome worker");
debug!(?outcome, %gen, "spawning outcome worker");
spawn(async move {
let errors_c = this.errors_c.clone();
if let Err(err) = this.apply(outcome.clone()).await {
if matches!(err, RuntimeError::Exit) {
debug!("propagating graceful exit");
} else {
error!(?err, "outcome applier errored");
}
match this.apply(outcome.clone()).await {
Err(err) => {
if matches!(err, RuntimeError::Exit) {
debug!(%gen, "propagating graceful exit");
} else {
error!(?err, %gen, "outcome applier errored");
}
if let Err(err) = errors_c.send(err).await {
error!(?err, "failed to send an error, something is terribly wrong");
if let Err(err) = errors_c.send(err).await {
error!(?err, %gen, "failed to send an error, something is terribly wrong");
}
}
Ok(_) => {
debug!(?outcome, %gen, "outcome worker finished");
}
} else {
debug!(?outcome, "outcome worker finished");
}
});
}
async fn check_gen<O>(&self, f: impl Future<Output = O>) -> Option<O> {
// TODO: use a select and a notifier of some kind so it cancels tasks
if self.gencheck.load(Ordering::SeqCst) != self.gen {
warn!(when=%"pre", gen=%self.gen, "outcome worker was cycled, aborting");
return None;
}
let o = f.await;
if self.gencheck.load(Ordering::SeqCst) != self.gen {
warn!(when=%"post", gen=%self.gen, "outcome worker was cycled, aborting");
return None;
}
Some(o)
}
#[async_recursion::async_recursion]
async fn apply(&self, outcome: Outcome) -> Result<(), RuntimeError> {
match (self.process.is_some().await, outcome) {
macro_rules! notry {
($e:expr) => {
match self.check_gen($e).await {
None => return Ok(()),
Some(o) => o,
}
};
}
match (notry!(self.process.is_some()), outcome) {
(_, Outcome::DoNothing) => {}
(_, Outcome::Exit) => {
return Err(RuntimeError::Exit);
}
(true, Outcome::Stop) => {
self.process.kill().await;
self.process.wait().await?;
self.process.drop_inner().await;
notry!(self.process.kill());
notry!(self.process.wait())?;
notry!(self.process.drop_inner());
}
(false, o @ Outcome::Stop)
| (false, o @ Outcome::Wait)
@ -93,9 +132,7 @@ impl OutcomeWorker {
PreSpawn::new(command, cmd.clone(), self.events.clone());
debug!("running pre-spawn handler");
pre_spawn_handler
.call(pre_spawn)
.await
notry!(pre_spawn_handler.call(pre_spawn))
.map_err(|e| rte("action pre-spawn", e))?;
let mut command = Arc::try_unwrap(command)
@ -117,21 +154,19 @@ impl OutcomeWorker {
id: sup.id(),
grouped,
};
post_spawn_handler
.call(post_spawn)
.await
notry!(post_spawn_handler.call(post_spawn))
.map_err(|e| rte("action post-spawn", e))?;
self.process.replace(sup).await;
notry!(self.process.replace(sup));
}
}
(true, Outcome::Signal(sig)) => {
self.process.signal(sig).await;
notry!(self.process.signal(sig));
}
(true, Outcome::Wait) => {
self.process.wait().await?;
notry!(self.process.wait())?;
}
(_, Outcome::Clear) => {
@ -151,21 +186,21 @@ impl OutcomeWorker {
}
(true, Outcome::IfRunning(then, _)) => {
self.apply(*then).await?;
notry!(self.apply(*then))?;
}
(false, Outcome::IfRunning(_, otherwise)) => {
self.apply(*otherwise).await?;
notry!(self.apply(*otherwise))?;
}
(_, Outcome::Both(one, two)) => {
if let Err(err) = self.apply(*one).await {
if let Err(err) = notry!(self.apply(*one)) {
debug!(
"first outcome failed, sending an error but proceeding to the second anyway"
);
self.errors_c.send(err).await.ok();
notry!(self.errors_c.send(err)).ok();
}
self.apply(*two).await?;
notry!(self.apply(*two))?;
}
}

View File

@ -34,6 +34,7 @@ pub async fn worker(
let mut last = Instant::now();
let mut set = Vec::new();
let process = ProcessHolder::default();
let outcome_gen = OutcomeWorker::newgen();
loop {
let maxtime = if set.is_empty() {
@ -133,6 +134,7 @@ pub async fn worker(
events,
working.clone(),
process.clone(),
outcome_gen.clone(),
errors.clone(),
events_tx.clone(),
);