Spawn outcome apply separately from action loop

This commit is contained in:
Félix Saparelli 2022-01-29 02:02:44 +13:00
parent 10b31a0269
commit 22f081a47b
1 changed files with 143 additions and 160 deletions

View File

@ -15,7 +15,7 @@ use tokio::{
}, },
time::timeout, time::timeout,
}; };
use tracing::{debug, trace, warn}; use tracing::{debug, error, trace, warn};
use crate::{ use crate::{
command::Supervisor, command::Supervisor,
@ -141,64 +141,86 @@ pub async fn worker(
let outcome = outcome.resolve(process.is_running().await); let outcome = outcome.resolve(process.is_running().await);
debug!(?outcome, "outcome resolved"); debug!(?outcome, "outcome resolved");
let w = working.borrow().clone(); ActionOutcome {
let rerr = apply_outcome(
outcome,
events, events,
w, working: working.clone(),
&mut process, process: process.clone(),
&mut pre_spawn_handler, errors_c: errors.clone(),
&mut post_spawn_handler, events_c: events_tx.clone(),
errors.clone(),
events_tx.clone(),
)
.await;
if let Err(err) = rerr {
errors.send(err).await?;
} }
.spawn(outcome);
} }
debug!("action worker finished"); debug!("action worker finished");
Ok(()) Ok(())
} }
#[allow(clippy::too_many_arguments)] #[derive(Clone)]
#[async_recursion::async_recursion] struct ActionOutcome {
async fn apply_outcome( events: Arc<Vec<Event>>, // TODO: make this Arc<[Event]>
outcome: Outcome, working: Receiver<WorkingData>,
events: Arc<Vec<Event>>, process: ProcessHolder,
working: WorkingData,
process: &mut Option<Supervisor>,
pre_spawn_handler: &mut Box<dyn Handler<PreSpawn> + Send>,
post_spawn_handler: &mut Box<dyn Handler<PostSpawn> + Send>,
errors_c: mpsc::Sender<RuntimeError>, errors_c: mpsc::Sender<RuntimeError>,
events_c: mpsc::Sender<Event>, events_c: mpsc::Sender<Event>,
) -> Result<(), RuntimeError> { }
trace!(?outcome, "applying outcome");
match (process.as_mut(), outcome) { impl ActionOutcome {
fn spawn(self, outcome: Outcome) {
debug!(?outcome, "spawning outcome applier");
let this = self;
spawn(async move {
let errors_c = this.errors_c.clone();
if let Err(err) = this.apply_outcome(outcome.clone()).await {
error!(?err, "outcome applier errored");
if let Err(err) = errors_c.send(err).await {
error!(?err, "failed to send an error, something is terribly wrong");
}
} else {
debug!(?outcome, "outcome applier finished");
}
});
}
#[async_recursion::async_recursion]
async fn apply_outcome(&self, outcome: Outcome) -> Result<(), RuntimeError> {
match (self.process.is_some().await, outcome) {
(_, Outcome::DoNothing) => {} (_, Outcome::DoNothing) => {}
(_, Outcome::Exit) => { (_, Outcome::Exit) => {
return Err(RuntimeError::Exit); return Err(RuntimeError::Exit);
} }
(Some(p), Outcome::Stop) => { (true, Outcome::Stop) => {
p.kill().await; self.process.kill().await;
p.wait().await?; self.process.wait().await?;
*process = None; self.process.drop_inner().await;
} }
(None, o @ Outcome::Stop) | (None, o @ Outcome::Wait) | (None, o @ Outcome::Signal(_)) => { (false, o @ Outcome::Stop)
| (false, o @ Outcome::Wait)
| (false, o @ Outcome::Signal(_)) => {
debug!(outcome=?o, "meaningless without a process, not doing anything"); debug!(outcome=?o, "meaningless without a process, not doing anything");
} }
(_, Outcome::Start) => { (_, Outcome::Start) => {
if working.command.is_empty() { let (cmd, shell, grouped, pre_spawn_handler, post_spawn_handler) = {
let wrk = self.working.borrow();
(
wrk.command.clone(),
wrk.shell.clone(),
wrk.grouped,
wrk.pre_spawn_handler.clone(),
wrk.post_spawn_handler.clone(),
)
};
if cmd.is_empty() {
warn!("tried to start a command without anything to run"); warn!("tried to start a command without anything to run");
} else { } else {
let command = working.shell.to_command(&working.command); let command = shell.to_command(&cmd);
let (pre_spawn, command) = let (pre_spawn, command) =
PreSpawn::new(command, working.command.clone(), events.clone()); PreSpawn::new(command, cmd.clone(), self.events.clone());
debug!("running pre-spawn handler"); debug!("running pre-spawn handler");
pre_spawn_handler pre_spawn_handler
.handle(pre_spawn) .call(pre_spawn)
.await
.map_err(|e| rte("action pre-spawn", e))?; .map_err(|e| rte("action pre-spawn", e))?;
let mut command = Arc::try_unwrap(command) let mut command = Arc::try_unwrap(command)
@ -207,34 +229,34 @@ async fn apply_outcome(
trace!("spawning supervisor for command"); trace!("spawning supervisor for command");
let sup = Supervisor::spawn( let sup = Supervisor::spawn(
errors_c.clone(), self.errors_c.clone(),
events_c.clone(), self.events_c.clone(),
&mut command, &mut command,
working.grouped, grouped,
)?; )?;
debug!("running post-spawn handler"); debug!("running post-spawn handler");
let post_spawn = PostSpawn { let post_spawn = PostSpawn {
command: working.command.clone(), command: cmd.clone(),
events: events.clone(), events: self.events.clone(),
id: sup.id(), id: sup.id(),
grouped: working.grouped, grouped,
}; };
post_spawn_handler post_spawn_handler
.handle(post_spawn) .call(post_spawn)
.await
.map_err(|e| rte("action post-spawn", e))?; .map_err(|e| rte("action post-spawn", e))?;
// TODO: consider what we want to do for (previous) process if it's still running here? self.process.replace(sup).await;
*process = Some(sup);
} }
} }
(Some(p), Outcome::Signal(sig)) => { (true, Outcome::Signal(sig)) => {
p.signal(sig).await; self.process.signal(sig).await;
} }
(Some(p), Outcome::Wait) => { (true, Outcome::Wait) => {
p.wait().await?; self.process.wait().await?;
} }
(_, Outcome::Clear) => { (_, Outcome::Clear) => {
@ -253,68 +275,28 @@ async fn apply_outcome(
} }
} }
(Some(_), Outcome::IfRunning(then, _)) => { (true, Outcome::IfRunning(then, _)) => {
apply_outcome( self.apply_outcome(*then).await?;
*then,
events.clone(),
working,
process,
pre_spawn_handler,
post_spawn_handler,
errors_c,
events_c,
)
.await?;
} }
(None, Outcome::IfRunning(_, otherwise)) => { (false, Outcome::IfRunning(_, otherwise)) => {
apply_outcome( self.apply_outcome(*otherwise).await?;
*otherwise,
events.clone(),
working,
process,
pre_spawn_handler,
post_spawn_handler,
errors_c,
events_c,
)
.await?;
} }
(_, Outcome::Both(one, two)) => { (_, Outcome::Both(one, two)) => {
if let Err(err) = apply_outcome( if let Err(err) = self.apply_outcome(*one).await {
*one,
events.clone(),
working.clone(),
process,
pre_spawn_handler,
post_spawn_handler,
errors_c.clone(),
events_c.clone(),
)
.await
{
debug!( debug!(
"first outcome failed, sending an error but proceeding to the second anyway" "first outcome failed, sending an error but proceeding to the second anyway"
); );
errors_c.send(err).await.ok(); self.errors_c.send(err).await.ok();
} }
apply_outcome( self.apply_outcome(*two).await?;
*two,
events.clone(),
working,
process,
pre_spawn_handler,
post_spawn_handler,
errors_c,
events_c,
)
.await?;
} }
} }
Ok(()) Ok(())
} }
}
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
struct ProcessHolder(Arc<RwLock<Option<Supervisor>>>); struct ProcessHolder(Arc<RwLock<Option<Supervisor>>>);
@ -322,6 +304,7 @@ impl ProcessHolder {
async fn is_running(&self) -> bool { async fn is_running(&self) -> bool {
self.0 self.0
.read() .read()
.await
.as_ref() .as_ref()
.map(|p| p.is_running()) .map(|p| p.is_running())
.unwrap_or(false) .unwrap_or(false)