From 497d2db5885eef3eaac766c8de3cd39eb52f1192 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fe=CC=81lix=20Saparelli?= Date: Sat, 8 Apr 2023 19:03:17 +1200 Subject: [PATCH] Add jaq filters --- crates/cli/Cargo.toml | 2 + crates/cli/src/filterer.rs | 14 +++ crates/cli/src/filterer/progs.rs | 204 +++++++++++++++++++++++++++++++ crates/lib/Cargo.toml | 2 +- 4 files changed, 221 insertions(+), 1 deletion(-) create mode 100644 crates/cli/src/filterer/progs.rs diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index da309dd..13fe67b 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -30,6 +30,8 @@ dirs = "5.0.0" futures = "0.3.29" humantime = "2.1.0" is-terminal = "0.4.4" +jaq-core = "1.2.1" +jaq-std = "1.2.1" notify-rust = "4.9.0" serde_json = "1.0.107" tempfile = "3.8.1" diff --git a/crates/cli/src/filterer.rs b/crates/cli/src/filterer.rs index 688a588..2851545 100644 --- a/crates/cli/src/filterer.rs +++ b/crates/cli/src/filterer.rs @@ -20,12 +20,14 @@ use watchexec_filterer_globset::GlobsetFilterer; use crate::args::{Args, FsEvent}; mod dirs; +mod progs; /// A custom filterer that combines the library's Globset filterer and a switch for --no-meta #[derive(Debug)] pub struct WatchexecFilterer { inner: GlobsetFilterer, fs_events: Vec, + progs: Option, } impl Filterer for WatchexecFilterer { @@ -53,6 +55,13 @@ impl Filterer for WatchexecFilterer { return Ok(false); } + if let Some(progs) = &self.progs { + trace!("check against program filters"); + if !progs.check(event)? { + return Ok(false); + } + } + Ok(true) } } @@ -116,6 +125,11 @@ impl WatchexecFilterer { .await .into_diagnostic()?, fs_events: args.filter_fs_events.clone(), + progs: if args.filter_programs.is_empty() { + None + } else { + Some(progs::FilterProgs::new(args)?) + }, })) } } diff --git a/crates/cli/src/filterer/progs.rs b/crates/cli/src/filterer/progs.rs new file mode 100644 index 0000000..fba7c32 --- /dev/null +++ b/crates/cli/src/filterer/progs.rs @@ -0,0 +1,204 @@ +use std::{iter, marker::PhantomData}; + +use jaq_core::{ + parse::{self, filter::Filter, Def}, + Ctx, Definitions, RcIter, Val, +}; +use miette::miette; +use tokio::{ + sync::{mpsc, oneshot}, + task::{block_in_place, spawn_blocking}, +}; +use tracing::{debug, error, trace, warn}; +use watchexec::error::RuntimeError; +use watchexec_events::Event; + +use crate::args::Args; + +const BUFFER: usize = 128; + +#[derive(Debug)] +pub struct FilterProgs { + channel: Requester, +} + +#[derive(Debug, Clone)] +pub struct Requester { + sender: mpsc::Sender<(S, oneshot::Sender)>, + _receiver: PhantomData, +} + +impl Requester +where + S: Send + Sync, + R: Send + Sync, +{ + pub fn new(capacity: usize) -> (Self, mpsc::Receiver<(S, oneshot::Sender)>) { + let (sender, receiver) = mpsc::channel(capacity); + ( + Self { + sender, + _receiver: PhantomData, + }, + receiver, + ) + } + + pub fn call(&self, value: S) -> Result { + // FIXME: this should really be async with a timeout, but that needs filtering in general + // to be async, which should be done at some point + block_in_place(|| { + let (sender, receiver) = oneshot::channel(); + self.sender.blocking_send((value, sender)).map_err(|err| { + RuntimeError::External(miette!("filter progs internal channel: {}", err).into()) + })?; + receiver + .blocking_recv() + .map_err(|err| RuntimeError::External(Box::new(err))) + }) + } +} + +impl FilterProgs { + pub fn check(&self, event: &Event) -> Result { + self.channel.call(event.clone()) + } + + pub fn new(args: &Args) -> miette::Result { + let n_filters = args.filter_programs.len(); + let progs = args.filter_programs.clone(); + warn!("EXPERIMENTAL: filter programs are unstable and may change/vanish without notice"); + + let (requester, mut receiver) = Requester::::new(BUFFER); + let task = + spawn_blocking(move || { + let mut defs = load_std_defs()?; + load_watchexec_defs(&mut defs)?; + load_user_progs(&mut defs, &progs)?; + + 'chan: while let Some((event, sender)) = receiver.blocking_recv() { + let val = serde_json::to_value(&event) + .map_err(|err| miette!("failed to serialize event: {}", err)) + .map(Val::from)?; + + for n in 0..n_filters { + trace!(?n, "trying filter program"); + + let name = format!("__watchexec_filter_{n}"); + let filter = Filter::Call(name, Vec::new()); + let mut errs = Vec::new(); + let filter = defs.clone().finish( + (Vec::new(), (filter, 0..0)), + Vec::new(), + &mut errs, + ); + if !errs.is_empty() { + error!(?errs, "failed to load filter program #{}", n); + continue; + } + + let inputs = RcIter::new(iter::once(Ok(val.clone()))); + let ctx = Ctx::new(Vec::new(), &inputs); + let mut results = filter.run(ctx, val.clone()); + if let Some(res) = results.next() { + match res { + Ok(Val::Bool(false)) => { + trace!( + ?n, + verdict = false, + "filter program finished; fail so stopping there" + ); + sender + .send(false) + .unwrap_or_else(|_| warn!("failed to send filter result")); + continue 'chan; + } + Ok(Val::Bool(true)) => { + trace!( + ?n, + verdict = true, + "filter program finished; pass so trying next" + ); + continue; + } + Ok(val) => { + error!(?n, ?val, "filter program returned non-boolean, ignoring and trying next"); + continue; + } + Err(err) => { + error!(?n, ?err, "filter program failed, so trying next"); + continue; + } + } + } + } + + trace!("all filters failed, sending pass as default"); + sender + .send(true) + .unwrap_or_else(|_| warn!("failed to send filter result")); + } + + Ok(()) as miette::Result<()> + }); + + tokio::spawn(async { + match task.await { + Ok(Ok(())) => {} + Ok(Err(err)) => error!("filter progs task failed: {}", err), + Err(err) => error!("filter progs task panicked: {}", err), + } + }); + + Ok(Self { channel: requester }) + } +} + +fn load_std_defs() -> miette::Result { + debug!("loading jaq core library"); + let mut defs = Definitions::core(); + + debug!("loading jaq standard library"); + let mut errs = Vec::new(); + jaq_std::std() + .into_iter() + .for_each(|def| defs.insert(def, &mut errs)); + + if !errs.is_empty() { + return Err(miette!("failed to load jaq standard library: {:?}", errs)); + } + Ok(defs) +} + +fn load_watchexec_defs(defs: &mut Definitions) -> miette::Result<()> { + debug!("loading jaq watchexec library"); + Ok(()) +} + +fn load_user_progs(all_defs: &mut Definitions, progs: &[String]) -> miette::Result<()> { + debug!("loading jaq programs"); + for (n, prog) in progs.iter().enumerate() { + trace!(?n, ?prog, "loading filter program"); + let (main, mut errs) = parse::parse(prog, parse::main()); + + if let Some((defs, filter)) = main { + let name = format!("__watchexec_filter_{}", n); + trace!(?filter, ?name, "loading filter program into global as def"); + all_defs.insert( + Def { + name, + args: Vec::new(), + body: filter, + defs, + }, + &mut errs, + ); + } + + if !errs.is_empty() { + return Err(miette!("failed to load filter program #{}: {:?}", n, errs)); + } + } + + Ok(()) +} diff --git a/crates/lib/Cargo.toml b/crates/lib/Cargo.toml index 6ed0b7e..6706e2d 100644 --- a/crates/lib/Cargo.toml +++ b/crates/lib/Cargo.toml @@ -2,7 +2,7 @@ name = "watchexec" version = "3.0.1" -authors = ["Matt Green ", "Félix Saparelli "] +authors = ["Félix Saparelli ", "Matt Green "] license = "Apache-2.0" description = "Library to execute commands in response to file modifications" keywords = ["watcher", "filesystem", "watchexec"]