Remove use of select!

This commit is contained in:
Matt Green 2016-11-09 09:44:00 -05:00
parent 4d03519631
commit cf0a98b7a5
4 changed files with 115 additions and 199 deletions

54
src/interrupt.rs Normal file
View File

@ -0,0 +1,54 @@
#[cfg(unix)]
pub fn install_handler<F>(handler: F)
where F: Fn() + 'static + Send + Sync {
use std::thread;
use nix::sys::signal::*;
// Mask all termination signals
// These propagate to all threads started after this point
let mut mask = SigSet::empty();
mask.add(SIGTERM);
mask.add(SIGINT);
mask.thread_set_mask().expect("unable to set signal mask");
// Spawn a thread to catch these signals
thread::spawn(move || {
let sig = mask.wait().expect("unable to sigwait");
// Invoke closure
handler();
// Restore default behavior for received signal and unmask it
unsafe {
let _ = sigaction(sig, &SigAction::new(SigHandler::SigDfl, SaFlags::empty(), SigSet::empty()));
}
let mut new_mask = SigSet::empty();
new_mask.add(sig);
let _ = new_mask.thread_unblock();
// Re-raise, killing the process
let _ = raise(sig);
});
}
/// On Windows, use SetConsoleCtrlHandler() to send an interrupt
/// SetConsoleCtrlHandler runs in it's own thread, so it's safe.
#[cfg(windows)]
pub fn install() -> Receiver<()> {
use kernel32::SetConsoleCtrlHandler;
use winapi::{BOOL, DWORD, TRUE};
pub unsafe extern "system" fn ctrl_handler(_: DWORD) -> BOOL {
let _ = send_interrupt();
TRUE
}
let rx = create_channel();
unsafe {
SetConsoleCtrlHandler(Some(ctrl_handler), TRUE);
}
rx
}

View File

@ -1,82 +0,0 @@
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender, SendError};
lazy_static! {
static ref INTERRUPT_TX: Mutex<Option<Sender<()>>> = Mutex::new(None);
static ref INTERRUPT_REQUESTED: AtomicBool = AtomicBool::new(false);
}
/// On Unix platforms, mask reception of SIGINT/SIGTERM, spawn a thread,
/// and sigwait on those signals to safely relay them.
#[cfg(unix)]
pub fn install() -> Receiver<()> {
use std::thread;
use nix::sys::signal::{SigSet, SIGTERM, SIGINT};
let mut mask = SigSet::empty();
mask.add(SIGTERM);
mask.add(SIGINT);
mask.thread_set_mask().expect("unable to set signal mask");
let rx = create_channel();
thread::spawn(move || {
loop {
let _ = mask.wait().expect("unable to sigwait");
let result = send_interrupt();
if result.is_err() {
break;
}
}
});
rx
}
/// On Windows, use SetConsoleCtrlHandler() to send an interrupt
/// SetConsoleCtrlHandler runs in it's own thread, so it's safe.
#[cfg(windows)]
pub fn install() -> Receiver<()> {
use kernel32::SetConsoleCtrlHandler;
use winapi::{BOOL, DWORD, TRUE};
pub unsafe extern "system" fn ctrl_handler(_: DWORD) -> BOOL {
let _ = send_interrupt();
TRUE
}
let rx = create_channel();
unsafe {
SetConsoleCtrlHandler(Some(ctrl_handler), TRUE);
}
rx
}
pub fn interrupt_requested() -> bool {
INTERRUPT_REQUESTED.load(Ordering::Relaxed)
}
fn create_channel() -> Receiver<()> {
let mut guard = INTERRUPT_TX.lock().unwrap();
if (*guard).is_some() {
panic!("interrupt_handler::install() already called!");
}
let (tx, rx) = channel();
(*guard) = Some(tx);
rx
}
fn send_interrupt() -> Result<(), SendError<()>> {
INTERRUPT_REQUESTED.store(true, Ordering::Relaxed);
if let Some(ref mut tx) = *INTERRUPT_TX.lock().unwrap() {
tx.send(())
} else {
Err(SendError(()))
}
}

View File

