Replace process code with command-group

This commit is contained in:
Félix Saparelli 2021-07-21 17:26:45 +12:00
parent 4d6ad2cc1f
commit 9c20c8c8b5
4 changed files with 115 additions and 395 deletions

39
Cargo.lock generated
View file

@ -1,5 +1,7 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "addr2line"
version = "0.15.2"
@ -112,7 +114,7 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9eff80751e49709362458c0b612c3c682693d7a58966e0bf429872888f647e20"
dependencies = [
"nix",
"nix 0.20.0",
"terminfo",
"thiserror",
"which",
@ -146,6 +148,16 @@ dependencies = [
"tracing-error",
]
[[package]]
name = "command-group"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba0e6179c8bfe0251e18e65da6f251f81451ead1968aa3c24401b425fc1a184b"
dependencies = [
"nix 0.22.0",
"winapi 0.3.9",
]
[[package]]
name = "console"
version = "0.14.1"
@ -526,6 +538,15 @@ version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b16bd47d9e329435e309c58469fe0791c2d0d1ba96ec0954152a5ae2b04387dc"
[[package]]
name = "memoffset"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59accc507f1338036a0477ef61afdae33cde60840f4dfe481319ce3ad116ddf9"
dependencies = [
"autocfg",
]
[[package]]
name = "miniz_oxide"
version = "0.4.4"
@ -602,6 +623,19 @@ dependencies = [
"libc",
]
[[package]]
name = "nix"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1e25ee6b412c2a1e3fcb6a4499a5c1bfe7f43e014bdce9a6b6666e5aa2d187"
dependencies = [
"bitflags",
"cc",
"cfg-if 1.0.0",
"libc",
"memoffset",
]
[[package]]
name = "nom"
version = "5.1.2"
@ -1159,12 +1193,13 @@ name = "watchexec"
version = "1.16.1"
dependencies = [
"clearscreen",
"command-group",
"derive_builder",
"glob",
"globset",
"lazy_static",
"log",
"nix",
"nix 0.22.0",
"notify",
"walkdir",
"winapi 0.3.9",

View file

@ -16,6 +16,7 @@ edition = "2018"
[dependencies]
clearscreen = "1.0.4"
command-group = "1.0.3"
derive_builder = "0.10.0"
glob = "0.3.0"
globset = "=0.4.6"
@ -25,11 +26,11 @@ notify = "4.0.15"
walkdir = "2.3.2"
[target.'cfg(unix)'.dependencies]
nix = "0.20.0"
nix = "0.22.0"
[target.'cfg(windows)'.dependencies.winapi]
version = "0.3.9"
features = ["ioapiset", "jobapi2", "tlhelp32"]
# features = ["ioapiset", "jobapi2", "tlhelp32"]
[package.metadata.binstall]
pkg-url = "{ repo }/releases/download/cli-v{ version }/watchexec-{ version }-{ target }.tar.xz"

View file

@ -7,6 +7,7 @@ use std::{
path::PathBuf,
process::Command,
};
use command_group::{CommandGroup, GroupChild};
/// Shell to use to run commands.
///
@ -123,370 +124,22 @@ pub fn spawn(
updated_paths: &[PathOp],
shell: Shell,
environment: bool,
) -> Result<Process> {
self::imp::Process::new(cmd, updated_paths, shell, environment).map_err(|e| e.into())
}
pub use self::imp::Process;
#[cfg(target_family = "unix")]
mod imp {
use super::Shell;
use crate::pathop::PathOp;
use crate::signal::Signal;
use nix::libc::*;
use nix::{self, Error};
use std::io::{self, Result};
use std::sync::*;
pub struct Process {
pgid: pid_t,
lock: Mutex<bool>,
cvar: Condvar,
}
fn from_nix_error(err: nix::Error) -> io::Error {
match err {
Error::Sys(errno) => io::Error::from_raw_os_error(errno as i32),
Error::InvalidPath => io::Error::new(io::ErrorKind::InvalidInput, err),
_ => io::Error::new(io::ErrorKind::Other, err),
}
}
#[allow(clippy::mutex_atomic)]
impl Process {
pub fn new(
cmd: &[String],
updated_paths: &[PathOp],
shell: Shell,
environment: bool,
) -> Result<Self> {
use nix::unistd::*;
use std::convert::TryInto;
use std::os::unix::process::CommandExt;
) -> Result<GroupChild> {
let mut command = shell.to_command(&cmd);
debug!("Assembled command {:?}", command);
let command_envs = if !environment {
Vec::new()
} else {
super::collect_path_env_vars(updated_paths)
collect_path_env_vars(updated_paths)
};
for &(ref name, ref val) in &command_envs {
command.env(name, val);
}
unsafe {
command.pre_exec(|| setsid().map_err(from_nix_error).map(|_| ()));
}
command.spawn().map(|p| Self {
pgid: p
.id()
.try_into()
.expect("u32 -> i32 failed in process::new"),
lock: Mutex::new(false),
cvar: Condvar::new(),
})
}
pub fn reap(&self) {
use nix::sys::wait::*;
use nix::unistd::Pid;
let mut finished = true;
loop {
match waitpid(Pid::from_raw(-self.pgid), Some(WaitPidFlag::WNOHANG)) {
Ok(WaitStatus::Exited(_, _)) | Ok(WaitStatus::Signaled(_, _, _)) => {}
Ok(_) => {
finished = false;
break;
}
Err(_) => break,
}
}
if finished {
let mut done = self.lock.lock().expect("poisoned lock in process::reap");
*done = true;
self.cvar.notify_one();
}
}
pub fn signal(&self, signal: Signal) {
use crate::signal::ConvertToLibc;
let signo = signal.convert_to_libc();
debug!("Sending {:?} (int: {}) to the command", signal, signo);
self.c_signal(signo);
}
fn c_signal(&self, sig: c_int) {
extern "C" {
fn killpg(pgrp: pid_t, sig: c_int) -> c_int;
}
unsafe {
killpg(self.pgid, sig);
}
}
pub fn wait(&self) {
let mut done = self.lock.lock().expect("poisoned lock in process::wait");
while !*done {
done = self
.cvar
.wait(done)
.expect("poisoned cvar in process::wait");
}
}
pub fn is_complete(&self) -> bool {
*self
.lock
.lock()
.expect("poisoned lock in process::is_complete")
}
}
}
#[cfg(target_family = "windows")]
mod imp {
use super::Shell;
use crate::pathop::PathOp;
use crate::signal::Signal;
use std::{
convert::TryInto,
io::{self, Result},
mem,
os::windows::{io::IntoRawHandle, process::CommandExt},
ptr,
};
use winapi::{
shared::{
basetsd::ULONG_PTR,
minwindef::{DWORD, LPVOID},
},
um::{
handleapi::{CloseHandle, INVALID_HANDLE_VALUE},
ioapiset::{CreateIoCompletionPort, GetQueuedCompletionStatus},
jobapi2::{
AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject,
TerminateJobObject,
},
minwinbase::LPOVERLAPPED,
processthreadsapi::{GetProcessId, OpenThread, ResumeThread},
tlhelp32::{
CreateToolhelp32Snapshot, Thread32First, Thread32Next, TH32CS_SNAPTHREAD,
THREADENTRY32,
},
winbase::{CREATE_SUSPENDED, INFINITE},
winnt::{
JobObjectAssociateCompletionPortInformation, JobObjectExtendedLimitInformation,
HANDLE, JOBOBJECT_EXTENDED_LIMIT_INFORMATION, JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE,
JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO, PVOID,
},
},
};
pub struct Process {
job: HANDLE,
completion_port: HANDLE,
}
#[repr(C)]
struct JOBOBJECT_ASSOCIATE_COMPLETION_PORT {
completion_key: PVOID,
completion_port: HANDLE,
}
impl Process {
pub fn new(
cmd: &[String],
updated_paths: &[PathOp],
shell: Shell,
environment: bool,
) -> Result<Self> {
fn last_err() -> io::Error {
io::Error::last_os_error()
}
let job = unsafe { CreateJobObjectW(ptr::null_mut(), ptr::null()) };
if job.is_null() {
panic!("failed to create job object: {}", last_err());
}
let completion_port =
unsafe { CreateIoCompletionPort(INVALID_HANDLE_VALUE, ptr::null_mut(), 0, 1) };
if job.is_null() {
panic!(
"unable to create IO completion port for job: {}",
last_err()
);
}
let mut associate_completion: JOBOBJECT_ASSOCIATE_COMPLETION_PORT =
unsafe { mem::zeroed() };
associate_completion.completion_key = job;
associate_completion.completion_port = completion_port;
unsafe {
let r = SetInformationJobObject(
job,
JobObjectAssociateCompletionPortInformation,
&mut associate_completion as *mut _ as LPVOID,
mem::size_of_val(&associate_completion)
.try_into()
.expect("cannot safely cast to DWORD"),
);
if r == 0 {
panic!(
"failed to associate completion port with job: {}",
last_err()
);
}
}
let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { mem::zeroed() };
info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
let r = unsafe {
SetInformationJobObject(
job,
JobObjectExtendedLimitInformation,
&mut info as *mut _ as LPVOID,
mem::size_of_val(&info)
.try_into()
.expect("cannot safely cast to DWORD"),
)
};
if r == 0 {
panic!("failed to set job info: {}", last_err());
}
let mut command = shell.to_command(&cmd);
command.creation_flags(CREATE_SUSPENDED);
debug!("Assembled command {:?}", command);
let command_envs = if !environment {
Vec::new()
} else {
super::collect_path_env_vars(updated_paths)
};
for &(ref name, ref val) in &command_envs {
command.env(name, val);
}
command.spawn().map(|p| {
let handle = p.into_raw_handle();
if unsafe { AssignProcessToJobObject(job, handle) } == 0 {
panic!("failed to add to job object: {}", last_err());
}
resume_threads(handle);
Self {
job,
completion_port,
}
})
}
pub const fn reap(&self) {}
pub fn signal(&self, _signal: Signal) {
unsafe {
let _ = TerminateJobObject(self.job, 1);
}
}
// Some(true) if complete, Some(false) if incomplete, None if timed out
fn wait_for_completion(&self, timeout: DWORD) -> Option<bool> {
let mut code: DWORD = 0;
let mut key: ULONG_PTR = 0;
let mut overlapped = mem::MaybeUninit::<LPOVERLAPPED>::uninit();
if 0 == unsafe {
GetQueuedCompletionStatus(
self.completion_port,
&mut code,
&mut key,
overlapped.as_mut_ptr(),
timeout,
)
} {
// TODO: handle errors? how does that manifest for timeouts?
None
} else {
Some(code == JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO && (key as HANDLE) == self.job)
}
}
pub fn is_complete(&self) -> bool {
// you need to poll a few times to get the real answer
// if the process has already completed... odd but okay
for _ in 0..5 {
if let Some(true) = self.wait_for_completion(0) {
return true;
}
}
false
}
pub fn wait(&self) {
loop {
if self.wait_for_completion(INFINITE).expect(
"GetQueuedCompletionStatus passed INFINITE timeout but timed out anyway",
) {
break;
}
}
}
}
impl Drop for Process {
fn drop(&mut self) {
unsafe {
let _ = CloseHandle(self.job);
let _ = CloseHandle(self.completion_port);
}
}
}
unsafe impl Send for Process {}
unsafe impl Sync for Process {}
// This is pretty terrible, but it's either this or we re-implement all of Rust's std::process just to get at PROCESS_INFORMATION
fn resume_threads(child_process: HANDLE) {
unsafe {
let child_id = GetProcessId(child_process);
let h = CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, 0);
let mut entry = THREADENTRY32 {
dwSize: 28,
cntUsage: 0,
th32ThreadID: 0,
th32OwnerProcessID: 0,
tpBasePri: 0,
tpDeltaPri: 0,
dwFlags: 0,
};
let mut result = Thread32First(h, &mut entry);
while result != 0 {
if entry.th32OwnerProcessID == child_id {
let thread_handle = OpenThread(0x0002, 0, entry.th32ThreadID);
ResumeThread(thread_handle);
CloseHandle(thread_handle);
}
result = Thread32Next(h, &mut entry);
}
CloseHandle(h);
}
}
let child = command.group_spawn()?;
Ok(child)
}
/// Collect `PathOp` details into op-categories to pass onto the exec'd command as env-vars

View file

@ -1,10 +1,14 @@
use command_group::GroupChild;
#[cfg(unix)]
use command_group::UnixChildExt;
use crate::config::Config;
use crate::error::{Error, Result};
use crate::gitignore;
use crate::ignore;
use crate::notification_filter::NotificationFilter;
use crate::pathop::PathOp;
use crate::process::{self, Process};
use crate::process;
use crate::signal::{self, Signal};
use crate::watcher::{Event, Watcher};
use std::{
@ -12,7 +16,7 @@ use std::{
fs::canonicalize,
sync::{
mpsc::{channel, Receiver},
Arc, RwLock,
Arc, Mutex,
},
time::Duration,
};
@ -141,12 +145,12 @@ where
pub struct ExecHandler {
args: Config,
signal: Option<Signal>,
child_process: Arc<RwLock<Option<Process>>>,
child_process: Arc<Mutex<Option<GroupChild>>>,
}
impl ExecHandler {
pub fn new(args: Config) -> Result<Self> {
let child_process: Arc<RwLock<Option<Process>>> = Arc::new(RwLock::new(None));
let child_process: Arc<Mutex<Option<GroupChild>>> = Arc::new(Mutex::new(None));
let weak_child = Arc::downgrade(&child_process);
// Convert signal string to the corresponding integer
@ -154,11 +158,20 @@ impl ExecHandler {
signal::install_handler(move |sig: Signal| {
if let Some(lock) = weak_child.upgrade() {
let strong = lock.read().expect("poisoned lock in install_handler");
if let Some(ref child) = *strong {
let mut strong = lock.lock().expect("poisoned lock in install_handler");
if let Some(child) = strong.as_mut() {
match sig {
Signal::SIGCHLD => child.reap(), // SIGCHLD is special, initiate reap()
_ => child.signal(sig),
Signal::SIGCHLD => {
debug!("Try-waiting on command");
child.try_wait().ok();
},
_ => {
#[cfg(unix)]
child.signal(sig).unwrap_or_else(|err| warn!("Could not pass on signal to command: {}", err));
#[cfg(not(unix))]
child.kill().unwrap_or_else(|err| warn!("Could not pass on termination to command: {}", err));
},
}
}
}
@ -177,7 +190,7 @@ impl ExecHandler {
}
debug!("Launching command");
let mut guard = self.child_process.write()?;
let mut guard = self.child_process.lock()?;
*guard = Some(process::spawn(
&self.args.cmd,
ops,
@ -188,16 +201,17 @@ impl ExecHandler {
Ok(())
}
pub fn has_running_process(&self) -> bool {
let guard = self
pub fn has_running_process(&self) -> Result<bool> {
let mut guard = self
.child_process
.read()
.lock()
.expect("poisoned lock in has_running_process");
(*guard)
.as_ref()
.map(|process| !process.is_complete())
.unwrap_or(false)
if let Some(child) = guard.as_mut() {
Ok(child.try_wait()?.is_none())
} else {
Ok(false)
}
}
}
@ -218,8 +232,10 @@ impl Handler for ExecHandler {
// Only returns Err() on lock poisoning.
fn on_update(&self, ops: &[PathOp]) -> Result<bool> {
log::debug!("ON UPDATE: called");
let signal = self.signal.unwrap_or(Signal::SIGTERM);
let has_running_processes = self.has_running_process();
let has_running_processes = self.has_running_process()?;
log::debug!(
"ON UPDATE: has_running_processes: {} --- on_busy_update: {:?}",
@ -234,18 +250,18 @@ impl Handler for ExecHandler {
}
// Just send a signal to the command, do nothing more
(true, OnBusyUpdate::Signal) => signal_process(&self.child_process, signal),
(true, OnBusyUpdate::Signal) => signal_process(&self.child_process, signal)?,
// Send a signal to the command, wait for it to exit, then run the command again
(true, OnBusyUpdate::Restart) => {
signal_process(&self.child_process, signal);
wait_on_process(&self.child_process);
signal_process(&self.child_process, signal)?;
wait_on_process(&self.child_process)?;
self.spawn(ops)?;
}
// Wait for the command to end, then run it again
(true, OnBusyUpdate::Queue) => {
wait_on_process(&self.child_process);
wait_on_process(&self.child_process)?;
self.spawn(ops)?;
}
@ -255,10 +271,10 @@ impl Handler for ExecHandler {
// Handle once option for integration testing
if self.args.once {
if let Some(signal) = self.signal {
signal_process(&self.child_process, signal);
signal_process(&self.child_process, signal)?;
}
wait_on_process(&self.child_process);
wait_on_process(&self.child_process)?;
return Ok(false);
}
@ -326,20 +342,35 @@ fn wait_fs(
paths
}
fn signal_process(process: &RwLock<Option<Process>>, signal: Signal) {
let guard = process.read().expect("poisoned lock in signal_process");
fn signal_process(process: &Mutex<Option<GroupChild>>, signal: Signal) -> Result<()> {
let mut guard = process.lock().expect("poisoned lock in signal_process");
if let Some(ref child) = *guard {
if let Some(child) = guard.as_mut() {
#[cfg(unix)] {
debug!("Signaling process with {}", signal);
child.signal(signal);
child.signal(signal)?;
}
#[cfg(not(unix))] {
if matches!(signal, Signal::SIGTERM | Signal::SIGKILL) {
debug!("Killing process");
child.kill()?;
} else {
debug!("Ignoring signal to send to process");
}
}
}
Ok(())
}
fn wait_on_process(process: &RwLock<Option<Process>>) {
let guard = process.read().expect("poisoned lock in wait_on_process");
fn wait_on_process(process: &Mutex<Option<GroupChild>>) -> Result<()> {
let mut guard = process.lock().expect("poisoned lock in wait_on_process");
if let Some(ref child) = *guard {
if let Some(child) = guard.as_mut() {
debug!("Waiting for process to exit...");
child.wait();
child.wait()?;
}
Ok(())
}