From 0e0d14b475030eb47cd6d388178f1ccfdaed5d4c Mon Sep 17 00:00:00 2001 From: Matt Green Date: Mon, 24 Oct 2016 09:53:06 -0400 Subject: [PATCH] Wait for FS activity to stop before proceeding; use interrupt handling --- src/main.rs | 82 ++++++++++++++++++++++++++++++++--------------------- 1 file changed, 50 insertions(+), 32 deletions(-) diff --git a/src/main.rs b/src/main.rs index 9ce4040..8404834 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +#![feature(mpsc_select)] #![feature(process_exec)] #[macro_use] @@ -24,8 +25,9 @@ mod notification_filter; mod runner; mod watcher; -use std::sync::mpsc::{channel, Receiver, RecvError}; -use std::{env, thread, time}; +use std::env; +use std::sync::mpsc::{channel, Receiver}; +use std::time::Duration; use std::path::{Path, PathBuf}; use notification_filter::NotificationFilter; @@ -66,6 +68,7 @@ fn init_logger(debug: bool) { } fn main() { + let interrupt_rx = interrupt_handler::install(); let args = args::get_args(); init_logger(args.debug); @@ -121,38 +124,53 @@ fn main() { } while !interrupt_handler::interrupt_requested() { - let e = wait(&rx, &filter).expect("error when waiting for filesystem changes"); + match wait(&rx, &interrupt_rx, &filter) { + Some(paths) => { + let updated = paths + .iter() + .map(|p| p.to_str().unwrap()) + .collect(); - debug!("{:?}: {:?}", e.op, e.path); - - // TODO: update wait to return all paths - let updated: Vec<&str> = e.path - .iter() - .map(|p| p.to_str().unwrap()) - .collect(); - - runner.run_command(&cmd, updated); - } -} - -fn wait(rx: &Receiver, filter: &NotificationFilter) -> Result { - loop { - // Block on initial notification - let e = try!(rx.recv()); - if let Some(ref path) = e.path { - if filter.is_excluded(path) { - continue; + runner.run_command(&cmd, updated); + }, + None => { + // interrupted } } - - // Accumulate subsequent events - thread::sleep(time::Duration::from_millis(250)); - - // Drain rx buffer and drop them - while let Ok(_) = rx.try_recv() { - // nothing to do here - } - - return Ok(e); } } + +fn wait(rx: &Receiver, interrupt_rx: &Receiver<()>, filter: &NotificationFilter) -> Option> { + let mut paths = vec![]; + + loop { + select! { + _ = interrupt_rx.recv() => { return None; }, + ev = rx.recv() => { + let e = ev.expect("error when reading event"); + + if let Some(ref path) = e.path { + if !filter.is_excluded(path) { + paths.push(path.to_owned()); + break; + } + } + } + }; + } + + // Wait for filesystem activity to cool off + // Unfortunately, we can't use select! with recv_timeout :( + let timeout = Duration::from_millis(500); + while let Ok(e) = rx.recv_timeout(timeout) { + if interrupt_handler::interrupt_requested() { + break; + } + + if let Some(ref path) = e.path { + paths.push(path.to_owned()); + } + } + + Some(paths) +}