From 22f081a47bf091463371f3117bd36bb13f88030e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fe=CC=81lix=20Saparelli?= Date: Sat, 29 Jan 2022 02:02:44 +1300 Subject: [PATCH] Spawn outcome apply separately from action loop --- lib/src/action.rs | 303 ++++++++++++++++++++++------------------------ 1 file changed, 143 insertions(+), 160 deletions(-) diff --git a/lib/src/action.rs b/lib/src/action.rs index 2a67cc2..fb0ce18 100644 --- a/lib/src/action.rs +++ b/lib/src/action.rs @@ -15,7 +15,7 @@ use tokio::{ }, time::timeout, }; -use tracing::{debug, trace, warn}; +use tracing::{debug, error, trace, warn}; use crate::{ command::Supervisor, @@ -141,179 +141,161 @@ pub async fn worker( let outcome = outcome.resolve(process.is_running().await); debug!(?outcome, "outcome resolved"); - let w = working.borrow().clone(); - let rerr = apply_outcome( - outcome, + ActionOutcome { events, - w, - &mut process, - &mut pre_spawn_handler, - &mut post_spawn_handler, - errors.clone(), - events_tx.clone(), - ) - .await; - if let Err(err) = rerr { - errors.send(err).await?; + working: working.clone(), + process: process.clone(), + errors_c: errors.clone(), + events_c: events_tx.clone(), } + .spawn(outcome); } debug!("action worker finished"); Ok(()) } -#[allow(clippy::too_many_arguments)] -#[async_recursion::async_recursion] -async fn apply_outcome( - outcome: Outcome, - events: Arc>, - working: WorkingData, - process: &mut Option, - pre_spawn_handler: &mut Box + Send>, - post_spawn_handler: &mut Box + Send>, +#[derive(Clone)] +struct ActionOutcome { + events: Arc>, // TODO: make this Arc<[Event]> + working: Receiver, + process: ProcessHolder, errors_c: mpsc::Sender, events_c: mpsc::Sender, -) -> Result<(), RuntimeError> { - trace!(?outcome, "applying outcome"); - match (process.as_mut(), outcome) { - (_, Outcome::DoNothing) => {} - (_, Outcome::Exit) => { - return Err(RuntimeError::Exit); - } - (Some(p), Outcome::Stop) => { - p.kill().await; - p.wait().await?; - *process = None; - } - (None, o @ Outcome::Stop) | (None, o @ Outcome::Wait) | (None, o @ Outcome::Signal(_)) => { - debug!(outcome=?o, "meaningless without a process, not doing anything"); - } - (_, Outcome::Start) => { - if working.command.is_empty() { - warn!("tried to start a command without anything to run"); +} + +impl ActionOutcome { + fn spawn(self, outcome: Outcome) { + debug!(?outcome, "spawning outcome applier"); + let this = self; + spawn(async move { + let errors_c = this.errors_c.clone(); + if let Err(err) = this.apply_outcome(outcome.clone()).await { + error!(?err, "outcome applier errored"); + if let Err(err) = errors_c.send(err).await { + error!(?err, "failed to send an error, something is terribly wrong"); + } } else { - let command = working.shell.to_command(&working.command); - let (pre_spawn, command) = - PreSpawn::new(command, working.command.clone(), events.clone()); - - debug!("running pre-spawn handler"); - pre_spawn_handler - .handle(pre_spawn) - .map_err(|e| rte("action pre-spawn", e))?; - - let mut command = Arc::try_unwrap(command) - .map_err(|_| RuntimeError::HandlerLockHeld("pre-spawn"))? - .into_inner(); - - trace!("spawning supervisor for command"); - let sup = Supervisor::spawn( - errors_c.clone(), - events_c.clone(), - &mut command, - working.grouped, - )?; - - debug!("running post-spawn handler"); - let post_spawn = PostSpawn { - command: working.command.clone(), - events: events.clone(), - id: sup.id(), - grouped: working.grouped, - }; - post_spawn_handler - .handle(post_spawn) - .map_err(|e| rte("action post-spawn", e))?; - - // TODO: consider what we want to do for (previous) process if it's still running here? - *process = Some(sup); + debug!(?outcome, "outcome applier finished"); } - } - - (Some(p), Outcome::Signal(sig)) => { - p.signal(sig).await; - } - - (Some(p), Outcome::Wait) => { - p.wait().await?; - } - - (_, Outcome::Clear) => { - clearscreen::clear()?; - } - - (_, Outcome::Reset) => { - for cs in [ - ClearScreen::WindowsCooked, - ClearScreen::WindowsVt, - ClearScreen::VtLeaveAlt, - ClearScreen::VtWellDone, - ClearScreen::default(), - ] { - cs.clear()?; - } - } - - (Some(_), Outcome::IfRunning(then, _)) => { - apply_outcome( - *then, - events.clone(), - working, - process, - pre_spawn_handler, - post_spawn_handler, - errors_c, - events_c, - ) - .await?; - } - (None, Outcome::IfRunning(_, otherwise)) => { - apply_outcome( - *otherwise, - events.clone(), - working, - process, - pre_spawn_handler, - post_spawn_handler, - errors_c, - events_c, - ) - .await?; - } - - (_, Outcome::Both(one, two)) => { - if let Err(err) = apply_outcome( - *one, - events.clone(), - working.clone(), - process, - pre_spawn_handler, - post_spawn_handler, - errors_c.clone(), - events_c.clone(), - ) - .await - { - debug!( - "first outcome failed, sending an error but proceeding to the second anyway" - ); - errors_c.send(err).await.ok(); - } - - apply_outcome( - *two, - events.clone(), - working, - process, - pre_spawn_handler, - post_spawn_handler, - errors_c, - events_c, - ) - .await?; - } + }); } - Ok(()) + #[async_recursion::async_recursion] + async fn apply_outcome(&self, outcome: Outcome) -> Result<(), RuntimeError> { + match (self.process.is_some().await, outcome) { + (_, Outcome::DoNothing) => {} + (_, Outcome::Exit) => { + return Err(RuntimeError::Exit); + } + (true, Outcome::Stop) => { + self.process.kill().await; + self.process.wait().await?; + self.process.drop_inner().await; + } + (false, o @ Outcome::Stop) + | (false, o @ Outcome::Wait) + | (false, o @ Outcome::Signal(_)) => { + debug!(outcome=?o, "meaningless without a process, not doing anything"); + } + (_, Outcome::Start) => { + let (cmd, shell, grouped, pre_spawn_handler, post_spawn_handler) = { + let wrk = self.working.borrow(); + ( + wrk.command.clone(), + wrk.shell.clone(), + wrk.grouped, + wrk.pre_spawn_handler.clone(), + wrk.post_spawn_handler.clone(), + ) + }; + + if cmd.is_empty() { + warn!("tried to start a command without anything to run"); + } else { + let command = shell.to_command(&cmd); + let (pre_spawn, command) = + PreSpawn::new(command, cmd.clone(), self.events.clone()); + + debug!("running pre-spawn handler"); + pre_spawn_handler + .call(pre_spawn) + .await + .map_err(|e| rte("action pre-spawn", e))?; + + let mut command = Arc::try_unwrap(command) + .map_err(|_| RuntimeError::HandlerLockHeld("pre-spawn"))? + .into_inner(); + + trace!("spawning supervisor for command"); + let sup = Supervisor::spawn( + self.errors_c.clone(), + self.events_c.clone(), + &mut command, + grouped, + )?; + + debug!("running post-spawn handler"); + let post_spawn = PostSpawn { + command: cmd.clone(), + events: self.events.clone(), + id: sup.id(), + grouped, + }; + post_spawn_handler + .call(post_spawn) + .await + .map_err(|e| rte("action post-spawn", e))?; + + self.process.replace(sup).await; + } + } + + (true, Outcome::Signal(sig)) => { + self.process.signal(sig).await; + } + + (true, Outcome::Wait) => { + self.process.wait().await?; + } + + (_, Outcome::Clear) => { + clearscreen::clear()?; + } + + (_, Outcome::Reset) => { + for cs in [ + ClearScreen::WindowsCooked, + ClearScreen::WindowsVt, + ClearScreen::VtLeaveAlt, + ClearScreen::VtWellDone, + ClearScreen::default(), + ] { + cs.clear()?; + } + } + + (true, Outcome::IfRunning(then, _)) => { + self.apply_outcome(*then).await?; + } + (false, Outcome::IfRunning(_, otherwise)) => { + self.apply_outcome(*otherwise).await?; + } + + (_, Outcome::Both(one, two)) => { + if let Err(err) = self.apply_outcome(*one).await { + debug!( + "first outcome failed, sending an error but proceeding to the second anyway" + ); + self.errors_c.send(err).await.ok(); + } + + self.apply_outcome(*two).await?; + } + } + + Ok(()) + } } #[derive(Clone, Debug, Default)] @@ -322,6 +304,7 @@ impl ProcessHolder { async fn is_running(&self) -> bool { self.0 .read() + .await .as_ref() .map(|p| p.is_running()) .unwrap_or(false)