diff --git a/src/interrupt.rs b/src/interrupt.rs new file mode 100644 index 0000000..18034ae --- /dev/null +++ b/src/interrupt.rs @@ -0,0 +1,54 @@ +#[cfg(unix)] +pub fn install_handler(handler: F) + where F: Fn() + 'static + Send + Sync { + + use std::thread; + use nix::sys::signal::*; + + // Mask all termination signals + // These propagate to all threads started after this point + let mut mask = SigSet::empty(); + mask.add(SIGTERM); + mask.add(SIGINT); + mask.thread_set_mask().expect("unable to set signal mask"); + + // Spawn a thread to catch these signals + thread::spawn(move || { + let sig = mask.wait().expect("unable to sigwait"); + + // Invoke closure + handler(); + + // Restore default behavior for received signal and unmask it + unsafe { + let _ = sigaction(sig, &SigAction::new(SigHandler::SigDfl, SaFlags::empty(), SigSet::empty())); + } + + let mut new_mask = SigSet::empty(); + new_mask.add(sig); + let _ = new_mask.thread_unblock(); + + // Re-raise, killing the process + let _ = raise(sig); + }); +} + +/// On Windows, use SetConsoleCtrlHandler() to send an interrupt +/// SetConsoleCtrlHandler runs in it's own thread, so it's safe. +#[cfg(windows)] +pub fn install() -> Receiver<()> { + use kernel32::SetConsoleCtrlHandler; + use winapi::{BOOL, DWORD, TRUE}; + + pub unsafe extern "system" fn ctrl_handler(_: DWORD) -> BOOL { + let _ = send_interrupt(); + TRUE + } + + let rx = create_channel(); + unsafe { + SetConsoleCtrlHandler(Some(ctrl_handler), TRUE); + } + + rx +} diff --git a/src/interrupt_handler.rs b/src/interrupt_handler.rs deleted file mode 100644 index 8044730..0000000 --- a/src/interrupt_handler.rs +++ /dev/null @@ -1,82 +0,0 @@ -use std::sync::Mutex; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, Sender, SendError}; - -lazy_static! { - static ref INTERRUPT_TX: Mutex>> = Mutex::new(None); - static ref INTERRUPT_REQUESTED: AtomicBool = AtomicBool::new(false); -} - -/// On Unix platforms, mask reception of SIGINT/SIGTERM, spawn a thread, -/// and sigwait on those signals to safely relay them. -#[cfg(unix)] -pub fn install() -> Receiver<()> { - use std::thread; - use nix::sys::signal::{SigSet, SIGTERM, SIGINT}; - - let mut mask = SigSet::empty(); - mask.add(SIGTERM); - mask.add(SIGINT); - mask.thread_set_mask().expect("unable to set signal mask"); - - let rx = create_channel(); - - thread::spawn(move || { - loop { - let _ = mask.wait().expect("unable to sigwait"); - - let result = send_interrupt(); - if result.is_err() { - break; - } - } - }); - - rx -} - -/// On Windows, use SetConsoleCtrlHandler() to send an interrupt -/// SetConsoleCtrlHandler runs in it's own thread, so it's safe. -#[cfg(windows)] -pub fn install() -> Receiver<()> { - use kernel32::SetConsoleCtrlHandler; - use winapi::{BOOL, DWORD, TRUE}; - - pub unsafe extern "system" fn ctrl_handler(_: DWORD) -> BOOL { - let _ = send_interrupt(); - TRUE - } - - let rx = create_channel(); - unsafe { - SetConsoleCtrlHandler(Some(ctrl_handler), TRUE); - } - - rx -} - -pub fn interrupt_requested() -> bool { - INTERRUPT_REQUESTED.load(Ordering::Relaxed) -} - -fn create_channel() -> Receiver<()> { - let mut guard = INTERRUPT_TX.lock().unwrap(); - if (*guard).is_some() { - panic!("interrupt_handler::install() already called!"); - } - - let (tx, rx) = channel(); - (*guard) = Some(tx); - - rx -} - -fn send_interrupt() -> Result<(), SendError<()>> { - INTERRUPT_REQUESTED.store(true, Ordering::Relaxed); - - if let Some(ref mut tx) = *INTERRUPT_TX.lock().unwrap() { - tx.send(()) - } else { - Err(SendError(())) - } -} diff --git a/src/main.rs b/src/main.rs index e242589..e43668e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,3 @@ -#![feature(mpsc_select)] #![feature(process_exec)] #[macro_use] @@ -24,19 +23,20 @@ extern crate mktemp; mod cli; mod gitignore; -mod interrupt_handler; +mod interrupt; mod notification_filter; mod process; mod watcher; use std::collections::HashMap; use std::env; +use std::sync::{Arc, RwLock}; use std::sync::mpsc::{channel, Receiver}; use std::time::Duration; use std::path::{Path, PathBuf}; use notification_filter::NotificationFilter; -use process::{Process, ProcessReaper}; +use process::{Process}; use watcher::{Event, Watcher}; fn find_gitignore(path: &Path) -> Option { @@ -79,7 +79,19 @@ fn init_logger(debug: bool) { } fn main() { - let interrupt_rx = interrupt_handler::install(); + let child_process: Arc>> = Arc::new(RwLock::new(None)); + + let weak_child = Arc::downgrade(&child_process); + interrupt::install_handler(move || { + if let Some(lock) = weak_child.upgrade() { + let strong = lock.read().unwrap(); + if let Some(ref child) = *strong { + child.kill(); + child.wait(); + } + } + }); + let args = cli::get_args(); init_logger(args.debug); @@ -119,89 +131,74 @@ fn main() { } } - let cmd = args.cmd; - - let (child_finish_tx, child_finish_rx) = channel(); - let reaper = ProcessReaper::new(child_finish_tx); - - let mut child_process = if args.run_initially { - if args.clear_screen { + // Start child process initially, if necessary + if args.run_initially { + if args.clear_screen { cli::clear_screen(); } - Process::new(&cmd, vec![]).ok() - } else { - None - }; + let mut guard = child_process.write().unwrap(); + *guard = Process::new(&args.cmd, vec![]).ok(); + } - while !interrupt_handler::interrupt_requested() { - if let Some(paths) = wait(&rx, &interrupt_rx, &filter) { + loop { + let paths = wait(&rx, &filter); + if let Some(path) = paths.get(0) { + debug!("Path updated: {:?}", path); + } - if let Some(path) = paths.get(0) { - debug!("Path updated: {:?}", path); - } + //. Wait for current child process to exit + { + let guard = child_process.read().unwrap(); - if let Some(mut child) = child_process { + if let Some(ref child) = *guard { if args.restart { debug!("Killing child process"); child.kill(); } debug!("Waiting for process to exit..."); - reaper.wait_process(child); - select! { - _ = child_finish_rx.recv() => {}, - _ = interrupt_rx.recv() => break - }; + child.wait(); } + } - if args.clear_screen { - cli::clear_screen(); - } + // Launch child process + if args.clear_screen { + cli::clear_screen(); + } - child_process = Process::new(&cmd, paths).ok(); + { + let mut lock = child_process.write().unwrap(); + *lock = Process::new(&args.cmd, paths).ok(); } } } -fn wait(rx: &Receiver, - interrupt_rx: &Receiver<()>, - filter: &NotificationFilter) - -> Option> { +fn wait(rx: &Receiver, filter: &NotificationFilter) -> Vec { let mut paths = vec![]; let mut cache = HashMap::new(); loop { - select! { - _ = interrupt_rx.recv() => { return None; }, - ev = rx.recv() => { - let e = ev.expect("error when reading event"); + let e = rx.recv().expect("error when reading event"); - if let Some(ref path) = e.path { - // Ignore cache for the initial file. Otherwise, in - // debug mode it's hard to track what's going on - let excluded = filter.is_excluded(path); - if !cache.contains_key(path) { - cache.insert(path.to_owned(), excluded); - } - - if !excluded { - paths.push(path.to_owned()); - break; - } - } + if let Some(ref path) = e.path { + // Ignore cache for the initial file. Otherwise, in + // debug mode it's hard to track what's going on + let excluded = filter.is_excluded(path); + if !cache.contains_key(path) { + cache.insert(path.to_owned(), excluded); } - }; + + if !excluded { + paths.push(path.to_owned()); + break; + } + } } // Wait for filesystem activity to cool off - // Unfortunately, we can't use select! with recv_timeout :( let timeout = Duration::from_millis(500); while let Ok(e) = rx.recv_timeout(timeout) { - if interrupt_handler::interrupt_requested() { - break; - } - if let Some(ref path) = e.path { if cache.contains_key(path) { continue; @@ -218,5 +215,5 @@ fn wait(rx: &Receiver, } } - Some(paths) + paths } diff --git a/src/process.rs b/src/process.rs index 38e9c15..6a050f2 100644 --- a/src/process.rs +++ b/src/process.rs @@ -1,6 +1,4 @@ use std::path::PathBuf; -use std::sync::mpsc::{channel, Receiver, Sender}; -use std::thread; pub use self::imp::*; @@ -12,7 +10,6 @@ mod imp { pub struct Process { pid: i32, - killed: bool, } impl Process { @@ -37,18 +34,13 @@ mod imp { .and_then(|p| { Ok(Process { pid: p.id() as i32, - killed: false, }) }) } - pub fn kill(&mut self) { + pub fn kill(&self) { use libc; - if self.killed { - return; - } - extern "C" { fn killpg(pgrp: libc::pid_t, sig: libc::c_int) -> libc::c_int; } @@ -56,22 +48,14 @@ mod imp { unsafe { killpg(self.pid, libc::SIGTERM); } - - self.killed = true; } - pub fn wait(&mut self) { + pub fn wait(&self) { use nix::sys::wait::waitpid; let _ = waitpid(-self.pid, None); } } - - impl Drop for Process { - fn drop(&mut self) { - self.kill(); - } - } } #[cfg(target_family = "windows")] @@ -86,7 +70,6 @@ mod imp { pub struct Process { job: HANDLE, - killed: bool, } impl Process { @@ -134,24 +117,17 @@ mod imp { Ok(Process { job: job, - killed: false, }) }) } - pub fn kill(&mut self) { - if self.killed { - return; - } - + pub fn kill(&self) { unsafe { let _ = TerminateJobObject(self.job, 1); } - - self.killed = true; } - pub fn wait(&mut self) { + pub fn wait(&self) { unsafe { let _ = WaitForSingleObject(self.job, INFINITE); } @@ -159,7 +135,7 @@ mod imp { } impl Drop for Process { - fn drop(&mut self) { + fn drop(&self) { unsafe { let _ = CloseHandle(self.job); } @@ -167,36 +143,7 @@ mod imp { } unsafe impl Send for Process {} -} - -/// Watches for child process death, notifying callers via a channel. -/// -/// On Windows, we don't have SIGCHLD, and even if we did, we'd still need -/// to relay that over a channel. -pub struct ProcessReaper { - processes_tx: Sender, -} - -impl ProcessReaper { - pub fn new(tx: Sender<()>) -> ProcessReaper { - let (processes_tx, processes_rx): (Sender, Receiver) = channel(); - - thread::spawn(move || { - loop { - while let Ok(mut process) = processes_rx.recv() { - process.wait(); - - let _ = tx.send(()); - } - } - }); - - ProcessReaper { processes_tx: processes_tx } - } - - pub fn wait_process(&self, process: imp::Process) { - let _ = self.processes_tx.send(process); - } + unsafe impl Sync for Process {} } fn get_single_updated_path(paths: &[PathBuf]) -> Option<&str> {