watchexec/src/process.rs

370 lines
12 KiB
Rust
Raw Normal View History

use std::path::PathBuf;
2017-04-06 22:31:52 +02:00
pub fn spawn(cmd: &str, updated_paths: Vec<PathBuf>, no_shell: bool) -> Process {
self::imp::Process::new(cmd, updated_paths, no_shell).expect("unable to spawn process")
2016-11-23 18:59:56 +01:00
}
pub use self::imp::Process;
#[cfg(target_family = "unix")]
mod imp {
use libc::*;
use std::io::Result;
use std::path::PathBuf;
use std::process::Command;
2016-12-20 17:44:18 +01:00
use std::sync::*;
use signal::Signal;
pub struct Process {
pgid: pid_t,
2016-12-20 17:44:18 +01:00
lock: Mutex<bool>,
cvar: Condvar,
}
2016-12-20 18:20:21 +01:00
#[allow(unknown_lints)]
#[allow(mutex_atomic)]
impl Process {
2017-04-06 22:31:52 +02:00
pub fn new(cmd: &str, updated_paths: Vec<PathBuf>, no_shell: bool) -> Result<Process> {
use nix::unistd::*;
use std::io;
use std::os::unix::process::CommandExt;
2017-04-06 22:31:52 +02:00
// Assemble command to run.
// This is either the first argument from cmd (if no_shell was given) or "sh".
// Using "sh -c" gives us features like supportin pipes and redirects,
// but is a little less performant and can cause trouble when using custom signals
// (e.g. --signal SIGHUP)
let mut command = if no_shell {
let mut split = cmd.split_whitespace();
let mut command = Command::new(split.next().unwrap());
command.args(split);
command
} else {
let mut command = Command::new("sh");
command.arg("-c").arg(cmd);
command
2017-04-06 22:31:52 +02:00
};
debug!("Assembled command {:?}", command);
if let Some(single_path) = super::get_single_updated_path(&updated_paths) {
command.env("WATCHEXEC_UPDATED_PATH", single_path);
}
if let Some(common_path) = super::get_longest_common_path(&updated_paths) {
command.env("WATCHEXEC_COMMON_PATH", common_path);
}
2017-03-23 23:39:38 +01:00
command
.before_exec(|| setpgid(0, 0).map_err(io::Error::from))
.spawn()
.and_then(|p| {
2017-03-23 23:39:38 +01:00
Ok(Process {
pgid: p.id() as i32,
lock: Mutex::new(false),
cvar: Condvar::new(),
})
})
}
2016-12-20 17:44:18 +01:00
pub fn reap(&self) {
use nix::sys::wait::*;
let mut finished = true;
loop {
match waitpid(-self.pgid, Some(WNOHANG)) {
2016-12-20 18:20:21 +01:00
Ok(WaitStatus::Exited(_, _)) |
2016-12-20 17:44:18 +01:00
Ok(WaitStatus::Signaled(_, _, _)) => finished = finished && true,
Ok(_) => {
finished = false;
break;
}
2016-12-20 18:20:21 +01:00
Err(_) => break,
2016-12-20 17:44:18 +01:00
}
}
if finished {
let mut done = self.lock.lock().unwrap();
*done = true;
self.cvar.notify_one();
}
}
2016-12-15 02:19:58 +01:00
/// Sends `signal` to the child's process group.
///
/// The signal is converted to its libc number, logged at debug level and
/// handed to `c_signal`.
pub fn signal(&self, signal: Signal) {
    use signal::ConvertToLibc;

    let raw_signal = signal.convert_to_libc();
    debug!("Sending {:?} (int: {}) to child process", signal, raw_signal);
    self.c_signal(raw_signal);
}
/// Delivers the raw signal number `sig` to the child's process group via
/// the C `killpg` function.
fn c_signal(&self, sig: c_int) {
    extern "C" {
        fn killpg(pgrp: pid_t, sig: c_int) -> c_int;
    }

    // The status returned by killpg is discarded, so a failed delivery is
    // not reported to the caller.
    let _status = unsafe { killpg(self.pgid, sig) };
}
2016-11-09 15:44:00 +01:00
pub fn wait(&self) {
2016-12-20 17:44:18 +01:00
let mut done = self.lock.lock().unwrap();
while !*done {
done = self.cvar.wait(done).unwrap();
}
}
}
}
#[cfg(target_family = "windows")]
mod imp {
use std::io;
use std::io::Result;
use std::mem;
use std::path::PathBuf;
use std::process::Command;
use std::ptr;
use kernel32::*;
use winapi::*;
2017-03-14 17:30:19 +01:00
use signal::Signal;
/// Handle to a spawned child, tracked through a Windows job object.
pub struct Process {
    /// Job object the child is assigned to in `new`; closed on drop.
    job: HANDLE,
    /// I/O completion port associated with `job`; `wait` reads job
    /// notifications from it. Closed on drop.
    completion_port: HANDLE,
}
/// Payload handed to `SetInformationJobObject` together with
/// `JobObjectAssociateCompletionPortInformation` to tie a completion port to a job.
/// Declared `#[repr(C)]` because it is passed to the OS by pointer and size.
#[repr(C)]
struct JOBOBJECT_ASSOCIATE_COMPLETION_PORT {
// `new` stores the job handle here; `wait` compares the key it receives against `self.job`.
completion_key: PVOID,
// The I/O completion port to associate with the job.
completion_port: HANDLE,
}
impl Process {
2017-04-06 23:04:57 +02:00
pub fn new(cmd: &str, updated_paths: Vec<PathBuf>, no_shell: bool) -> Result<Process> {
use std::os::windows::io::IntoRawHandle;
use std::os::windows::process::CommandExt;
fn last_err() -> io::Error {
io::Error::last_os_error()
}
let job = unsafe { CreateJobObjectW(0 as *mut _, 0 as *const _) };
if job.is_null() {
panic!("failed to create job object: {}", last_err());
}
let completion_port = unsafe { CreateIoCompletionPort(INVALID_HANDLE_VALUE, ptr::null_mut(), 0, 1) };
if job.is_null() {
panic!("unable to create IO completion port for job: {}", last_err());
}
let mut associate_completion: JOBOBJECT_ASSOCIATE_COMPLETION_PORT = unsafe { mem::zeroed() };
associate_completion.completion_key = job;
associate_completion.completion_port = completion_port;
unsafe {
let r = SetInformationJobObject(job,
JobObjectAssociateCompletionPortInformation,
&mut associate_completion as *mut _ as LPVOID,
mem::size_of_val(&associate_completion) as DWORD);
if r == 0 {
panic!("failed to associate completion port with job: {}", last_err());
}
}
let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { mem::zeroed() };
info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
let r = unsafe {
SetInformationJobObject(job,
JobObjectExtendedLimitInformation,
&mut info as *mut _ as LPVOID,
mem::size_of_val(&info) as DWORD)
};
if r == 0 {
panic!("failed to set job info: {}", last_err());
}
2017-04-06 23:04:57 +02:00
let mut iter_args = cmd.split_whitespace();
let arg0 = match no_shell {
true => iter_args.next().unwrap(),
false => "cmd.exe",
};
// TODO: There might be a better way of doing this with &str.
// I've had to fall back to String, as I wasn't able to join(" ") a Vec<&str>
// into a &str
let args: Vec<String> = match no_shell {
true => iter_args.map(str::to_string).collect(),
false => vec!["/C".to_string(), iter_args.collect::<Vec<&str>>().join(" ")],
};
let mut command = Command::new(arg0);
command.args(args);
command.creation_flags(CREATE_SUSPENDED);
2017-04-06 23:04:57 +02:00
debug!("Assembled command {:?}", command);
if let Some(single_path) = super::get_single_updated_path(&updated_paths) {
command.env("WATCHEXEC_UPDATED_PATH", single_path);
}
if let Some(common_path) = super::get_longest_common_path(&updated_paths) {
command.env("WATCHEXEC_COMMON_PATH", common_path);
}
2017-03-23 23:39:38 +01:00
command
.spawn()
.and_then(|p| {
let handle = p.into_raw_handle();
let r = unsafe { AssignProcessToJobObject(job, handle) };
2017-03-23 23:39:38 +01:00
if r == 0 {
panic!("failed to add to job object: {}", last_err());
}
resume_threads(handle);
Ok(Process { job: job, completion_port: completion_port })
2017-03-23 23:39:38 +01:00
})
}
2016-12-20 18:20:21 +01:00
/// No-op: this Windows implementation performs no reaping.
pub fn reap(&self) {}
2016-12-20 17:44:18 +01:00
/// Terminates the job with exit code 1.
///
/// `_signal` is ignored: whatever signal is requested, this calls
/// `TerminateJobObject` on the job.
pub fn signal(&self, _signal: Signal) {
    // The result is discarded, so a failed termination is not reported.
    let _ = unsafe { TerminateJobObject(self.job, 1) };
}
2016-11-09 15:44:00 +01:00
pub fn wait(&self) {
unsafe {
loop {
let mut code: DWORD = 0;
let mut key: ULONG_PTR = 0;
let mut overlapped: LPOVERLAPPED = mem::uninitialized();
GetQueuedCompletionStatus(self.completion_port, &mut code, &mut key, &mut overlapped, INFINITE);
if code == JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO && (key as HANDLE) == self.job {
break;
}
}
}
}
}
impl Drop for Process {
2016-11-09 23:20:13 +01:00
fn drop(&mut self) {
unsafe {
let _ = CloseHandle(self.job);
let _ = CloseHandle(self.completion_port);
}
}
}
unsafe impl Send for Process {}
2016-11-09 15:44:00 +01:00
unsafe impl Sync for Process {}
// This is pretty terrible, but it's either this or we re-implement all of Rust's std::process just to get at PROCESS_INFORMATION
fn resume_threads(child_process: HANDLE) {
unsafe {
let child_id = GetProcessId(child_process);
let h = CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, 0);
let mut entry = THREADENTRY32 { dwSize: 28, cntUsage: 0, th32ThreadID: 0, th32OwnerProcessID: 0, tpBasePri: 0, tpDeltaPri: 0, dwFlags: 0};
let mut result = Thread32First(h, &mut entry);
while result != 0 {
if entry.th32OwnerProcessID == child_id {
let thread_handle = OpenThread(0x0002, 0, entry.th32ThreadID);
ResumeThread(thread_handle);
CloseHandle(thread_handle);
}
result = Thread32Next(h, &mut entry);
}
CloseHandle(h);
}
}
}
2016-10-30 17:37:34 +01:00
/// Returns the first entry of `paths` as a string slice.
///
/// Yields `None` when `paths` is empty or when the first path is not valid
/// UTF-8.
fn get_single_updated_path(paths: &[PathBuf]) -> Option<&str> {
    let first = paths.first();
    first.and_then(|path| path.to_str())
}
/// Returns the longest leading run of path components shared by all `paths`.
///
/// * An empty slice yields `None`.
/// * A single path is returned exactly as given.
/// * For several paths the shared components are joined back into one path,
///   which may be empty when nothing is shared.
///
/// `None` is also returned when the resulting path is not valid UTF-8.
fn get_longest_common_path(paths: &[PathBuf]) -> Option<String> {
    let (first, rest) = match paths.split_first() {
        None => return None,
        Some((only, rest)) if rest.is_empty() => return only.to_str().map(str::to_string),
        Some(split) => split,
    };

    // Start from every component of the first path and shrink the prefix as
    // each remaining path is compared against it.
    let mut shared: Vec<_> = first.components().collect();
    for candidate in rest {
        let matching = candidate
            .components()
            .zip(shared.iter())
            .take_while(|pair| pair.0 == *pair.1)
            .count();
        shared.truncate(matching);
    }

    let joined: PathBuf = shared.iter().map(|component| component.as_os_str()).collect();
    joined.to_str().map(str::to_string)
}
#[cfg(test)]
#[cfg(target_family = "unix")]
mod tests {
use std::path::PathBuf;
2016-11-23 18:59:56 +01:00
use super::spawn;
use super::get_longest_common_path;
// Smoke test: spawning a trivial command without a shell must not panic.
#[test]
fn test_start() {
    let updated_paths = Vec::new();
    let _ = spawn("echo hi", updated_paths, true);
}
// Table-driven check of `get_longest_common_path`: each case pairs a list of
// input paths with the common path expected for them.
#[test]
fn longest_common_path_should_return_correct_value() {
    let cases: Vec<(Vec<&str>, &str)> = vec![
        // A single path is returned untouched.
        (vec!["/tmp/random/"], "/tmp/random/"),
        // Several paths sharing a directory.
        (vec!["/tmp/logs/hi", "/tmp/logs/bye", "/tmp/logs/bye", "/tmp/logs/fly"], "/tmp/logs"),
        // Paths that only share the root.
        (vec!["/tmp/logs/hi", "/var/logs/hi"], "/"),
        // Paths of different depth.
        (vec!["/tmp/logs/hi", "/tmp/logs/", "/tmp/logs/bye"], "/tmp/logs"),
    ];

    for (inputs, expected) in cases {
        let paths: Vec<PathBuf> = inputs.into_iter().map(PathBuf::from).collect();
        let actual = get_longest_common_path(&paths).unwrap();
        assert_eq!(actual, expected);
    }
}
}