From 3fc501c4f073dd3d705e6f8da7e2cfd7804e331b Mon Sep 17 00:00:00 2001 From: Matt Green Date: Thu, 18 May 2017 13:22:46 -0400 Subject: [PATCH] Windows: fix waiting for job to finish; closes #55 --- src/process.rs | 75 ++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 70 insertions(+), 5 deletions(-) diff --git a/src/process.rs b/src/process.rs index a5393e8..25f7bb4 100644 --- a/src/process.rs +++ b/src/process.rs @@ -125,17 +125,26 @@ mod imp { use std::mem; use std::path::PathBuf; use std::process::Command; + use std::ptr; use kernel32::*; use winapi::*; use signal::Signal; pub struct Process { job: HANDLE, + completion_port: HANDLE, + } + + #[repr(C)] + struct JOBOBJECT_ASSOCIATE_COMPLETION_PORT { + completion_key: PVOID, + completion_port: HANDLE, } impl Process { pub fn new(cmd: &str, updated_paths: Vec, no_shell: bool) -> Result { use std::os::windows::io::IntoRawHandle; + use std::os::windows::process::CommandExt; fn last_err() -> io::Error { io::Error::last_os_error() @@ -146,6 +155,24 @@ mod imp { panic!("failed to create job object: {}", last_err()); } + let completion_port = unsafe { CreateIoCompletionPort(INVALID_HANDLE_VALUE, ptr::null_mut(), 0, 1) }; + if job.is_null() { + panic!("unable to create IO completion port for job: {}", last_err()); + } + + let mut associate_completion: JOBOBJECT_ASSOCIATE_COMPLETION_PORT = unsafe { mem::zeroed() }; + associate_completion.completion_key = job; + associate_completion.completion_port = completion_port; + unsafe { + let r = SetInformationJobObject(job, + JobObjectAssociateCompletionPortInformation, + &mut associate_completion as *mut _ as LPVOID, + mem::size_of_val(&associate_completion) as DWORD); + if r == 0 { + panic!("failed to associate completion port with job: {}", last_err()); + } + } + let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = unsafe { mem::zeroed() }; info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE; let r = unsafe { @@ -158,6 +185,7 @@ mod imp { panic!("failed to set job info: {}", last_err()); } + let mut iter_args = cmd.split_whitespace(); let arg0 = match no_shell { true => iter_args.next().unwrap(), @@ -174,6 +202,7 @@ mod imp { let mut command = Command::new(arg0); command.args(args); + command.creation_flags(CREATE_SUSPENDED); debug!("Assembled command {:?}", command); if let Some(single_path) = super::get_single_updated_path(&updated_paths) { @@ -187,19 +216,21 @@ mod imp { command .spawn() .and_then(|p| { - let r = unsafe { AssignProcessToJobObject(job, p.into_raw_handle()) }; + let handle = p.into_raw_handle(); + let r = unsafe { AssignProcessToJobObject(job, handle) }; if r == 0 { panic!("failed to add to job object: {}", last_err()); } - Ok(Process { job: job }) + resume_threads(handle); + + Ok(Process { job: job, completion_port: completion_port }) }) } pub fn reap(&self) {} - pub fn signal(&self, signal: Signal) { - debug!("Ignoring signal {:?} (not supported by Windows)", signal); + pub fn signal(&self, _signal: Signal) { unsafe { let _ = TerminateJobObject(self.job, 1); } @@ -207,7 +238,16 @@ mod imp { pub fn wait(&self) { unsafe { - let _ = WaitForSingleObject(self.job, INFINITE); + loop { + let mut code: DWORD = 0; + let mut key: ULONG_PTR = 0; + let mut overlapped: LPOVERLAPPED = mem::uninitialized(); + GetQueuedCompletionStatus(self.completion_port, &mut code, &mut key, &mut overlapped, INFINITE); + + if code == JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO && (key as HANDLE) == self.job { + break; + } + } } } } @@ -216,12 +256,36 @@ mod imp { fn drop(&mut self) { unsafe { let _ = CloseHandle(self.job); + let _ = CloseHandle(self.completion_port); } } } unsafe impl Send for Process {} unsafe impl Sync for Process {} + + // This is pretty terrible, but it's either this or we re-implement all of Rust's std::process just to get at PROCESS_INFORMATION + fn resume_threads(child_process: HANDLE) { + unsafe { + let child_id = GetProcessId(child_process); + + let h = CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, 0); + let mut entry = THREADENTRY32 { dwSize: 28, cntUsage: 0, th32ThreadID: 0, th32OwnerProcessID: 0, tpBasePri: 0, tpDeltaPri: 0, dwFlags: 0}; + + let mut result = Thread32First(h, &mut entry); + while result != 0 { + if entry.th32OwnerProcessID == child_id { + let thread_handle = OpenThread(0x0002, 0, entry.th32ThreadID); + ResumeThread(thread_handle); + CloseHandle(thread_handle); + } + + result = Thread32Next(h, &mut entry); + } + + CloseHandle(h); + } + } } fn get_single_updated_path(paths: &[PathBuf]) -> Option<&str> { @@ -302,3 +366,4 @@ mod tests { assert_eq!(uneven_result, "/tmp/logs"); } } +