Wait for FS activity to stop before proceeding; use interrupt handling

This commit is contained in:
Matt Green 2016-10-24 09:53:06 -04:00
parent d10b790c35
commit 0e0d14b475
1 changed files with 50 additions and 32 deletions

View File

@ -1,3 +1,4 @@
#![feature(mpsc_select)]
#![feature(process_exec)]
#[macro_use]
@ -24,8 +25,9 @@ mod notification_filter;
mod runner;
mod watcher;
use std::sync::mpsc::{channel, Receiver, RecvError};
use std::{env, thread, time};
use std::env;
use std::sync::mpsc::{channel, Receiver};
use std::time::Duration;
use std::path::{Path, PathBuf};
use notification_filter::NotificationFilter;
@ -66,6 +68,7 @@ fn init_logger(debug: bool) {
}
fn main() {
let interrupt_rx = interrupt_handler::install();
let args = args::get_args();
init_logger(args.debug);
@ -121,38 +124,53 @@ fn main() {
}
while !interrupt_handler::interrupt_requested() {
let e = wait(&rx, &filter).expect("error when waiting for filesystem changes");
match wait(&rx, &interrupt_rx, &filter) {
Some(paths) => {
let updated = paths
.iter()
.map(|p| p.to_str().unwrap())
.collect();
debug!("{:?}: {:?}", e.op, e.path);
// TODO: update wait to return all paths
let updated: Vec<&str> = e.path
.iter()
.map(|p| p.to_str().unwrap())
.collect();
runner.run_command(&cmd, updated);
}
}
fn wait(rx: &Receiver<Event>, filter: &NotificationFilter) -> Result<Event, RecvError> {
loop {
// Block on initial notification
let e = try!(rx.recv());
if let Some(ref path) = e.path {
if filter.is_excluded(path) {
continue;
runner.run_command(&cmd, updated);
},
None => {
// interrupted
}
}
// Accumulate subsequent events
thread::sleep(time::Duration::from_millis(250));
// Drain rx buffer and drop them
while let Ok(_) = rx.try_recv() {
// nothing to do here
}
return Ok(e);
}
}
fn wait(rx: &Receiver<Event>, interrupt_rx: &Receiver<()>, filter: &NotificationFilter) -> Option<Vec<PathBuf>> {
let mut paths = vec![];
loop {
select! {
_ = interrupt_rx.recv() => { return None; },
ev = rx.recv() => {
let e = ev.expect("error when reading event");
if let Some(ref path) = e.path {
if !filter.is_excluded(path) {
paths.push(path.to_owned());
break;
}
}
}
};
}
// Wait for filesystem activity to cool off
// Unfortunately, we can't use select! with recv_timeout :(
let timeout = Duration::from_millis(500);
while let Ok(e) = rx.recv_timeout(timeout) {
if interrupt_handler::interrupt_requested() {
break;
}
if let Some(ref path) = e.path {
paths.push(path.to_owned());
}
}
Some(paths)
}