Use Process & ProcessReaper to handle async process waiting

This commit is contained in:
Matt Green 2016-10-26 11:01:55 -04:00
parent 1cc4d92345
commit f04083a21d
4 changed files with 212 additions and 283 deletions

View File

@ -1,3 +1,4 @@
use std::process::Command;
use std::path::Path;
use clap::{App, Arg};
@ -17,6 +18,16 @@ pub struct Args {
pub poll_interval: u32,
}
#[cfg(target_family = "windows")]
pub fn clear_screen() {
let _ = Command::new("cls").status();
}
#[cfg(target_family = "unix")]
pub fn clear_screen() {
let _ = Command::new("clear").status();
}
pub fn get_args() -> Args {
let args = App::new("watchexec")
.version(crate_version!())
@ -83,7 +94,7 @@ pub fn get_args() -> Args {
if let Some(extensions) = args.values_of("extensions") {
for exts in extensions {
filters.extend(exts.split(",")
filters.extend(exts.split(',')
.filter(|ext| !ext.is_empty())
.map(|ext| format!("*.{}", ext.replace(".", ""))));

View File

@ -22,11 +22,11 @@ extern crate kernel32;
#[cfg(test)]
extern crate mktemp;
mod args;
mod cli;
mod gitignore;
mod interrupt_handler;
mod notification_filter;
mod runner;
mod process;
mod watcher;
use std::env;
@ -35,7 +35,7 @@ use std::time::Duration;
use std::path::{Path, PathBuf};
use notification_filter::NotificationFilter;
use runner::Runner;
use process::{Process, ProcessReaper};
use watcher::{Event, Watcher};
// Starting at the specified path, search for gitignore files,
@ -73,7 +73,7 @@ fn init_logger(debug: bool) {
fn main() {
let interrupt_rx = interrupt_handler::install();
let args = args::get_args();
let args = cli::get_args();
init_logger(args.debug);
@ -121,46 +121,43 @@ fn main() {
}
let cmd = args.cmd;
let (mut runner, child_rx) = Runner::new();
let mut child_process = None;
if args.run_initially {
let (child_finish_tx, child_finish_rx) = channel();
let reaper = ProcessReaper::new(child_finish_tx);
let mut child_process = if args.run_initially {
if args.clear_screen {
runner.clear_screen();
cli::clear_screen();
}
child_process = runner.run_command(&cmd, vec![]);
}
Process::new(&cmd, vec![])
} else { None };
while !interrupt_handler::interrupt_requested() {
match wait(&rx, &interrupt_rx, &filter) {
Some(paths) => {
let updated = paths.iter()
.map(|p| p.to_str().unwrap())
.collect();
if let Some(paths) = wait(&rx, &interrupt_rx, &filter) {
let updated = paths.iter()
.map(|p| p.to_str().unwrap())
.collect();
if let Some(mut child) = child_process {
if args.restart {
debug!("Killing child process");
child.kill();
}
debug!("Waiting for process to exit...");
select! {
_ = child_rx.recv() => {},
_ = interrupt_rx.recv() => break
}
if let Some(mut child) = child_process {
if args.restart {
debug!("Killing child process");
child.kill();
}
if args.clear_screen {
runner.clear_screen();
}
debug!("Waiting for process to exit...");
reaper.wait_process(child);
select! {
_ = child_finish_rx.recv() => {},
_ = interrupt_rx.recv() => break
};
}
child_process = runner.run_command(&cmd, updated);
}
None => {
// interrupted
if args.clear_screen {
cli::clear_screen();
}
child_process = Process::new(&cmd, updated);
}
}
}

170
src/process.rs Normal file
View File

@ -0,0 +1,170 @@
use threadpool::ThreadPool;
use std::process::{Child, Command};
use std::sync::mpsc::{Sender};
pub struct Process {
process: Child,
killed: bool
}
#[cfg(target_family = "unix")]
impl Process {
pub fn new(cmd: &str, updated_paths: Vec<&str>) -> Option<Process>{
use libc;
use std::os::unix::process::CommandExt;
let mut command = Command::new("sh");
command.arg("-c").arg(cmd);
if !updated_paths.is_empty() {
command.env("WATCHEXEC_UPDATED_PATH", updated_paths[0]);
}
command.before_exec(|| unsafe {
libc::setpgid(0, 0);
Ok(())
})
.spawn()
.ok()
.and_then(|p| Some(Process { process: p, killed: false }))
}
pub fn kill(&mut self) {
if self.killed {
return;
}
use libc;
extern "C" {
fn killpg(pgrp: libc::pid_t, sig: libc::c_int) -> libc::c_int;
}
unsafe {
killpg(self.process.id() as i32, libc::SIGTERM);
}
self.killed = true;
}
pub fn wait(&mut self) {
use nix::sys::wait::waitpid;
let pid = self.process.id() as i32;
let _ = waitpid(-pid, None);
}
}
#[cfg(target_family = "windows")]
impl Process {
pub fn new(cmd: &str, updated_paths: Vec<&str>) -> Option<Process> {
use std::os::windows::io::AsRawHandle;
let mut command = Command::new("cmd.exe");
command.arg("/C").arg(cmd);
if !updated_paths.is_empty() {
command.env("WATCHEXEC_UPDATED_PATH", updated_paths[0]);
}
command.spawn()
.ok()
.and_then(|p| { Some(Process { process: p, killed: false })})
}
pub fn kill(&mut self) {
if self.killed {
return;
}
self.process.kill();
self.killed = true;
}
pub fn wait(&mut self) {
self.process.wait();
}
}
impl Drop for Process {
fn drop(&mut self) {
self.kill();
}
}
pub struct ProcessReaper {
pool: ThreadPool,
tx: Sender<()>,
}
impl ProcessReaper {
pub fn new(tx: Sender<()>) -> ProcessReaper {
ProcessReaper {
pool: ThreadPool::new(1),
tx: tx
}
}
pub fn wait_process(&self, mut process: Process) {
let tx = self.tx.clone();
self.pool.execute(move || {
process.wait();
let _ = tx.send(());
});
}
}
#[cfg(test)]
#[cfg(target_family = "unix")]
mod tests {
use std::path::Path;
use std::thread;
use std::time::Duration;
use mktemp::Temp;
use super::Process;
fn file_contents(path: &Path) -> String {
use std::fs::File;
use std::io::Read;
let mut f = File::open(path).unwrap();
let mut s = String::new();
f.read_to_string(&mut s).unwrap();
s
}
#[test]
fn test_start() {
let process = Process::new("echo hi", vec![]);
assert!(process.is_some());
}
#[test]
fn test_wait() {
let file = Temp::new_file().unwrap();
let path = file.to_path_buf();
let mut process = Process::new(&format!("echo hi > {}", path.to_str().unwrap()), vec![]).unwrap();
process.wait();
assert!(file_contents(&path).starts_with("hi"));
}
#[test]
fn test_kill() {
let file = Temp::new_file().unwrap();
let path = file.to_path_buf();
let mut process = Process::new(&format!("sleep 20; echo hi > {}", path.to_str().unwrap()), vec![]).unwrap();
thread::sleep(Duration::from_millis(250));
process.kill();
process.wait();
assert!(file_contents(&path) == "");
}
}

View File

@ -1,249 +0,0 @@
use std::process::Command;
use std::sync::mpsc::{channel, Receiver, Sender};
use threadpool::ThreadPool;
/// Runs child processes and provides a channel to asynchronously wait on
/// their completion.
///
/// This enables us to remain responsive to interruption requests even if
/// the child process does not honor our kill requests.
pub struct Runner {
pool: ThreadPool,
tx: Sender<()>,
}
impl Runner {
pub fn new() -> (Runner, Receiver<()>) {
let (tx, rx) = channel();
(Runner {
pool: ThreadPool::new(1),
tx: tx,
},
rx)
}
#[cfg(target_family = "windows")]
pub fn clear_screen(&self) {
let _ = Command::new("cls").status();
}
#[cfg(target_family = "unix")]
pub fn clear_screen(&self) {
let _ = Command::new("clear").status();
}
pub fn run_command(&mut self,
cmd: &str,
updated_paths: Vec<&str>)
-> Option<Process> {
let child = Process::new(cmd, updated_paths);
if let Some(ref process) = child {
let tx = self.tx.clone();
let mut p = process.as_platform_process();
self.pool.execute(move || {
p.wait();
let _ = tx.send(());
});
}
child
}
}
/// High-level wrapper around a child process
/// Unlike `platform::Process`, `Process` kills the child when it is dropped.
pub struct Process {
process: platform::Process
}
impl Process {
pub fn new(cmd: &str, updated_paths: Vec<&str>) -> Option<Process> {
platform::Process::new(cmd, updated_paths).and_then(|p| {
Some(Process { process: p })
})
}
fn as_platform_process(&self) -> platform::Process {
self.process.clone()
}
pub fn kill(&mut self) {
self.process.kill();
}
#[allow(dead_code)]
pub fn wait(&mut self) {
self.process.wait();
}
}
impl Drop for Process {
fn drop(&mut self) {
self.kill();
}
}
#[cfg(target_family = "unix")]
mod platform {
use std::process::Command;
#[derive(Clone)]
pub struct Process {
child_pid: i32,
}
#[cfg(target_family = "unix")]
impl Process {
pub fn new(cmd: &str, updated_paths: Vec<&str>) -> Option<Process> {
use libc;
use std::os::unix::process::CommandExt;
let mut command = Command::new("sh");
command.arg("-c").arg(cmd);
if !updated_paths.is_empty() {
command.env("WATCHEXEC_UPDATED_PATH", updated_paths[0]);
}
let c = command.before_exec(|| unsafe {
libc::setpgid(0, 0);
Ok(())
})
.spawn()
.ok();
match c {
Some(process) => {
Some(Process {
child_pid: process.id() as i32,
})
}
None => None,
}
}
pub fn kill(&mut self) {
use libc;
extern "C" {
fn killpg(pgrp: libc::pid_t, sig: libc::c_int) -> libc::c_int;
}
unsafe {
killpg(self.child_pid, libc::SIGTERM);
}
}
pub fn wait(&mut self) {
use nix::sys::wait::waitpid;
let _ = waitpid(-self.child_pid, None);
}
}
}
#[cfg(target_family = "windows")]
mod platform {
use std::process::Command;
use winapi::winnt::HANDLE;
#[derive(Clone)]
pub struct Process {
child_handle: HANDLE,
}
unsafe impl Send for Process {}
#[cfg(target_family = "windows")]
impl Process {
pub fn new(cmd: &str, updated_paths: Vec<&str>) -> Option<Process> {
use std::os::windows::io::AsRawHandle;
let mut command = Command::new("cmd.exe");
command.arg("/C").arg(cmd);
if !updated_paths.is_empty() {
command.env("WATCHEXEC_UPDATED_PATH", updated_paths[0]);
}
match command.spawn().ok() {
Some(process) => Some(Process { child_handle: process.as_raw_handle() }),
None => None,
}
}
pub fn kill(&mut self) {
use kernel32::TerminateProcess;
unsafe {
let _ = TerminateProcess(self.child_handle, 0);
}
}
pub fn wait(&mut self) {
use kernel32::WaitForSingleObject;
use winapi::winbase::INFINITE;
unsafe {
let _ = WaitForSingleObject(self.child_handle, INFINITE);
}
}
}
}
#[cfg(test)]
#[cfg(target_family = "unix")]
mod process_tests {
use std::path::Path;
use std::thread;
use std::time::Duration;
use mktemp::Temp;
use super::Process;
fn file_contents(path: &Path) -> String {
use std::fs::File;
use std::io::Read;
let mut f = File::open(path).unwrap();
let mut s = String::new();
f.read_to_string(&mut s).unwrap();
s
}
#[test]
fn test_start() {
let process = Process::new("echo hi", vec![]);
assert!(process.is_some());
}
#[test]
fn test_wait() {
let file = Temp::new_file().unwrap();
let path = file.to_path_buf();
let mut process = Process::new(&format!("echo hi > {}", path.to_str().unwrap()), vec![]).unwrap();
process.wait();
assert!(file_contents(&path).starts_with("hi"));
}
#[test]
fn test_kill() {
let file = Temp::new_file().unwrap();
let path = file.to_path_buf();
let mut process = Process::new(&format!("sleep 20; echo hi > {}", path.to_str().unwrap()), vec![]).unwrap();
thread::sleep(Duration::from_millis(250));
process.kill();
process.wait();
assert!(file_contents(&path) == "");
}
}