Process module improvements, use later nix

This commit is contained in:
Matt Green 2016-10-26 16:14:57 -04:00
parent d14ccaaa1a
commit e26eff226c
5 changed files with 154 additions and 76 deletions

6
Cargo.lock generated
View File

@ -10,7 +10,7 @@ dependencies = [
"libc 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"mktemp 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "mktemp 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"nix 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "nix 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
"notify 2.6.3 (registry+https://github.com/rust-lang/crates.io-index)", "notify 2.6.3 (registry+https://github.com/rust-lang/crates.io-index)",
"threadpool 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "threadpool 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
@ -178,7 +178,7 @@ dependencies = [
[[package]] [[package]]
name = "nix" name = "nix"
version = "0.6.0" version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [ dependencies = [
"bitflags 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "bitflags 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -342,7 +342,7 @@ dependencies = [
"checksum mktemp 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "77001ceb9eed65439f3dc2a2543f9ba1417d912686bf224a7738d0966e6dcd69" "checksum mktemp 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "77001ceb9eed65439f3dc2a2543f9ba1417d912686bf224a7738d0966e6dcd69"
"checksum net2 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)" = "5edf9cb6be97212423aed9413dd4729d62b370b5e1c571750e882cebbbc1e3e2" "checksum net2 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)" = "5edf9cb6be97212423aed9413dd4729d62b370b5e1c571750e882cebbbc1e3e2"
"checksum nix 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "bfb3ddedaa14746434a02041940495bf11325c22f6d36125d3bdd56090d50a79" "checksum nix 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "bfb3ddedaa14746434a02041940495bf11325c22f6d36125d3bdd56090d50a79"
"checksum nix 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a7bb1da2be7da3cbffda73fc681d509ffd9e665af478d2bee1907cee0bc64b2" "checksum nix 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a0d95c5fa8b641c10ad0b8887454ebaafa3c92b5cd5350f8fc693adafd178e7b"
"checksum notify 2.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4e0e7eec936337952c4228b023007528a33b2fa039d96c2e8f32d764221a9c07" "checksum notify 2.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4e0e7eec936337952c4228b023007528a33b2fa039d96c2e8f32d764221a9c07"
"checksum rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "2791d88c6defac799c3f20d74f094ca33b9332612d9aef9078519c82e4fe04a5" "checksum rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "2791d88c6defac799c3f20d74f094ca33b9332612d9aef9078519c82e4fe04a5"
"checksum rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)" = "6159e4e6e559c81bd706afe9c8fd68f547d3e851ce12e76b1de7914bab61691b" "checksum rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)" = "6159e4e6e559c81bd706afe9c8fd68f547d3e851ce12e76b1de7914bab61691b"

View File

@ -35,7 +35,7 @@ default-features = false
features = [] features = []
[target.'cfg(unix)'.dependencies] [target.'cfg(unix)'.dependencies]
nix = "0.6.0" nix = "0.7.0"
[target.'cfg(windows)'.dependencies] [target.'cfg(windows)'.dependencies]
winapi = "0.2.8" winapi = "0.2.8"

View File

@ -15,8 +15,8 @@ pub fn install() -> Receiver<()> {
use nix::sys::signal::{SigSet, SIGTERM, SIGINT}; use nix::sys::signal::{SigSet, SIGTERM, SIGINT};
let mut mask = SigSet::empty(); let mut mask = SigSet::empty();
mask.add(SIGTERM).expect("unable to add SIGTERM to mask"); mask.add(SIGTERM);
mask.add(SIGINT).expect("unable to add SIGINT to mask"); mask.add(SIGINT);
mask.thread_set_mask().expect("unable to set signal mask"); mask.thread_set_mask().expect("unable to set signal mask");
let rx = create_channel(); let rx = create_channel();

View File

@ -130,8 +130,10 @@ fn main() {
cli::clear_screen(); cli::clear_screen();
} }
Process::new(&cmd, vec![]) Process::new(&cmd, vec![]).ok()
} else { None }; } else {
None
};
while !interrupt_handler::interrupt_requested() { while !interrupt_handler::interrupt_requested() {
if let Some(paths) = wait(&rx, &interrupt_rx, &filter) { if let Some(paths) = wait(&rx, &interrupt_rx, &filter) {
@ -157,7 +159,7 @@ fn main() {
cli::clear_screen(); cli::clear_screen();
} }
child_process = Process::new(&cmd, updated); child_process = Process::new(&cmd, updated).ok();
} }
} }
} }

