From eb4f2ce201ab7074b00baebeb7d5dd0e748c6bcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?F=C3=A9lix=20Saparelli?= Date: Wed, 20 Dec 2023 00:22:59 +1300 Subject: [PATCH] Fix queueing behaviour (#734) --- bin/dates.mjs | 10 ++++++++ crates/cli/src/config.rs | 40 ++++++++++++++++++++++-------- crates/supervisor/CHANGELOG.md | 3 +++ crates/supervisor/src/job/state.rs | 4 +-- crates/supervisor/src/job/task.rs | 30 ++++++++++++---------- 5 files changed, 61 insertions(+), 26 deletions(-) create mode 100755 bin/dates.mjs diff --git a/bin/dates.mjs b/bin/dates.mjs new file mode 100755 index 0000000..a641019 --- /dev/null +++ b/bin/dates.mjs @@ -0,0 +1,10 @@ +#!/usr/bin/env node + +const id = Math.floor(Math.random() * 100); +let n = 0; +const m = 5; +while (n < m) { + n += 1; + console.log(`[${id} : ${n}/${m}] ${new Date}`); + await new Promise(done => setTimeout(done, 2000)); +} diff --git a/crates/cli/src/config.rs b/crates/cli/src/config.rs index 4035323..6ac34f5 100644 --- a/crates/cli/src/config.rs +++ b/crates/cli/src/config.rs @@ -8,7 +8,7 @@ use std::{ path::Path, process::Stdio, sync::{ - atomic::{AtomicU8, Ordering}, + atomic::{AtomicBool, AtomicU8, Ordering}, Arc, }, time::Duration, @@ -199,12 +199,14 @@ pub fn make_config(args: &Args, state: &State) -> Result { .collect(), ); + let queued = Arc::new(AtomicBool::new(false)); let quit_again = Arc::new(AtomicU8::new(0)); config.on_action_async(move |mut action| { let add_envs = add_envs.clone(); let command = command.clone(); let emit_file = emit_file.clone(); + let queued = queued.clone(); let quit_again = quit_again.clone(); let signal_map = signal_map.clone(); let workdir = workdir.clone(); @@ -215,6 +217,7 @@ pub fn make_config(args: &Args, state: &State) -> Result { let add_envs = add_envs.clone(); let command = command.clone(); let emit_file = emit_file.clone(); + let queued = queued.clone(); let quit_again = quit_again.clone(); let signal_map = signal_map.clone(); let workdir = workdir.clone(); @@ -409,17 +412,32 @@ pub fn make_config(args: &Args, state: &State) -> Result { } OnBusyUpdate::Queue => { let job = job.clone(); - tokio::spawn(async move { - job.to_wait().await; - job.start(); - job.run(move |context| { - setup_process( - innerjob.clone(), - context.command.clone(), - outflags, - ) + let already_queued = + queued.fetch_or(true, Ordering::SeqCst); + if already_queued { + debug!("next start is already queued, do nothing"); + } else { + debug!("queueing next start of job"); + tokio::spawn({ + let queued = queued.clone(); + async move { + trace!("waiting for job to finish"); + job.to_wait().await; + trace!("job finished, starting queued"); + job.start(); + job.run(move |context| { + setup_process( + innerjob.clone(), + context.command.clone(), + outflags, + ) + }) + .await; + trace!("resetting queued state"); + queued.store(false, Ordering::SeqCst); + } }); - }); + } } } } else { diff --git a/crates/supervisor/CHANGELOG.md b/crates/supervisor/CHANGELOG.md index 13b013d..3564a02 100644 --- a/crates/supervisor/CHANGELOG.md +++ b/crates/supervisor/CHANGELOG.md @@ -2,6 +2,9 @@ ## Next (YYYY-MM-DD) +- Fix Start executing even when the job is running. +- Add kill-on-drop to guarantee no two processes run at the same time. + ## v1.0.2 (2023-12-09) - Add `trace`-level logging to Job task. diff --git a/crates/supervisor/src/job/state.rs b/crates/supervisor/src/job/state.rs index 7174f26..d774449 100644 --- a/crates/supervisor/src/job/state.rs +++ b/crates/supervisor/src/job/state.rs @@ -86,9 +86,9 @@ impl CommandState { #[cfg(not(test))] let child = if command.options.grouped { - ErasedChild::Grouped(spawnable.group().spawn()?) + ErasedChild::Grouped(spawnable.group().kill_on_drop(true).spawn()?) } else { - ErasedChild::Ungrouped(spawnable.spawn()?) + ErasedChild::Ungrouped(spawnable.kill_on_drop(true).spawn()?) }; *self = Self::Running { diff --git a/crates/supervisor/src/job/task.rs b/crates/supervisor/src/job/task.rs index 884c818..88519c6 100644 --- a/crates/supervisor/src/job/task.rs +++ b/crates/supervisor/src/job/task.rs @@ -136,19 +136,23 @@ pub fn start_job(command: Arc) -> (Job, JoinHandle<()>) { match control { Control::Start => { - let mut spawnable = command.to_spawnable(); - previous_run = Some(command_state.reset()); - spawn_hook - .call( - &mut spawnable, - &JobTaskContext { - command: command.clone(), - current: &command_state, - previous: previous_run.as_ref(), - }, - ) - .await; - try_with_handler!(command_state.spawn(command.clone(), spawnable).await); + if command_state.is_running() { + trace!("child is running, skip"); + } else { + let mut spawnable = command.to_spawnable(); + previous_run = Some(command_state.reset()); + spawn_hook + .call( + &mut spawnable, + &JobTaskContext { + command: command.clone(), + current: &command_state, + previous: previous_run.as_ref(), + }, + ) + .await; + try_with_handler!(command_state.spawn(command.clone(), spawnable).await); + } } Control::Stop => { if let CommandState::Running { child, started, .. } = &mut command_state {