2020-07-03 23:32:49 +12:00

335 lines
10 KiB

use crate::cli::{clear_screen, Args};
use crate::error::{Error, Result};
use crate::gitignore;
use crate::ignore;
use crate::notification_filter::NotificationFilter;
use crate::pathop::PathOp;
use crate::process::{self, Process};
use crate::signal::{self, Signal};
use crate::watcher::{Event, Watcher};
use std::{
mpsc::{channel, Receiver},
Arc, RwLock,
pub trait Handler {
/// Called through a manual request, such as an initial run.
/// # Returns
/// A `Result` which means:
/// - `Err`: an error has occurred while processing, quit.
/// - `Ok(true)`: everything is fine and the loop can continue.
/// - `Ok(false)`: everything is fine but we should gracefully stop.
fn on_manual(&self) -> Result<bool>;
/// Called through a file-update request.
/// # Parameters
/// - `ops`: The list of events that triggered this update.
/// # Returns
/// A `Result` which means:
/// - `Err`: an error has occurred while processing, quit.
/// - `Ok(true)`: everything is fine and the loop can continue.
/// - `Ok(false)`: everything is fine but we should gracefully stop.
fn on_update(&self, ops: &[PathOp]) -> Result<bool>;
/// Called once by `watch` at the very start.
/// Not called again; any changes will never be picked up.
/// The `Args` instance should be created using `ArgsBuilder` rather than direct initialisation
/// to resist potential breaking changes (see semver policy on crate root).
fn args(&self) -> Args;
/// Starts watching, and calls a handler when something happens.
/// Given an argument structure and a `Handler` type, starts the watcher loop, blocking until done.
pub fn watch<H>(handler: &H) -> Result<()>
H: Handler,
let args = handler.args();
let mut paths = vec![];
for path in &args.paths {
.map_err(|e| Error::Canonicalization(path.to_string_lossy().into_owned(), e))?,
let ignore = ignore::load(if args.no_ignore { &[] } else { &paths });
let gitignore = gitignore::load(if args.no_vcs_ignore || args.no_ignore {
} else {
let filter = NotificationFilter::new(&args.filters, &args.ignores, gitignore, ignore)?;
let (tx, rx) = channel();
let poll = args.poll;
#[cfg(target_os = "linux")]
let poll_interval = args.poll_interval;
let watcher = Watcher::new(tx.clone(), &paths, args.poll, args.poll_interval).or_else(|err| {
if poll {
return Err(err);
#[cfg(target_os = "linux")]
use nix::libc;
let mut fallback = false;
if let notify::Error::Io(ref e) = err {
if e.raw_os_error() == Some(libc::ENOSPC) {
warn!("System notification limit is too small, falling back to polling mode. For better performance increase system limit:\n\tsysctl fs.inotify.max_user_watches=524288");
fallback = true;
if fallback {
return Watcher::new(tx, &paths, true, poll_interval);
if watcher.is_polling() {
warn!("Polling for changes every {} ms", args.poll_interval);
// Call handler initially, if necessary
if args.run_initially && !handler.on_manual()? {
return Ok(());
loop {
debug!("Waiting for filesystem activity");
let paths = wait_fs(&rx, &filter, args.debounce, args.no_meta);
debug!("Paths updated: {:?}", paths);
if !handler.on_update(&paths)? {
pub struct ExecHandler {
args: Args,
signal: Option<Signal>,
child_process: Arc<RwLock<Option<Process>>>,
impl ExecHandler {
pub fn new(args: Args) -> Result<Self> {
let child_process: Arc<RwLock<Option<Process>>> = Arc::new(RwLock::new(None));
let weak_child = Arc::downgrade(&child_process);
// Convert signal string to the corresponding integer
let signal = signal::new(args.signal.clone());
signal::install_handler(move |sig: Signal| {
if let Some(lock) = weak_child.upgrade() {
let strong ="poisoned lock in install_handler");
if let Some(ref child) = *strong {
match sig {
Signal::SIGCHLD => child.reap(), // SIGCHLD is special, initiate reap()
_ => child.signal(sig),
Ok(Self {
fn spawn(&self, ops: &[PathOp]) -> Result<()> {
if self.args.clear_screen {
debug!("Launching child process");
let mut guard = self.child_process.write()?;
*guard = Some(process::spawn(&self.args.cmd, ops, self.args.no_shell, self.args.no_environment)?);
pub fn has_running_process(&self) -> bool {
let guard = self
.expect("poisoned lock in has_running_process");
impl Handler for ExecHandler {
fn args(&self) -> Args {
// Only returns Err() on lock poisoning.
fn on_manual(&self) -> Result<bool> {
if self.args.once {
return Ok(true);
// Only returns Err() on lock poisoning.
fn on_update(&self, ops: &[PathOp]) -> Result<bool> {
// We have four scenarios here:
// 1. Just send a specified signal to the child, do nothing more
// 2. Just spawn a process if there are no running processes in the background
// 3. Send a specified signal to the child, wait for it to exit, then run the command again
// 4. Send SIGTERM to the child, wait for it to exit, then run the command again
// 5. Make sure the previous run was ended, then run the command again
let scenario = (
let running_process = self.has_running_process();
match scenario {
// SIGHUP scenario: --signal was given, but --restart was not
// Just send a signal (e.g. SIGHUP) to the child, do nothing more
(false, true, _) => signal_process(&self.child_process, self.signal, false),
// Spawn a process when there are no running processes in the background
(_, _, true) => {
if !running_process {
// Custom restart behaviour (--restart was given, and --signal specified):
// Send specified signal to the child, wait for it to exit, then run the command again
(true, true, false) => {
signal_process(&self.child_process, self.signal, true);
// Default restart behaviour (--restart was given, but --signal wasn't specified):
// Send SIGTERM to the child, wait for it to exit, then run the command again
(true, false, false) => {
let sigterm = signal::new(Some("SIGTERM".into()));
signal_process(&self.child_process, sigterm, true);
// Default behaviour (neither --signal nor --restart specified):
// Make sure the previous run was ended, then run the command again
(false, false, false) => {
signal_process(&self.child_process, None, true);
// Handle once option for integration testing
if self.args.once {
signal_process(&self.child_process, self.signal, false);
return Ok(false);
pub fn run(args: Args) -> Result<()> {
fn wait_fs(rx: &Receiver<Event>, filter: &NotificationFilter, debounce: u64, no_meta: bool) -> Vec<PathOp> {
let mut paths = Vec::new();
let mut cache = HashMap::new();
loop {
let e = rx.recv().expect("error when reading event");
if let Some(ref path) = e.path {
let pathop = PathOp::new(path, e.op.ok(), e.cookie);
if let Some(op) = pathop.op {
if no_meta && PathOp::is_meta(op) {
// Ignore cache for the initial file. Otherwise, in
// debug mode it's hard to track what's going on
let excluded = filter.is_excluded(path);
if !cache.contains_key(&pathop) {
cache.insert(pathop.clone(), excluded);
if !excluded {
// Wait for filesystem activity to cool off
let timeout = Duration::from_millis(debounce);
while let Ok(e) = rx.recv_timeout(timeout) {
if let Some(ref path) = e.path {
let pathop = PathOp::new(path, e.op.ok(), e.cookie);
if cache.contains_key(&pathop) {
let excluded = filter.is_excluded(path);
cache.insert(pathop.clone(), excluded);
if !excluded {
// signal_process sends signal to process. It waits for the process to exit if wait is true
fn signal_process(process: &RwLock<Option<Process>>, signal: Option<Signal>, wait: bool) {
let guard ="poisoned lock in signal_process");
if let Some(ref child) = *guard {
if let Some(s) = signal {
if wait {
debug!("Waiting for process to exit...");