View File

@ -1,96 +1,168 @@
use threadpool::ThreadPool; use threadpool::ThreadPool;
use std::process::{Child, Command}; use std::sync::mpsc::Sender;
use std::sync::mpsc::{Sender};
pub struct Process { pub use self::imp::*;
process: Child,
killed: bool
}
#[cfg(target_family = "unix")] #[cfg(target_family = "unix")]
impl Process { mod imp {
pub fn new(cmd: &str, updated_paths: Vec<&str>) -> Option<Process>{ use std::io::Result;
use libc; use std::process::Command;
use std::os::unix::process::CommandExt;
let mut command = Command::new("sh"); pub struct Process {
command.arg("-c").arg(cmd); pid: i32,
killed: bool,
if !updated_paths.is_empty() {
command.env("WATCHEXEC_UPDATED_PATH", updated_paths[0]);
}
command.before_exec(|| unsafe {
libc::setpgid(0, 0);
Ok(())
})
.spawn()
.ok()
.and_then(|p| Some(Process { process: p, killed: false }))
} }
pub fn kill(&mut self) { impl Process {
if self.killed { pub fn new(cmd: &str, updated_paths: Vec<&str>) -> Result<Process> {
return; use std::io;
use std::os::unix::process::CommandExt;
use nix::unistd::setpgid;
let mut command = Command::new("sh");
command.arg("-c").arg(cmd);
if !updated_paths.is_empty() {
command.env("WATCHEXEC_UPDATED_PATH", updated_paths[0]);
}
command.before_exec(|| setpgid(0, 0).map_err(io::Error::from))
.spawn()
.and_then(|p| {
Ok(Process {
pid: p.id() as i32,
killed: false,
})
})
} }
use libc; pub fn kill(&mut self) {
use libc;
extern "C" { if self.killed {
fn killpg(pgrp: libc::pid_t, sig: libc::c_int) -> libc::c_int; return;
}
extern "C" {
fn killpg(pgrp: libc::pid_t, sig: libc::c_int) -> libc::c_int;
}
unsafe {
killpg(self.pid, libc::SIGTERM);
}
self.killed = true;
} }
unsafe { pub fn wait(&mut self) {
killpg(self.process.id() as i32, libc::SIGTERM); use nix::sys::wait::waitpid;
}
self.killed = true; let _ = waitpid(-self.pid, None);
}
} }
pub fn wait(&mut self) { impl Drop for Process {
use nix::sys::wait::waitpid; fn drop(&mut self) {
self.kill();
let pid = self.process.id() as i32; }
let _ = waitpid(-pid, None);
} }
} }
#[cfg(target_family = "windows")] #[cfg(target_family = "windows")]
impl Process { mod imp {
pub fn new(cmd: &str, updated_paths: Vec<&str>) -> Option<Process> { use std::io;
let mut command = Command::new("cmd.exe"); use std::io::Result;
command.arg("/C").arg(cmd); use std::mem;
use std::process::Command;
use kernel32::*;
use winapi::*;
if !updated_paths.is_empty() { pub struct Process {
command.env("WATCHEXEC_UPDATED_PATH", updated_paths[0]); job: HANDLE,
killed: bool,
}
impl Process {
pub fn new(cmd: &str, updated_paths: Vec<&str>) -> Result<Process> {
use std::os::windows::io::IntoRawHandle;
fn last_err() -> io::Error {
io::Error::last_os_error()
}
let job = unsafe { CreateJobObjectW(0 as *mut _, 0 as *const _) };
if job.is_null() {
panic!("failed to create job object: {}", last_err());
}
let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { mem::zeroed() };
info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
let r = unsafe {
SetInformationJobObject(job,
JobObjectExtendedLimitInformation,
&mut info as *mut _ as LPVOID,
mem::size_of_val(&info) as DWORD)
};
if r == 0 {
panic!("failed to set job info: {}", last_err());
}
let mut command = Command::new("cmd.exe");
command.arg("/C").arg(cmd);
if !updated_paths.is_empty() {
command.env("WATCHEXEC_UPDATED_PATH", updated_paths[0]);
}
command.spawn()
.and_then(|p| {
let r = unsafe { AssignProcessToJobObject(job, p.into_raw_handle()) };
if r == 0 {
panic!("failed to add to job object: {}", last_err());
}
Ok(Process {
job: job,
killed: false,
})
})
} }
command.spawn() pub fn kill(&mut self) {
.ok() if self.killed {
.and_then(|p| { Some(Process { process: p, killed: false })}) return;
} }
pub fn kill(&mut self) { unsafe {
if self.killed { let _ = TerminateJobObject(self.job, 1);
return; }
self.killed = true;
} }
let _ = self.process.kill(); pub fn wait(&mut self) {
self.killed = true; unsafe {
let _ = WaitForSingleObject(self.job, INFINITE);
}
}
} }
pub fn wait(&mut self) { impl Drop for Process {
let _ = self.process.wait(); fn drop(&mut self) {
} unsafe {
} let _ = CloseHandle(self.job);
}
impl Drop for Process { }
fn drop(&mut self) {
self.kill();
} }
unsafe impl Send for Process {}
} }
/// Watches for child process death, notifying callers via a channel.
///
/// On Windows, we don't have SIGCHLD, and even if we did, we'd still need
/// to relay that over a channel.
pub struct ProcessReaper { pub struct ProcessReaper {
pool: ThreadPool, pool: ThreadPool,
tx: Sender<()>, tx: Sender<()>,
@ -100,15 +172,16 @@ impl ProcessReaper {
pub fn new(tx: Sender<()>) -> ProcessReaper { pub fn new(tx: Sender<()>) -> ProcessReaper {
ProcessReaper { ProcessReaper {
pool: ThreadPool::new(1), pool: ThreadPool::new(1),
tx: tx tx: tx,
} }
} }
pub fn wait_process(&self, mut process: Process) { pub fn wait_process(&self, mut process: imp::Process) {
let tx = self.tx.clone(); let tx = self.tx.clone();
self.pool.execute(move || { self.pool.execute(move || {
process.wait(); process.wait();
let _ = tx.send(()); let _ = tx.send(());
}); });
} }
@ -123,7 +196,7 @@ mod tests {
use mktemp::Temp; use mktemp::Temp;
use super::Process; use super::imp::Process;
fn file_contents(path: &Path) -> String { fn file_contents(path: &Path) -> String {
use std::fs::File; use std::fs::File;
@ -147,7 +220,8 @@ mod tests {
fn test_wait() { fn test_wait() {
let file = Temp::new_file().unwrap(); let file = Temp::new_file().unwrap();
let path = file.to_path_buf(); let path = file.to_path_buf();
let mut process = Process::new(&format!("echo hi > {}", path.to_str().unwrap()), vec![]).unwrap(); let mut process = Process::new(&format!("echo hi > {}", path.to_str().unwrap()), vec![])
.unwrap();
process.wait(); process.wait();
assert!(file_contents(&path).starts_with("hi")); assert!(file_contents(&path).starts_with("hi"));
@ -158,7 +232,9 @@ mod tests {
let file = Temp::new_file().unwrap(); let file = Temp::new_file().unwrap();
let path = file.to_path_buf(); let path = file.to_path_buf();
let mut process = Process::new(&format!("sleep 20; echo hi > {}", path.to_str().unwrap()), vec![]).unwrap(); let mut process = Process::new(&format!("sleep 20; echo hi > {}", path.to_str().unwrap()),
vec![])
.unwrap();
thread::sleep(Duration::from_millis(250)); thread::sleep(Duration::from_millis(250));
process.kill(); process.kill();
process.wait(); process.wait();