@ -1,4 +1,3 @@
#![feature(mpsc_select)]
#![feature(process_exec)]
#[macro_use]
@ -24,19 +23,20 @@ extern crate mktemp;
mod cli;
mod gitignore;
mod interrupt_handler;
mod interrupt;
mod notification_filter;
mod process;
mod watcher;
use std::collections::HashMap;
use std::env;
use std::sync::{Arc, RwLock};
use std::sync::mpsc::{channel, Receiver};
use std::time::Duration;
use std::path::{Path, PathBuf};
use notification_filter::NotificationFilter;
use process::{Process, ProcessReaper};
use process::{Process};
use watcher::{Event, Watcher};
fn find_gitignore(path: &Path) -> Option<PathBuf> {
@ -79,7 +79,19 @@ fn init_logger(debug: bool) {
}
fn main() {
let interrupt_rx = interrupt_handler::install();
let child_process: Arc<RwLock<Option<Process>>> = Arc::new(RwLock::new(None));
let weak_child = Arc::downgrade(&child_process);
interrupt::install_handler(move || {
if let Some(lock) = weak_child.upgrade() {
let strong = lock.read().unwrap();
if let Some(ref child) = *strong {
child.kill();
child.wait();
}
}
});
let args = cli::get_args();
init_logger(args.debug);
@ -119,89 +131,74 @@ fn main() {
}
}
let cmd = args.cmd;
let (child_finish_tx, child_finish_rx) = channel();
let reaper = ProcessReaper::new(child_finish_tx);
let mut child_process = if args.run_initially {
if args.clear_screen {
// Start child process initially, if necessary
if args.run_initially {
if args.clear_screen {
cli::clear_screen();
}
Process::new(&cmd, vec![]).ok()
} else {
None
};
let mut guard = child_process.write().unwrap();
*guard = Process::new(&args.cmd, vec![]).ok();
}
while !interrupt_handler::interrupt_requested() {
if let Some(paths) = wait(&rx, &interrupt_rx, &filter) {
loop {
let paths = wait(&rx, &filter);
if let Some(path) = paths.get(0) {
debug!("Path updated: {:?}", path);
}
if let Some(path) = paths.get(0) {
debug!("Path updated: {:?}", path);
}
//. Wait for current child process to exit
{
let guard = child_process.read().unwrap();
if let Some(mut child) = child_process {
if let Some(ref child) = *guard {
if args.restart {
debug!("Killing child process");
child.kill();
}
debug!("Waiting for process to exit...");
reaper.wait_process(child);
select! {
_ = child_finish_rx.recv() => {},
_ = interrupt_rx.recv() => break
};
child.wait();
}
}
if args.clear_screen {
cli::clear_screen();
}
// Launch child process
if args.clear_screen {
cli::clear_screen();
}
child_process = Process::new(&cmd, paths).ok();
{
let mut lock = child_process.write().unwrap();
*lock = Process::new(&args.cmd, paths).ok();
}
}
}
fn wait(rx: &Receiver<Event>,
interrupt_rx: &Receiver<()>,
filter: &NotificationFilter)
-> Option<Vec<PathBuf>> {
fn wait(rx: &Receiver<Event>, filter: &NotificationFilter) -> Vec<PathBuf> {
let mut paths = vec![];
let mut cache = HashMap::new();
loop {
select! {
_ = interrupt_rx.recv() => { return None; },
ev = rx.recv() => {
let e = ev.expect("error when reading event");
let e = rx.recv().expect("error when reading event");
if let Some(ref path) = e.path {
// Ignore cache for the initial file. Otherwise, in
// debug mode it's hard to track what's going on
let excluded = filter.is_excluded(path);
if !cache.contains_key(path) {
cache.insert(path.to_owned(), excluded);
}
if !excluded {
paths.push(path.to_owned());
break;
}
}
if let Some(ref path) = e.path {
// Ignore cache for the initial file. Otherwise, in
// debug mode it's hard to track what's going on
let excluded = filter.is_excluded(path);
if !cache.contains_key(path) {
cache.insert(path.to_owned(), excluded);
}
};
if !excluded {
paths.push(path.to_owned());
break;
}
}
}
// Wait for filesystem activity to cool off
// Unfortunately, we can't use select! with recv_timeout :(
let timeout = Duration::from_millis(500);
while let Ok(e) = rx.recv_timeout(timeout) {
if interrupt_handler::interrupt_requested() {
break;
}
if let Some(ref path) = e.path {
if cache.contains_key(path) {
continue;
@ -218,5 +215,5 @@ fn wait(rx: &Receiver<Event>,
}
}
Some(paths)
paths
}

View File

@ -1,6 +1,4 @@
use std::path::PathBuf;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;
pub use self::imp::*;
@ -12,7 +10,6 @@ mod imp {
pub struct Process {
pid: i32,
killed: bool,
}
impl Process {
@ -37,18 +34,13 @@ mod imp {
.and_then(|p| {
Ok(Process {
pid: p.id() as i32,
killed: false,
})
})
}
pub fn kill(&mut self) {
pub fn kill(&self) {
use libc;
if self.killed {
return;
}
extern "C" {
fn killpg(pgrp: libc::pid_t, sig: libc::c_int) -> libc::c_int;
}
@ -56,22 +48,14 @@ mod imp {
unsafe {
killpg(self.pid, libc::SIGTERM);
}
self.killed = true;
}
pub fn wait(&mut self) {
pub fn wait(&self) {
use nix::sys::wait::waitpid;
let _ = waitpid(-self.pid, None);
}
}
impl Drop for Process {
fn drop(&mut self) {
self.kill();
}
}
}
#[cfg(target_family = "windows")]
@ -86,7 +70,6 @@ mod imp {
pub struct Process {
job: HANDLE,
killed: bool,
}
impl Process {
@ -134,24 +117,17 @@ mod imp {
Ok(Process {
job: job,
killed: false,
})
})
}
pub fn kill(&mut self) {
if self.killed {
return;
}
pub fn kill(&self) {
unsafe {
let _ = TerminateJobObject(self.job, 1);
}
self.killed = true;
}
pub fn wait(&mut self) {
pub fn wait(&self) {
unsafe {
let _ = WaitForSingleObject(self.job, INFINITE);
}
@ -159,7 +135,7 @@ mod imp {
}
impl Drop for Process {
fn drop(&mut self) {
fn drop(&self) {
unsafe {
let _ = CloseHandle(self.job);
}
@ -167,36 +143,7 @@ mod imp {
}
unsafe impl Send for Process {}
}
/// Watches for child process death, notifying callers via a channel.
///
/// On Windows, we don't have SIGCHLD, and even if we did, we'd still need
/// to relay that over a channel.
pub struct ProcessReaper {
processes_tx: Sender<Process>,
}
impl ProcessReaper {
pub fn new(tx: Sender<()>) -> ProcessReaper {
let (processes_tx, processes_rx): (Sender<Process>, Receiver<Process>) = channel();
thread::spawn(move || {
loop {
while let Ok(mut process) = processes_rx.recv() {
process.wait();
let _ = tx.send(());
}
}
});
ProcessReaper { processes_tx: processes_tx }
}
pub fn wait_process(&self, process: imp::Process) {
let _ = self.processes_tx.send(process);
}
unsafe impl Sync for Process {}
}
fn get_single_updated_path(paths: &[PathBuf]) -> Option<&str> {