2021-08-19 10:44:02 +02:00
|
|
|
//! Event source for changes to files and directories.
|
|
|
|
|
2021-08-16 15:15:17 +02:00
|
|
|
use std::{
|
|
|
|
collections::{HashMap, HashSet},
|
2021-10-12 17:06:39 +02:00
|
|
|
fs::metadata,
|
2021-08-22 20:08:25 +02:00
|
|
|
mem::take,
|
2021-10-14 14:38:21 +02:00
|
|
|
path::{Path, PathBuf},
|
2021-08-24 12:28:29 +02:00
|
|
|
sync::{Arc, Mutex},
|
|
|
|
time::Duration,
|
2021-08-16 15:15:17 +02:00
|
|
|
};
|
2021-08-16 11:49:12 +02:00
|
|
|
|
2021-08-19 16:59:39 +02:00
|
|
|
use notify::Watcher as _;
|
2021-08-16 15:15:17 +02:00
|
|
|
use tokio::sync::{mpsc, watch};
|
2021-08-22 19:15:55 +02:00
|
|
|
use tracing::{debug, error, trace};
|
2021-08-16 11:49:12 +02:00
|
|
|
|
2021-08-16 15:15:17 +02:00
|
|
|
use crate::{
|
|
|
|
error::{CriticalError, RuntimeError},
|
2021-09-13 09:51:07 +02:00
|
|
|
event::{Event, Source, Tag},
|
2021-08-16 15:15:17 +02:00
|
|
|
};
|
2021-08-16 11:49:12 +02:00
|
|
|
|
|
|
|
/// What kind of filesystem watcher to use.
|
|
|
|
///
|
|
|
|
/// For now only native and poll watchers are supported. In the future there may be additional
|
|
|
|
/// watchers available on some platforms.
|
|
|
|
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
|
|
|
#[non_exhaustive]
|
|
|
|
pub enum Watcher {
|
|
|
|
Native,
|
2021-08-24 12:28:29 +02:00
|
|
|
Poll(Duration),
|
2021-08-16 11:49:12 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for Watcher {
|
2021-08-16 15:15:17 +02:00
|
|
|
fn default() -> Self {
|
|
|
|
Self::Native
|
|
|
|
}
|
2021-08-16 11:49:12 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Watcher {
|
2021-08-18 15:12:50 +02:00
|
|
|
fn create(
|
|
|
|
self,
|
2021-08-19 16:59:39 +02:00
|
|
|
f: impl notify::EventHandler,
|
2021-08-18 15:12:50 +02:00
|
|
|
) -> Result<Box<dyn notify::Watcher + Send>, RuntimeError> {
|
2021-08-16 11:49:12 +02:00
|
|
|
match self {
|
2021-08-18 15:12:50 +02:00
|
|
|
Self::Native => notify::RecommendedWatcher::new(f).map(|w| Box::new(w) as _),
|
2021-08-24 12:28:29 +02:00
|
|
|
Self::Poll(delay) => notify::PollWatcher::with_delay(Arc::new(Mutex::new(f)), delay)
|
|
|
|
.map(|w| Box::new(w) as _),
|
2021-08-16 15:15:17 +02:00
|
|
|
}
|
|
|
|
.map_err(|err| RuntimeError::FsWatcherCreate { kind: self, err })
|
2021-08-16 11:49:12 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// The working data set of the filesystem worker.
|
|
|
|
///
|
|
|
|
/// This is marked non-exhaustive so new configuration can be added without breaking.
|
|
|
|
#[derive(Clone, Debug, Default)]
|
|
|
|
#[non_exhaustive]
|
|
|
|
pub struct WorkingData {
|
2021-10-14 14:38:21 +02:00
|
|
|
pub pathset: Vec<WatchedPath>,
|
2021-08-16 11:49:12 +02:00
|
|
|
pub watcher: Watcher,
|
|
|
|
}
|
|
|
|
|
2021-10-14 14:38:21 +02:00
|
|
|
/// A path to watch.
|
|
|
|
///
|
|
|
|
/// This is currently only a wrapper around a [`PathBuf`], but may be augmented in the future.
|
|
|
|
#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
|
|
|
pub struct WatchedPath(PathBuf);
|
|
|
|
|
|
|
|
impl From<PathBuf> for WatchedPath {
|
|
|
|
fn from(path: PathBuf) -> Self {
|
|
|
|
Self(path)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl From<&Path> for WatchedPath {
|
|
|
|
fn from(path: &Path) -> Self {
|
|
|
|
Self(path.into())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl From<WatchedPath> for PathBuf {
|
|
|
|
fn from(path: WatchedPath) -> Self {
|
|
|
|
path.0
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl AsRef<Path> for WatchedPath {
|
|
|
|
fn as_ref(&self) -> &Path {
|
|
|
|
self.0.as_ref()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-16 15:15:17 +02:00
|
|
|
/// Launch the filesystem event worker.
|
|
|
|
///
|
|
|
|
/// While you can run several, you should only have one.
|
2021-08-16 11:49:12 +02:00
|
|
|
///
|
2021-09-29 17:03:46 +02:00
|
|
|
/// This only does a bare minimum of setup; to actually start the work, you need to set a non-empty
|
|
|
|
/// pathset on the [`WorkingData`] with the [`watch`] channel, and send a notification. Take care
|
|
|
|
/// _not_ to drop the watch sender: this will cause the worker to stop gracefully, which may not be
|
|
|
|
/// what was expected.
|
|
|
|
///
|
|
|
|
/// Note that the paths emitted by the watcher are canonicalised. No guarantee is made about the
|
|
|
|
/// implementation or output of that canonicalisation (i.e. it might not be `std`'s).
|
2021-08-16 15:15:17 +02:00
|
|
|
///
|
|
|
|
/// # Examples
|
|
|
|
///
|
|
|
|
/// Direct usage:
|
|
|
|
///
|
|
|
|
/// ```no_run
|
|
|
|
/// use tokio::sync::{mpsc, watch};
|
|
|
|
/// use watchexec::fs::{worker, WorkingData};
|
|
|
|
///
|
|
|
|
/// #[tokio::main]
|
|
|
|
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|
|
|
/// let (ev_s, _) = mpsc::channel(1024);
|
|
|
|
/// let (er_s, _) = mpsc::channel(64);
|
|
|
|
/// let (wd_s, wd_r) = watch::channel(WorkingData::default());
|
|
|
|
///
|
|
|
|
/// let mut wkd = WorkingData::default();
|
|
|
|
/// wkd.pathset = vec![".".into()];
|
|
|
|
/// wd_s.send(wkd)?;
|
|
|
|
///
|
|
|
|
/// worker(wd_r, er_s, ev_s).await?;
|
|
|
|
/// Ok(())
|
|
|
|
/// }
|
|
|
|
/// ```
|
2021-08-16 11:49:12 +02:00
|
|
|
pub async fn worker(
|
|
|
|
mut working: watch::Receiver<WorkingData>,
|
|
|
|
errors: mpsc::Sender<RuntimeError>,
|
|
|
|
events: mpsc::Sender<Event>,
|
|
|
|
) -> Result<(), CriticalError> {
|
|
|
|
debug!("launching filesystem worker");
|
|
|
|
|
|
|
|
let mut watcher_type = Watcher::default();
|
2021-08-18 14:40:35 +02:00
|
|
|
let mut watcher = None;
|
|
|
|
let mut pathset = HashSet::new();
|
2021-08-16 11:49:12 +02:00
|
|
|
|
|
|
|
while working.changed().await.is_ok() {
|
|
|
|
// In separate scope so we drop the working read lock as early as we can
|
|
|
|
let (new_watcher, to_watch, to_drop) = {
|
|
|
|
let data = working.borrow();
|
|
|
|
trace!(?data, "filesystem worker got a working data change");
|
|
|
|
|
|
|
|
if data.pathset.is_empty() {
|
|
|
|
trace!("no more watched paths, dropping watcher");
|
|
|
|
watcher.take();
|
|
|
|
pathset.drain();
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
if watcher.is_none() || watcher_type != data.watcher {
|
|
|
|
pathset.drain();
|
|
|
|
|
|
|
|
(Some(data.watcher), data.pathset.clone(), Vec::new())
|
|
|
|
} else {
|
|
|
|
let mut to_watch = Vec::with_capacity(data.pathset.len());
|
|
|
|
let mut to_drop = Vec::with_capacity(pathset.len());
|
|
|
|
for path in data.pathset.iter() {
|
|
|
|
if !pathset.contains(path) {
|
|
|
|
to_watch.push(path.clone());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
for path in pathset.iter() {
|
|
|
|
if !data.pathset.contains(path) {
|
|
|
|
to_drop.push(path.clone());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
(None, to_watch, to_drop)
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
if let Some(kind) = new_watcher {
|
|
|
|
debug!(?kind, "creating new watcher");
|
|
|
|
let n_errors = errors.clone();
|
|
|
|
let n_events = events.clone();
|
2021-08-16 15:15:17 +02:00
|
|
|
match kind.create(move |nev: Result<notify::Event, notify::Error>| {
|
2021-08-16 11:49:12 +02:00
|
|
|
trace!(event = ?nev, "receiving possible event from watcher");
|
2021-08-16 15:15:17 +02:00
|
|
|
if let Err(e) = process_event(nev, kind, n_events.clone()) {
|
|
|
|
n_errors.try_send(e).ok();
|
2021-08-16 11:49:12 +02:00
|
|
|
}
|
|
|
|
}) {
|
|
|
|
Ok(w) => {
|
2021-09-29 17:03:46 +02:00
|
|
|
watcher = Some(w);
|
2021-08-16 11:49:12 +02:00
|
|
|
watcher_type = kind;
|
2021-08-16 15:15:17 +02:00
|
|
|
}
|
2021-08-16 11:49:12 +02:00
|
|
|
Err(e) => {
|
|
|
|
errors.send(e).await?;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if let Some(w) = watcher.as_mut() {
|
|
|
|
debug!(?to_watch, ?to_drop, "applying changes to the watcher");
|
|
|
|
|
|
|
|
for path in to_drop {
|
|
|
|
trace!(?path, "removing path from the watcher");
|
2021-10-14 14:38:21 +02:00
|
|
|
if let Err(err) = w.unwatch(path.as_ref()) {
|
2021-08-22 19:15:55 +02:00
|
|
|
error!(?err, "notify unwatch() error");
|
2021-08-22 20:08:25 +02:00
|
|
|
for e in notify_multi_path_errors(watcher_type, path, err, true) {
|
|
|
|
errors.send(e).await?;
|
|
|
|
}
|
2021-08-16 11:49:12 +02:00
|
|
|
} else {
|
|
|
|
pathset.remove(&path);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
for path in to_watch {
|
|
|
|
trace!(?path, "adding path to the watcher");
|
2021-10-14 14:38:21 +02:00
|
|
|
if let Err(err) = w.watch(path.as_ref(), notify::RecursiveMode::Recursive) {
|
2021-08-22 19:15:55 +02:00
|
|
|
error!(?err, "notify watch() error");
|
2021-08-22 20:08:25 +02:00
|
|
|
for e in notify_multi_path_errors(watcher_type, path, err, false) {
|
|
|
|
errors.send(e).await?;
|
|
|
|
}
|
|
|
|
// TODO: unwatch and re-watch manually while ignoring all the erroring paths
|
2021-08-16 11:49:12 +02:00
|
|
|
} else {
|
|
|
|
pathset.insert(path);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-16 15:37:01 +02:00
|
|
|
debug!("ending file watcher");
|
2021-08-16 11:49:12 +02:00
|
|
|
Ok(())
|
|
|
|
}
|
2021-08-18 14:40:35 +02:00
|
|
|
|
2021-08-22 20:08:25 +02:00
|
|
|
fn notify_multi_path_errors(
|
|
|
|
kind: Watcher,
|
2021-10-14 14:38:21 +02:00
|
|
|
path: WatchedPath,
|
2021-08-22 20:08:25 +02:00
|
|
|
mut err: notify::Error,
|
|
|
|
rm: bool,
|
|
|
|
) -> Vec<RuntimeError> {
|
|
|
|
let mut paths = take(&mut err.paths);
|
|
|
|
if paths.is_empty() {
|
2021-10-14 14:38:21 +02:00
|
|
|
paths.push(path.into());
|
2021-08-22 20:08:25 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
let generic = err.to_string();
|
|
|
|
let mut err = Some(err);
|
|
|
|
|
|
|
|
let mut errs = Vec::with_capacity(paths.len());
|
|
|
|
for path in paths {
|
|
|
|
let e = err
|
|
|
|
.take()
|
|
|
|
.unwrap_or_else(|| notify::Error::generic(&generic))
|
|
|
|
.add_path(path.clone());
|
|
|
|
|
|
|
|
errs.push(if rm {
|
|
|
|
RuntimeError::FsWatcherPathRemove { path, kind, err: e }
|
|
|
|
} else {
|
|
|
|
RuntimeError::FsWatcherPathAdd { path, kind, err: e }
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
errs
|
|
|
|
}
|
|
|
|
|
2021-08-16 15:15:17 +02:00
|
|
|
fn process_event(
|
|
|
|
nev: Result<notify::Event, notify::Error>,
|
|
|
|
kind: Watcher,
|
|
|
|
n_events: mpsc::Sender<Event>,
|
|
|
|
) -> Result<(), RuntimeError> {
|
|
|
|
let nev = nev.map_err(|err| RuntimeError::FsWatcherEvent { kind, err })?;
|
|
|
|
|
2021-09-13 09:34:40 +02:00
|
|
|
let mut tags = Vec::with_capacity(4);
|
|
|
|
tags.push(Tag::Source(Source::Filesystem));
|
|
|
|
tags.push(Tag::FileEventKind(nev.kind));
|
2021-08-16 15:15:17 +02:00
|
|
|
|
|
|
|
for path in nev.paths {
|
2021-10-12 17:06:39 +02:00
|
|
|
// possibly pull file_type from whatever notify (or the native driver) returns?
|
|
|
|
tags.push(Tag::Path {
|
|
|
|
file_type: metadata(&path).ok().map(|m| m.file_type()),
|
|
|
|
path: dunce::canonicalize(path)?,
|
|
|
|
});
|
2021-08-16 15:15:17 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
if let Some(pid) = nev.attrs.process_id() {
|
2021-09-13 09:34:40 +02:00
|
|
|
tags.push(Tag::Process(pid));
|
2021-08-16 15:15:17 +02:00
|
|
|
}
|
|
|
|
|
2021-09-02 22:14:04 +02:00
|
|
|
let mut metadata = HashMap::new();
|
|
|
|
|
|
|
|
if let Some(uid) = nev.attrs.info() {
|
|
|
|
metadata.insert("file-event-info".to_string(), vec![uid.to_string()]);
|
|
|
|
}
|
|
|
|
|
|
|
|
if let Some(src) = nev.attrs.source() {
|
|
|
|
metadata.insert("notify-backend".to_string(), vec![src.to_string()]);
|
|
|
|
}
|
|
|
|
|
2021-09-13 09:51:07 +02:00
|
|
|
let ev = Event { tags, metadata };
|
2021-08-16 15:15:17 +02:00
|
|
|
|
|
|
|
trace!(event = ?ev, "processed notify event into watchexec event");
|
|
|
|
n_events
|
|
|
|
.try_send(ev)
|
2021-08-17 11:41:13 +02:00
|
|
|
.map_err(|err| RuntimeError::EventChannelTrySend {
|
2021-08-16 15:15:17 +02:00
|
|
|
ctx: "fs watcher",
|
|
|
|
err,
|
|
|
|
})?;
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|