From 75b2c4b4aed494cc7552618d8064d1a007cc8aa8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?F=C3=A9lix=20Saparelli?= Date: Sat, 20 Apr 2024 16:58:17 +1200 Subject: [PATCH] Adapt supervisor to process-wrap (#815) --- Cargo.lock | 205 +++++++++++++------ crates/cli/Cargo.toml | 4 - crates/cli/src/config.rs | 10 +- crates/lib/CHANGELOG.md | 2 + crates/lib/Cargo.toml | 6 +- crates/lib/README.md | 2 +- crates/lib/examples/readme.rs | 2 +- crates/lib/src/watchexec.rs | 3 +- crates/supervisor/CHANGELOG.md | 2 + crates/supervisor/Cargo.toml | 6 +- crates/supervisor/src/command.rs | 8 + crates/supervisor/src/command/conversions.rs | 41 ++-- crates/supervisor/src/job/job.rs | 17 +- crates/supervisor/src/job/state.rs | 16 +- crates/supervisor/src/job/task.rs | 25 +-- crates/supervisor/src/job/test.rs | 13 +- crates/supervisor/src/job/testchild.rs | 85 ++++---- crates/supervisor/tests/programs.rs | 162 ++++++++------- 18 files changed, 367 insertions(+), 242 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0301155..b5231a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -564,6 +564,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" + [[package]] name = "chrono" version = "0.4.31" @@ -673,30 +679,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" -[[package]] -name = "command-group" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5080df6b0f0ecb76cab30808f00d937ba725cebe266a3da8cd89dff92f2a9916" -dependencies = [ - "async-trait", - "nix 0.26.4", - "tokio", - "winapi", -] - -[[package]] -name = "command-group" -version = "5.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a68fa787550392a9d58f44c21a3022cfb3ea3e2458b7f85d3b399d0ceeccf409" -dependencies = [ - "async-trait", - "nix 0.27.1", - "tokio", - "winapi", -] - [[package]] name = "concurrent-queue" version = "2.4.0" @@ -1758,7 +1740,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap 2.1.0", + "indexmap 2.2.6", "slab", "tokio", "tokio-util", @@ -1874,7 +1856,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.5", "tokio", "tower-service", "tracing", @@ -1904,7 +1886,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows-core 0.52.0", + "windows-core 0.53.0", ] [[package]] @@ -1972,9 +1954,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.1.0" +version = "2.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" +checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" dependencies = [ "equivalent", "hashbrown 0.14.3", @@ -2098,9 +2080,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.151" +version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] name = "libm" @@ -2351,6 +2333,18 @@ dependencies = [ "libc", ] +[[package]] +name = "nix" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" +dependencies = [ + "bitflags 2.4.1", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "nom" version = "7.1.3" @@ -2758,6 +2752,20 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "process-wrap" +version = "8.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df785e1a8a4f462ab97abb49e1718d8ee173f4fc37c29dde7d0487b6538c3a82" +dependencies = [ + "futures", + "indexmap 2.2.6", + "nix 0.28.0", + "tokio", + "tracing", + "windows 0.56.0", +] + [[package]] name = "prodash" version = "26.2.2" @@ -3456,9 +3464,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.35.1" +version = "1.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104" +checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" dependencies = [ "backtrace", "bytes", @@ -3546,7 +3554,7 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.1.0", + "indexmap 2.2.6", "toml_datetime", "winnow", ] @@ -3557,7 +3565,7 @@ version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d34d383cd00a163b4a5b85053df514d45bc330f6de7737edfe0a93311d1eaa03" dependencies = [ - "indexmap 2.1.0", + "indexmap 2.2.6", "serde", "serde_spanned", "toml_datetime", @@ -3992,7 +4000,6 @@ dependencies = [ "async-priority-channel", "async-recursion", "atomic-take", - "command-group 5.0.1", "futures", "ignore-files", "miette", @@ -4000,6 +4007,7 @@ dependencies = [ "normalize-path", "notify", "once_cell", + "process-wrap", "project-origins", "thiserror", "tokio", @@ -4022,7 +4030,6 @@ dependencies = [ "clap_complete_nushell", "clap_mangen", "clearscreen", - "command-group 2.1.0", "console-subscriber", "dirs 5.0.1", "embed-resource", @@ -4135,9 +4142,9 @@ name = "watchexec-supervisor" version = "1.0.3" dependencies = [ "boxcar", - "command-group 5.0.1", "futures", "nix 0.27.1", + "process-wrap", "tokio", "tracing", "watchexec-events", @@ -4217,7 +4224,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" dependencies = [ "windows-core 0.52.0", - "windows-targets 0.52.0", + "windows-targets 0.52.5", +] + +[[package]] +name = "windows" +version = "0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1de69df01bdf1ead2f4ac895dc77c9351aefff65b2f3db429a343f9cbf05e132" +dependencies = [ + "windows-core 0.56.0", + "windows-targets 0.52.5", ] [[package]] @@ -4235,7 +4252,60 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.52.0", + "windows-targets 0.52.5", +] + +[[package]] +name = "windows-core" +version = "0.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dcc5b895a6377f1ab9fa55acedab1fd5ac0db66ad1e6c7f47e28a22e446a5dd" +dependencies = [ + "windows-result", + "windows-targets 0.52.5", +] + +[[package]] +name = "windows-core" +version = "0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4698e52ed2d08f8658ab0c39512a7c00ee5fe2688c65f8c0a4f06750d729f2a6" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-result", + "windows-targets 0.52.5", +] + +[[package]] +name = "windows-implement" +version = "0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6fc35f58ecd95a9b71c4f2329b911016e6bec66b3f2e6a4aad86bd2e99e2f9b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.47", +] + +[[package]] +name = "windows-interface" +version = "0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08990546bf4edef8f431fa6326e032865f27138718c587dc21bc0265bbcb57cc" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.47", +] + +[[package]] +name = "windows-result" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "749f0da9cc72d82e600d8d2e44cadd0b9eedb9038f71a1c58556ac1c5791813b" +dependencies = [ + "windows-targets 0.52.5", ] [[package]] @@ -4253,7 +4323,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.0", + "windows-targets 0.52.5", ] [[package]] @@ -4273,17 +4343,18 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.52.0" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +checksum = "6f0713a46559409d202e70e28227288446bf7841d3211583a4b53e3f6d96e7eb" dependencies = [ - "windows_aarch64_gnullvm 0.52.0", - "windows_aarch64_msvc 0.52.0", - "windows_i686_gnu 0.52.0", - "windows_i686_msvc 0.52.0", - "windows_x86_64_gnu 0.52.0", - "windows_x86_64_gnullvm 0.52.0", - "windows_x86_64_msvc 0.52.0", + "windows_aarch64_gnullvm 0.52.5", + "windows_aarch64_msvc 0.52.5", + "windows_i686_gnu 0.52.5", + "windows_i686_gnullvm", + "windows_i686_msvc 0.52.5", + "windows_x86_64_gnu 0.52.5", + "windows_x86_64_gnullvm 0.52.5", + "windows_x86_64_msvc 0.52.5", ] [[package]] @@ -4294,9 +4365,9 @@ checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" [[package]] name = "windows_aarch64_gnullvm" -version = "0.52.0" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" +checksum = "7088eed71e8b8dda258ecc8bac5fb1153c5cffaf2578fc8ff5d61e23578d3263" [[package]] name = "windows_aarch64_msvc" @@ -4306,9 +4377,9 @@ checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_aarch64_msvc" -version = "0.52.0" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" +checksum = "9985fd1504e250c615ca5f281c3f7a6da76213ebd5ccc9561496568a2752afb6" [[package]] name = "windows_i686_gnu" @@ -4318,9 +4389,15 @@ checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_gnu" -version = "0.52.0" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" +checksum = "88ba073cf16d5372720ec942a8ccbf61626074c6d4dd2e745299726ce8b89670" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87f4261229030a858f36b459e748ae97545d6f1ec60e5e0d6a3d32e0dc232ee9" [[package]] name = "windows_i686_msvc" @@ -4330,9 +4407,9 @@ checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_i686_msvc" -version = "0.52.0" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" +checksum = "db3c2bf3d13d5b658be73463284eaf12830ac9a26a90c717b7f771dfe97487bf" [[package]] name = "windows_x86_64_gnu" @@ -4342,9 +4419,9 @@ checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" [[package]] name = "windows_x86_64_gnu" -version = "0.52.0" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" +checksum = "4e4246f76bdeff09eb48875a0fd3e2af6aada79d409d33011886d3e1581517d9" [[package]] name = "windows_x86_64_gnullvm" @@ -4354,9 +4431,9 @@ checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_gnullvm" -version = "0.52.0" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" +checksum = "852298e482cd67c356ddd9570386e2862b5673c85bd5f88df9ab6802b334c596" [[package]] name = "windows_x86_64_msvc" @@ -4366,9 +4443,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "windows_x86_64_msvc" -version = "0.52.0" +version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" +checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" [[package]] name = "winnow" diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 28b496a..da309dd 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -42,10 +42,6 @@ tracing-test = "0.1" uuid = { workspace = true, features = [ "v4", "fast-rng" ] } rand = { workspace = true } -[dependencies.command-group] -version = "2.1.0" -features = ["with-tokio"] - [dependencies.clap] version = "4.4.7" features = ["cargo", "derive", "env", "wrap_help"] diff --git a/crates/cli/src/config.rs b/crates/cli/src/config.rs index 6ac34f5..90668cd 100644 --- a/crates/cli/src/config.rs +++ b/crates/cli/src/config.rs @@ -232,10 +232,16 @@ pub fn make_config(args: &Args, state: &State) -> Result { if let Some(ref workdir) = workdir.as_ref() { debug!(?workdir, "set command workdir"); - command.current_dir(workdir); + command.command_mut().current_dir(workdir); } - emit_events_to_command(command, events, emit_file, emit_events_to, add_envs); + emit_events_to_command( + command.command_mut(), + events, + emit_file, + emit_events_to, + add_envs, + ); }); let show_events = || { diff --git a/crates/lib/CHANGELOG.md b/crates/lib/CHANGELOG.md index 0aba80b..ecd7971 100644 --- a/crates/lib/CHANGELOG.md +++ b/crates/lib/CHANGELOG.md @@ -2,6 +2,8 @@ ## Next (YYYY-MM-DD) +- Deps: replace command-group with process-wrap (in supervisor, but has flow-on effects) + ## v3.0.1 (2023-11-29) - Deps: watchexec-events and watchexec-signals after major bump and yank diff --git a/crates/lib/Cargo.toml b/crates/lib/Cargo.toml index fca4369..6ed0b7e 100644 --- a/crates/lib/Cargo.toml +++ b/crates/lib/Cargo.toml @@ -26,9 +26,9 @@ once_cell = "1.8.0" thiserror = "1.0.44" normalize-path = "0.2.0" -[dependencies.command-group] -version = "5.0.1" -features = ["with-tokio"] +[dependencies.process-wrap] +version = "8.0.0" +features = ["tokio1"] [dependencies.watchexec-events] version = "2.0.1" diff --git a/crates/lib/README.md b/crates/lib/README.md index 08e8125..7a043e4 100644 --- a/crates/lib/README.md +++ b/crates/lib/README.md @@ -68,7 +68,7 @@ async fn main() -> Result<()> { job.set_spawn_hook(|cmd, _| { use nix::sys::signal::{sigprocmask, SigSet, SigmaskHow, Signal}; unsafe { - cmd.pre_exec(|| { + cmd.command_mut().pre_exec(|| { let mut newset = SigSet::empty(); newset.add(Signal::SIGINT); sigprocmask(SigmaskHow::SIG_BLOCK, Some(&newset), None)?; diff --git a/crates/lib/examples/readme.rs b/crates/lib/examples/readme.rs index 31e68b0..50b80de 100644 --- a/crates/lib/examples/readme.rs +++ b/crates/lib/examples/readme.rs @@ -47,7 +47,7 @@ async fn main() -> Result<()> { job.set_spawn_hook(|cmd, _| { use nix::sys::signal::{sigprocmask, SigSet, SigmaskHow, Signal}; unsafe { - cmd.pre_exec(|| { + cmd.command_mut().pre_exec(|| { let mut newset = SigSet::empty(); newset.add(Signal::SIGINT); sigprocmask(SigmaskHow::SIG_BLOCK, Some(&newset), None)?; diff --git a/crates/lib/src/watchexec.rs b/crates/lib/src/watchexec.rs index 6654443..1f9caeb 100644 --- a/crates/lib/src/watchexec.rs +++ b/crates/lib/src/watchexec.rs @@ -140,7 +140,8 @@ impl Watchexec { let notify = Arc::new(Notify::new()); let start_lock = notify.clone(); - let (ev_s, ev_r) = priority::bounded(config.event_channel_size.try_into().unwrap_or(u64::MAX)); + let (ev_s, ev_r) = + priority::bounded(config.event_channel_size.try_into().unwrap_or(u64::MAX)); let event_input = ev_s.clone(); trace!("creating main task"); diff --git a/crates/supervisor/CHANGELOG.md b/crates/supervisor/CHANGELOG.md index 47cfb92..6535187 100644 --- a/crates/supervisor/CHANGELOG.md +++ b/crates/supervisor/CHANGELOG.md @@ -2,6 +2,8 @@ ## Next (YYYY-MM-DD) +- Deps: replace command-group with process-wrap + ## v1.0.3 (2023-12-19) - Fix Start executing even when the job is running. diff --git a/crates/supervisor/Cargo.toml b/crates/supervisor/Cargo.toml index 30c910e..6a6be77 100644 --- a/crates/supervisor/Cargo.toml +++ b/crates/supervisor/Cargo.toml @@ -18,9 +18,9 @@ edition = "2021" futures = "0.3.29" tracing = "0.1.40" -[dependencies.command-group] -version = "5.0.1" -features = ["with-tokio"] +[dependencies.process-wrap] +version = "8.0.0" +features = ["reset-sigmask", "tokio1"] [dependencies.tokio] version = "1.33.0" diff --git a/crates/supervisor/src/command.rs b/crates/supervisor/src/command.rs index a5bef6a..d396cce 100644 --- a/crates/supervisor/src/command.rs +++ b/crates/supervisor/src/command.rs @@ -61,6 +61,14 @@ pub struct SpawnOptions { /// [Job Objects]: https://en.wikipedia.org/wiki/Object_Manager_(Windows) pub grouped: bool, + /// Run the program in a new session. + /// + /// This will use Unix [sessions]. On Windows, this is not supported. This + /// implies `grouped: true`. + /// + /// [sessions]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/setsid.html + pub session: bool, + /// Reset the signal mask of the process before we spawn it. /// /// By default, the signal mask of the process is inherited from the parent process. This means diff --git a/crates/supervisor/src/command/conversions.rs b/crates/supervisor/src/command/conversions.rs index 5ee4108..415cae8 100644 --- a/crates/supervisor/src/command/conversions.rs +++ b/crates/supervisor/src/command/conversions.rs @@ -1,17 +1,17 @@ use std::fmt; +use process_wrap::tokio::{KillOnDrop, TokioCommandWrap}; use tokio::process::Command as TokioCommand; use tracing::trace; -use super::{Command, Program}; +use super::{Command, Program, SpawnOptions}; impl Command { - /// Obtain a [`tokio::process::Command`]. - pub fn to_spawnable(&self) -> TokioCommand { + /// Obtain a [`process_wrap::tokio::TokioCommandWrap`]. + pub fn to_spawnable(&self) -> TokioCommandWrap { trace!(program=?self.program, "constructing command"); - #[cfg_attr(not(unix), allow(unused_mut))] - let mut cmd = match &self.program { + let cmd = match &self.program { Program::Exec { prog, args, .. } => { let mut c = TokioCommand::new(prog); c.args(args); @@ -56,19 +56,28 @@ impl Command { } }; + let mut cmd = TokioCommandWrap::from(cmd); + cmd.wrap(KillOnDrop); + + match self.options { + #[cfg(unix)] + SpawnOptions { session: true, .. } => { + cmd.wrap(process_wrap::tokio::ProcessSession); + } + #[cfg(unix)] + SpawnOptions { grouped: true, .. } => { + cmd.wrap(process_wrap::tokio::ProcessGroup::leader()); + } + #[cfg(windows)] + SpawnOptions { grouped: true, .. } | SpawnOptions { session: true, .. } => { + cmd.wrap(process_wrap::tokio::JobObject); + } + _ => {} + } + #[cfg(unix)] if self.options.reset_sigmask { - use nix::sys::signal::{sigprocmask, SigSet, SigmaskHow}; - unsafe { - cmd.pre_exec(|| { - let mut oldset = SigSet::empty(); - let newset = SigSet::all(); - trace!(unblocking=?newset, "resetting process sigmask"); - sigprocmask(SigmaskHow::SIG_UNBLOCK, Some(&newset), Some(&mut oldset))?; - trace!(?oldset, "sigmask reset"); - Ok(()) - }); - } + cmd.wrap(process_wrap::tokio::ResetSigmask); } cmd diff --git a/crates/supervisor/src/job/job.rs b/crates/supervisor/src/job/job.rs index ccde571..52ffa4e 100644 --- a/crates/supervisor/src/job/job.rs +++ b/crates/supervisor/src/job/job.rs @@ -1,6 +1,6 @@ use std::{future::Future, sync::Arc, time::Duration}; -use tokio::process::Command as TokioCommand; +use process_wrap::tokio::TokioCommandWrap; use watchexec_signals::Signal; use crate::{command::Command, errors::SyncIoError, flag::Flag}; @@ -289,11 +289,11 @@ impl Job { /// Set the spawn hook. /// /// The hook will be called once per process spawned, before the process is spawned. It's given - /// a mutable reference to the [`tokio::process::Command`] and some context; it can modify the - /// command as it sees fit. + /// a mutable reference to the [`process_wrap::tokio::TokioCommandWrap`] and some context; it + /// can modify or further [wrap](process_wrap) the command as it sees fit. pub fn set_spawn_hook( &self, - fun: impl Fn(&mut TokioCommand, &JobTaskContext<'_>) + Send + Sync + 'static, + fun: impl Fn(&mut TokioCommandWrap, &JobTaskContext<'_>) + Send + Sync + 'static, ) -> Ticket { self.control(Control::SetSyncSpawnHook(Arc::new(fun))) } @@ -301,8 +301,8 @@ impl Job { /// Set the spawn hook (async version). /// /// The hook will be called once per process spawned, before the process is spawned. It's given - /// a mutable reference to the [`tokio::process::Command`] and some context; it can modify the - /// command as it sees fit. + /// a mutable reference to the [`process_wrap::tokio::TokioCommandWrap`] and some context; it + /// can modify or further [wrap](process_wrap) the command as it sees fit. /// /// A gotcha when using this method is that the future returned by the function can live longer /// than the references it was given, so you can't bring the command or context into the async @@ -313,7 +313,10 @@ impl Job { /// spawn hooks that can't be done in the simpler sync version. pub fn set_spawn_async_hook( &self, - fun: impl (Fn(&mut TokioCommand, &JobTaskContext<'_>) -> Box + Send + Sync>) + fun: impl (Fn( + &mut TokioCommandWrap, + &JobTaskContext<'_>, + ) -> Box + Send + Sync>) + Send + Sync + 'static, diff --git a/crates/supervisor/src/job/state.rs b/crates/supervisor/src/job/state.rs index 23997ed..8e016cf 100644 --- a/crates/supervisor/src/job/state.rs +++ b/crates/supervisor/src/job/state.rs @@ -1,8 +1,6 @@ use std::{sync::Arc, time::Instant}; -#[cfg(not(test))] -use command_group::{tokio::ErasedChild, AsyncCommandGroup}; -use tokio::process::Command as TokioCommand; +use process_wrap::tokio::{TokioChildWrapper, TokioCommandWrap}; use tracing::trace; use watchexec_events::ProcessEnd; @@ -33,7 +31,7 @@ pub enum CommandState { /// The child process. #[cfg(not(test))] - child: ErasedChild, + child: Box, /// The time at which the process was spawned. started: Instant, @@ -75,7 +73,7 @@ impl CommandState { pub(crate) fn spawn( &mut self, command: Arc, - mut spawnable: TokioCommand, + mut spawnable: TokioCommandWrap, ) -> std::io::Result { if let Self::Running { .. } = self { trace!("command running, not spawning again"); @@ -88,11 +86,7 @@ impl CommandState { let child = super::TestChild::new(command)?; #[cfg(not(test))] - let child = if command.options.grouped { - ErasedChild::Grouped(spawnable.group().kill_on_drop(true).spawn()?) - } else { - ErasedChild::Ungrouped(spawnable.kill_on_drop(true).spawn()?) - }; + let child = spawnable.spawn()?; *self = Self::Running { child, @@ -136,7 +130,7 @@ impl CommandState { pub(crate) async fn wait(&mut self) -> std::io::Result { if let Self::Running { child, started } = self { - let end = child.wait().await?; + let end = Box::into_pin(child.wait()).await?; *self = Self::Finished { status: end.into(), started: *started, diff --git a/crates/supervisor/src/job/task.rs b/crates/supervisor/src/job/task.rs index 3a951a3..e28d323 100644 --- a/crates/supervisor/src/job/task.rs +++ b/crates/supervisor/src/job/task.rs @@ -1,6 +1,7 @@ use std::{future::Future, mem::take, sync::Arc, time::Instant}; -use tokio::{process::Command as TokioCommand, select, task::JoinHandle}; +use process_wrap::tokio::TokioCommandWrap; +use tokio::{select, task::JoinHandle}; use tracing::{instrument, trace, trace_span, Instrument}; use watchexec_signals::Signal; @@ -156,9 +157,9 @@ pub fn start_job(command: Arc) -> (Job, JoinHandle<()>) { Control::Stop => { if let CommandState::Running { child, started, .. } = &mut command_state { trace!("stopping child"); - try_with_handler!(child.kill().await); + try_with_handler!(Box::into_pin(child.kill()).await); trace!("waiting on child"); - let status = try_with_handler!(child.wait().await); + let status = try_with_handler!(Box::into_pin(child.wait()).await); trace!(?status, "got child end status"); command_state = CommandState::Finished { @@ -188,9 +189,9 @@ pub fn start_job(command: Arc) -> (Job, JoinHandle<()>) { Control::TryRestart => { if let CommandState::Running { child, started, .. } = &mut command_state { trace!("stopping child"); - try_with_handler!(child.kill().await); + try_with_handler!(Box::into_pin(child.kill()).await); trace!("waiting on child"); - let status = try_with_handler!(child.wait().await); + let status = try_with_handler!(Box::into_pin(child.wait()).await); trace!(?status, "got child end status"); command_state = CommandState::Finished { @@ -238,9 +239,9 @@ pub fn start_job(command: Arc) -> (Job, JoinHandle<()>) { if let CommandState::Running { child, started, .. } = &mut command_state { trace!("stopping child forcefully"); - try_with_handler!(child.kill().await); + try_with_handler!(Box::into_pin(child.kill()).await); trace!("waiting on child"); - let status = try_with_handler!(child.wait().await); + let status = try_with_handler!(Box::into_pin(child.wait()).await); trace!(?status, "got child end status"); command_state = CommandState::Finished { @@ -411,15 +412,15 @@ pub type AsyncFunc = Box< >; pub type SyncSpawnHook = - Arc) + Send + Sync + 'static>; + Arc) + Send + Sync + 'static>; pub type AsyncSpawnHook = Arc< - dyn (Fn(&mut TokioCommand, &JobTaskContext<'_>) -> Box + Send + Sync>) + dyn (Fn(&mut TokioCommandWrap, &JobTaskContext<'_>) -> Box + Send + Sync>) + Send + Sync + 'static, >; -sync_async_callbox!(SpawnHook, SyncSpawnHook, AsyncSpawnHook, (command: &mut TokioCommand, context: &JobTaskContext<'_>)); +sync_async_callbox!(SpawnHook, SyncSpawnHook, AsyncSpawnHook, (command: &mut TokioCommandWrap, context: &JobTaskContext<'_>)); pub type SyncErrorHandler = Arc; pub type AsyncErrorHandler = Arc< @@ -432,8 +433,8 @@ sync_async_callbox!(ErrorHandler, SyncErrorHandler, AsyncErrorHandler, (error: S #[instrument(level = "trace")] async fn signal_child( signal: Signal, + #[cfg(not(test))] child: &mut Box, #[cfg(test)] child: &mut super::TestChild, - #[cfg(not(test))] child: &mut command_group::tokio::ErasedChild, ) -> std::io::Result<()> { #[cfg(unix)] { @@ -442,7 +443,7 @@ async fn signal_child( .or_else(|| Signal::Terminate.to_nix()) .expect("UNWRAP: guaranteed for Signal::Terminate default"); trace!(signal=?sig, "sending signal"); - child.signal(sig)?; + child.signal(sig as _)?; } #[cfg(windows)] diff --git a/crates/supervisor/src/job/test.rs b/crates/supervisor/src/job/test.rs index db32e2b..4016e16 100644 --- a/crates/supervisor/src/job/test.rs +++ b/crates/supervisor/src/job/test.rs @@ -328,6 +328,7 @@ async fn start() { #[cfg(unix)] #[tokio::test] async fn signal_unix() { + use nix::sys::signal::Signal; let (job, task) = start_job(working_command()); expect_state!(job, CommandState::Pending); @@ -336,9 +337,9 @@ async fn signal_unix() { job.signal(watchexec_signals::Signal::User1).await; let calls = get_child(&job).await.calls; - assert!(calls - .iter() - .any(|(_, call)| matches!(call, TestChildCall::Signal(command_group::Signal::SIGUSR1)))); + assert!(calls.iter().any( + |(_, call)| matches!(call, TestChildCall::Signal(sig) if *sig == Signal::SIGUSR1 as i32) + )); task.abort(); } @@ -553,6 +554,7 @@ async fn graceful_stop_beyond_grace() { #[cfg(unix)] { + use nix::sys::signal::Signal; expect_state!( job, CommandState::Running { .. }, @@ -562,7 +564,7 @@ async fn graceful_stop_beyond_grace() { let calls = get_child(&job).await.calls; assert!(calls.iter().any(|(_, call)| matches!( call, - TestChildCall::Signal(command_group::Signal::SIGUSR1) + TestChildCall::Signal(sig) if *sig == Signal::SIGUSR1 as i32 ))); } @@ -596,6 +598,7 @@ async fn graceful_restart_beyond_grace() { #[cfg(unix)] { + use nix::sys::signal::Signal; expect_state!( job, CommandState::Running { .. }, @@ -605,7 +608,7 @@ async fn graceful_restart_beyond_grace() { let calls = get_child(&job).await.calls; assert!(calls.iter().any(|(_, call)| matches!( call, - TestChildCall::Signal(command_group::Signal::SIGUSR1) + TestChildCall::Signal(sig) if *sig == Signal::SIGUSR1 as i32 ))); } diff --git a/crates/supervisor/src/job/testchild.rs b/crates/supervisor/src/job/testchild.rs index d77899b..01d34bb 100644 --- a/crates/supervisor/src/job/testchild.rs +++ b/crates/supervisor/src/job/testchild.rs @@ -1,4 +1,5 @@ use std::{ + future::Future, io::Result, path::Path, process::{ExitStatus, Output}, @@ -6,8 +7,6 @@ use std::{ time::{Duration, Instant}, }; -#[cfg(unix)] -use command_group::Signal; use tokio::{sync::Mutex, time::sleep}; use watchexec_events::ProcessEnd; @@ -35,7 +34,7 @@ impl TestChild { } Ok(Self { - grouped: command.options.grouped, + grouped: command.options.grouped || command.options.session, command, calls: Arc::new(boxcar::Vec::new()), output: Arc::new(Mutex::new(None)), @@ -52,7 +51,7 @@ pub enum TestChildCall { TryWait, Wait, #[cfg(unix)] - Signal(Signal), + Signal(i32), } // Exact same signatures as ErasedChild @@ -62,9 +61,9 @@ impl TestChild { None } - pub async fn kill(&mut self) -> Result<()> { + pub fn kill(&mut self) -> Box> + Send + '_> { self.calls.push(TestChildCall::Kill); - Ok(()) + Box::new(async { Ok(()) }) } pub fn start_kill(&mut self) -> Result<()> { @@ -96,54 +95,58 @@ impl TestChild { .and_then(|o| o.as_ref().map(|o| o.status))) } - pub async fn wait(&mut self) -> Result { + pub fn wait(&mut self) -> Box> + Send + '_> { self.calls.push(TestChildCall::Wait); - if let Program::Exec { prog, args } = &self.command.program { - if prog == Path::new("sleep") { - if let Some(time) = args - .get(0) - .and_then(|arg| arg.parse().ok()) - .map(Duration::from_millis) - { - if self.spawned.elapsed() < time { - sleep(time - self.spawned.elapsed()).await; - if let Ok(guard) = self.output.try_lock() { - if let Some(output) = guard.as_ref() { - return Ok(output.status); + Box::new(async { + if let Program::Exec { prog, args } = &self.command.program { + if prog == Path::new("sleep") { + if let Some(time) = args + .get(0) + .and_then(|arg| arg.parse().ok()) + .map(Duration::from_millis) + { + if self.spawned.elapsed() < time { + sleep(time - self.spawned.elapsed()).await; + if let Ok(guard) = self.output.try_lock() { + if let Some(output) = guard.as_ref() { + return Ok(output.status); + } } - } - return Ok(ProcessEnd::Success.into_exitstatus()); + return Ok(ProcessEnd::Success.into_exitstatus()); + } } } } - } - loop { - eprintln!("[{:?}] child: output lock", Instant::now()); - let output = self.output.lock().await; - if let Some(output) = output.as_ref() { - return Ok(output.status); - } - eprintln!("[{:?}] child: output unlock", Instant::now()); + loop { + eprintln!("[{:?}] child: output lock", Instant::now()); + let output = self.output.lock().await; + if let Some(output) = output.as_ref() { + return Ok(output.status); + } + eprintln!("[{:?}] child: output unlock", Instant::now()); - sleep(Duration::from_secs(1)).await; - } - } - - pub async fn wait_with_output(self) -> Result { - loop { - let mut output = self.output.lock().await; - if let Some(output) = output.take() { - return Ok(output); - } else { sleep(Duration::from_secs(1)).await; } - } + }) + } + + pub fn wait_with_output(self) -> Box> + Send> { + Box::new(async move { + loop { + let mut output = self.output.lock().await; + if let Some(output) = output.take() { + return Ok(output); + } else { + sleep(Duration::from_secs(1)).await; + } + } + }) } #[cfg(unix)] - pub fn signal(&self, sig: Signal) -> Result<()> { + pub fn signal(&self, sig: i32) -> Result<()> { self.calls.push(TestChildCall::Signal(sig)); Ok(()) } diff --git a/crates/supervisor/tests/programs.rs b/crates/supervisor/tests/programs.rs index b4bb7d1..a0c6289 100644 --- a/crates/supervisor/tests/programs.rs +++ b/crates/supervisor/tests/programs.rs @@ -1,18 +1,20 @@ -use command_group::AsyncCommandGroup; use watchexec_supervisor::command::{Command, Program, Shell}; #[tokio::test] #[cfg(unix)] async fn unix_shell_none() -> Result<(), std::io::Error> { - assert!(Command { - program: Program::Exec { - prog: "echo".into(), - args: vec!["hi".into()], - }, - options: Default::default() - } - .to_spawnable() - .group_status() + assert!(Box::into_pin( + Command { + program: Program::Exec { + prog: "echo".into(), + args: vec!["hi".into()], + }, + options: Default::default() + } + .to_spawnable() + .spawn()? + .wait() + ) .await? .success()); Ok(()) @@ -21,16 +23,19 @@ async fn unix_shell_none() -> Result<(), std::io::Error> { #[tokio::test] #[cfg(unix)] async fn unix_shell_sh() -> Result<(), std::io::Error> { - assert!(Command { - program: Program::Shell { - shell: Shell::new("sh"), - command: "echo hi".into(), - args: Vec::new(), - }, - options: Default::default() - } - .to_spawnable() - .group_status() + assert!(Box::into_pin( + Command { + program: Program::Shell { + shell: Shell::new("sh"), + command: "echo hi".into(), + args: Vec::new(), + }, + options: Default::default() + } + .to_spawnable() + .spawn()? + .wait() + ) .await? .success()); Ok(()) @@ -39,16 +44,19 @@ async fn unix_shell_sh() -> Result<(), std::io::Error> { #[tokio::test] #[cfg(unix)] async fn unix_shell_alternate() -> Result<(), std::io::Error> { - assert!(Command { - program: Program::Shell { - shell: Shell::new("bash"), - command: "echo".into(), - args: vec!["--".into(), "hi".into()], - }, - options: Default::default() - } - .to_spawnable() - .group_status() + assert!(Box::into_pin( + Command { + program: Program::Shell { + shell: Shell::new("bash"), + command: "echo".into(), + args: vec!["--".into(), "hi".into()], + }, + options: Default::default() + } + .to_spawnable() + .spawn()? + .wait() + ) .await? .success()); Ok(()) @@ -57,19 +65,22 @@ async fn unix_shell_alternate() -> Result<(), std::io::Error> { #[tokio::test] #[cfg(unix)] async fn unix_shell_alternate_shopts() -> Result<(), std::io::Error> { - assert!(Command { - program: Program::Shell { - shell: Shell { - options: vec!["-o".into(), "errexit".into()], - ..Shell::new("bash") + assert!(Box::into_pin( + Command { + program: Program::Shell { + shell: Shell { + options: vec!["-o".into(), "errexit".into()], + ..Shell::new("bash") + }, + command: "echo hi".into(), + args: Vec::new(), }, - command: "echo hi".into(), - args: Vec::new(), - }, - options: Default::default() - } - .to_spawnable() - .group_status() + options: Default::default() + } + .to_spawnable() + .spawn()? + .wait() + ) .await? .success()); Ok(()) @@ -78,15 +89,18 @@ async fn unix_shell_alternate_shopts() -> Result<(), std::io::Error> { #[tokio::test] #[cfg(windows)] async fn windows_shell_none() -> Result<(), std::io::Error> { - assert!(Command { - program: Program::Exec { - prog: "echo".into(), - args: vec!["hi".into()], - }, - options: Default::default() - } - .to_spawnable() - .group_status() + assert!(Box::into_pin( + Command { + program: Program::Exec { + prog: "echo".into(), + args: vec!["hi".into()], + }, + options: Default::default() + } + .to_spawnable() + .spawn()? + .wait() + ) .await? .success()); Ok(()) @@ -95,16 +109,19 @@ async fn windows_shell_none() -> Result<(), std::io::Error> { #[tokio::test] #[cfg(windows)] async fn windows_shell_cmd() -> Result<(), std::io::Error> { - assert!(Command { - program: Program::Shell { - shell: Shell::cmd(), - args: Vec::new(), - command: r#""echo" hi"#.into() - }, - options: Default::default() - } - .to_spawnable() - .group_status() + assert!(Box::into_pin( + Command { + program: Program::Shell { + shell: Shell::cmd(), + args: Vec::new(), + command: r#""echo" hi"#.into() + }, + options: Default::default() + } + .to_spawnable() + .spawn()? + .wait() + ) .await? .success()); Ok(()) @@ -113,16 +130,19 @@ async fn windows_shell_cmd() -> Result<(), std::io::Error> { #[tokio::test] #[cfg(windows)] async fn windows_shell_powershell() -> Result<(), std::io::Error> { - assert!(Command { - program: Program::Shell { - shell: Shell::new("pwsh.exe"), - args: Vec::new(), - command: "echo hi".into() - }, - options: Default::default() - } - .to_spawnable() - .group_status() + assert!(Box::into_pin( + Command { + program: Program::Shell { + shell: Shell::new("pwsh.exe"), + args: Vec::new(), + command: "echo hi".into() + }, + options: Default::default() + } + .to_spawnable() + .spawn()? + .wait() + ) .await? .success()); Ok(())