Canonicalise paths + add example

This commit is contained in:
Félix Saparelli 2021-08-17 01:15:17 +12:00
parent f08bdad8ee
commit 822148da03
No known key found for this signature in database
GPG Key ID: B948C4BAE44FC474
7 changed files with 229 additions and 64 deletions

61
Cargo.lock generated
View File

@ -26,6 +26,15 @@ dependencies = [
"memchr",
]
[[package]]
name = "ansi_term"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2"
dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "arrayref"
version = "0.3.6"
@ -424,6 +433,12 @@ version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56899898ce76aaf4a0f24d914c97ea6ed976d42fec6ad33fcbb0a1103e07b2b0"
[[package]]
name = "dunce"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "453440c271cf5577fd2a40e4942540cb7d0d2f85e27c8d07dd0023c925a67541"
[[package]]
name = "either"
version = "1.6.1"
@ -907,6 +922,15 @@ dependencies = [
"libc",
]
[[package]]
name = "matchers"
version = "0.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1"
dependencies = [
"regex-automata",
]
[[package]]
name = "memchr"
version = "2.4.0"
@ -1499,6 +1523,9 @@ name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax",
]
[[package]]
name = "regex-syntax"
@ -1880,15 +1907,47 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "tracing-log"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6923477a48e41c1951f1999ef8bb5a3023eb723ceadafe78ffb65dc366761e3"
dependencies = [
"lazy_static",
"log",
"tracing-core",
]
[[package]]
name = "tracing-serde"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb65ea441fbb84f9f6748fd496cf7f63ec9af5bca94dd86456978d055e8eb28b"
dependencies = [
"serde",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab69019741fca4d98be3c62d2b75254528b5432233fd8a4d2739fec20278de48"
dependencies = [
"ansi_term",
"chrono",
"lazy_static",
"matchers",
"regex",
"serde",
"serde_json",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
"tracing-serde",
]
[[package]]
@ -1996,11 +2055,13 @@ name = "watchexec"
version = "1.17.1"
dependencies = [
"chrono",
"dunce",
"miette",
"notify 5.0.0-pre.11",
"thiserror",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]]

View File

@ -20,8 +20,12 @@ miette = "0.7.0"
notify = "5.0.0-pre.11"
thiserror = "1.0.26"
tracing = "0.1.26"
dunce = "1.0.2"
[dependencies.tokio]
version = "1.10.0"
features = ["full"]
[dev-dependencies]
tracing-subscriber = "0.2.19"

33
lib/examples/fs.rs Normal file
View File

@ -0,0 +1,33 @@
use std::error::Error;
use tokio::sync::{mpsc, watch};
use watchexec::fs;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
tracing_subscriber::fmt::init();
let (ev_s, mut ev_r) = mpsc::channel(1024);
let (er_s, mut er_r) = mpsc::channel(64);
let (wd_s, wd_r) = watch::channel(fs::WorkingData::default());
let mut wkd = fs::WorkingData::default();
wkd.pathset = vec![".".into()];
wd_s.send(wkd)?;
tokio::spawn(async move {
while let Some(e) = ev_r.recv().await {
println!("event: {:?}", e);
}
});
tokio::spawn(async move {
while let Some(e) = er_r.recv().await {
println!("error: {}", e);
}
});
fs::worker(wd_r, er_s, ev_s).await?;
Ok(())
}

View File

@ -2,8 +2,8 @@
use std::path::PathBuf;
use thiserror::Error;
use miette::Diagnostic;
use thiserror::Error;
use tokio::sync::mpsc;
use crate::{event::Event, fs::Watcher};
@ -37,33 +37,49 @@ pub enum RuntimeError {
#[error("{kind:?} watcher failed to instantiate: {err}")]
#[diagnostic(
code(watchexec::runtime::fs_watcher_error),
help("perhaps retry with the poll watcher"),
help("perhaps retry with the poll watcher")
)]
FsWatcherCreate { kind: Watcher, #[source] err: notify::Error },
FsWatcherCreate {
kind: Watcher,
#[source]
err: notify::Error,
},
/// Error received when reading a filesystem event fails.
#[error("{kind:?} watcher received an event that we could not read: {err}")]
#[diagnostic(
code(watchexec::runtime::fs_watcher_event),
)]
FsWatcherEvent { kind: Watcher, #[source] err: notify::Error },
#[diagnostic(code(watchexec::runtime::fs_watcher_event))]
FsWatcherEvent {
kind: Watcher,
#[source]
err: notify::Error,
},
/// Error received when adding to the pathset for the filesystem watcher fails.
#[error("while adding {path:?} to the {kind:?} watcher: {err}")]
#[diagnostic(
code(watchexec::runtime::fs_watcher_path_add),
)]
FsWatcherPathAdd { path: PathBuf, kind: Watcher, #[source] err: notify::Error },
#[diagnostic(code(watchexec::runtime::fs_watcher_path_add))]
FsWatcherPathAdd {
path: PathBuf,
kind: Watcher,
#[source]
err: notify::Error,
},
/// Error received when removing from the pathset for the filesystem watcher fails.
#[error("while removing {path:?} from the {kind:?} watcher: {err}")]
#[diagnostic(
code(watchexec::runtime::fs_watcher_path_remove),
)]
FsWatcherPathRemove { path: PathBuf, kind: Watcher, #[source] err: notify::Error },
#[diagnostic(code(watchexec::runtime::fs_watcher_path_remove))]
FsWatcherPathRemove {
path: PathBuf,
kind: Watcher,
#[source]
err: notify::Error,
},
/// Error received when an event cannot be sent to the event channel.
#[error("cannot send event from {ctx}: {err}")]
#[diagnostic(code(watchexec::runtime::event_channel_send))]
EventChannelSend { ctx: &'static str, #[source] err: mpsc::error::TrySendError<Event> },
EventChannelSend {
ctx: &'static str,
#[source]
err: mpsc::error::TrySendError<Event>,
},
}

View File

@ -33,4 +33,3 @@ pub enum Source {
Mouse,
Time,
}

View File

@ -1,9 +1,15 @@
use std::{collections::{HashMap, HashSet}, path::PathBuf};
use std::{
collections::{HashMap, HashSet},
path::PathBuf,
};
use tokio::{sync::{mpsc, watch}};
use tokio::sync::{mpsc, watch};
use tracing::{debug, trace};
use crate::{error::{CriticalError, RuntimeError}, event::{Event, Particle, Source}};
use crate::{
error::{CriticalError, RuntimeError},
event::{Event, Particle, Source},
};
/// What kind of filesystem watcher to use.
///
@ -17,17 +23,22 @@ pub enum Watcher {
}
impl Default for Watcher {
fn default() -> Self {
Self::Native
}
fn default() -> Self {
Self::Native
}
}
impl Watcher {
fn create(self, f: impl notify::EventFn) -> Result<Box<dyn notify::Watcher>, RuntimeError> {
match self {
Self::Native => notify::RecommendedWatcher::new(f).map(|w| Box::new(w) as Box<dyn notify::Watcher>),
Self::Poll => notify::PollWatcher::new(f).map(|w| Box::new(w) as Box<dyn notify::Watcher>),
}.map_err(|err| RuntimeError::FsWatcherCreate { kind: self, err })
Self::Native => {
notify::RecommendedWatcher::new(f).map(|w| Box::new(w) as Box<dyn notify::Watcher>)
}
Self::Poll => {
notify::PollWatcher::new(f).map(|w| Box::new(w) as Box<dyn notify::Watcher>)
}
}
.map_err(|err| RuntimeError::FsWatcherCreate { kind: self, err })
}
}
@ -41,10 +52,35 @@ pub struct WorkingData {
pub watcher: Watcher,
}
/// Launch a filesystem event worker.
/// Launch the filesystem event worker.
///
/// While you can run several, you should only have one.
///
/// This only does a bare minimum of setup; to actually start the work, you need to set a non-empty pathset on the
/// [`WorkingData`] with the [`watch`] channel.
/// [`WorkingData`] with the [`watch`] channel, and send a notification.
///
/// # Examples
///
/// Direct usage:
///
/// ```no_run
/// use tokio::sync::{mpsc, watch};
/// use watchexec::fs::{worker, WorkingData};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let (ev_s, _) = mpsc::channel(1024);
/// let (er_s, _) = mpsc::channel(64);
/// let (wd_s, wd_r) = watch::channel(WorkingData::default());
///
/// let mut wkd = WorkingData::default();
/// wkd.pathset = vec![".".into()];
/// wd_s.send(wkd)?;
///
/// worker(wd_r, er_s, ev_s).await?;
/// Ok(())
/// }
/// ```
pub async fn worker(
mut working: watch::Receiver<WorkingData>,
errors: mpsc::Sender<RuntimeError>,
@ -96,45 +132,16 @@ pub async fn worker(
debug!(?kind, "creating new watcher");
let n_errors = errors.clone();
let n_events = events.clone();
match kind.create(move |nev: Result<notify::Event, notify::Error> | {
match kind.create(move |nev: Result<notify::Event, notify::Error>| {
trace!(event = ?nev, "receiving possible event from watcher");
match nev {
Err(err) => {
n_errors.try_send(RuntimeError::FsWatcherEvent { kind, err }).ok();
},
Ok(nev) => {
let mut particulars = Vec::with_capacity(4);
particulars.push(Particle::Source(Source::Filesystem));
for path in nev.paths {
particulars.push(Particle::Path(path));
}
if let Some(pid) = nev.attrs.process_id() {
particulars.push(Particle::Process(pid));
}
let ev = Event {
particulars,
metadata: HashMap::new(), // TODO
};
trace!(event = ?ev, "processed notify event into watchexec event");
if let Err(err) = n_events.try_send(ev) {
n_errors.try_send(RuntimeError::EventChannelSend {
ctx: "fs watcher",
err,
}).ok();
}
}
if let Err(e) = process_event(nev, kind, n_events.clone()) {
n_errors.try_send(e).ok();
}
}) {
Ok(w) => {
watcher.insert(w);
watcher_type = kind;
},
}
Err(e) => {
errors.send(e).await?;
}
@ -147,7 +154,13 @@ pub async fn worker(
for path in to_drop {
trace!(?path, "removing path from the watcher");
if let Err(err) = w.unwatch(&path) {
errors.send(RuntimeError::FsWatcherPathRemove { path, kind: watcher_type, err }).await?;
errors
.send(RuntimeError::FsWatcherPathRemove {
path,
kind: watcher_type,
err,
})
.await?;
} else {
pathset.remove(&path);
}
@ -156,7 +169,13 @@ pub async fn worker(
for path in to_watch {
trace!(?path, "adding path to the watcher");
if let Err(err) = w.watch(&path, notify::RecursiveMode::Recursive) {
errors.send(RuntimeError::FsWatcherPathAdd { path, kind: watcher_type, err }).await?;
errors
.send(RuntimeError::FsWatcherPathAdd {
path,
kind: watcher_type,
err,
})
.await?;
} else {
pathset.insert(path);
}
@ -166,3 +185,36 @@ pub async fn worker(
Ok(())
}
fn process_event(
nev: Result<notify::Event, notify::Error>,
kind: Watcher,
n_events: mpsc::Sender<Event>,
) -> Result<(), RuntimeError> {
let nev = nev.map_err(|err| RuntimeError::FsWatcherEvent { kind, err })?;
let mut particulars = Vec::with_capacity(4);
particulars.push(Particle::Source(Source::Filesystem));
for path in nev.paths {
particulars.push(Particle::Path(dunce::canonicalize(path)?));
}
if let Some(pid) = nev.attrs.process_id() {
particulars.push(Particle::Process(pid));
}
let ev = Event {
particulars,
metadata: HashMap::new(), // TODO
};
trace!(event = ?ev, "processed notify event into watchexec event");
n_events
.try_send(ev)
.map_err(|err| RuntimeError::EventChannelSend {
ctx: "fs watcher",
err,
})?;
Ok(())
}

View File

@ -17,4 +17,4 @@
pub mod error;
pub mod event;
mod fs;
pub mod fs;