From 73260c0e3506f5df301c1b44b4fb893fecf172ec Mon Sep 17 00:00:00 2001
From: Tavian Barnes
Date: Sun, 5 Nov 2023 14:55:37 -0500
Subject: [PATCH] walk: Send WorkerResults in batches

---
 src/dir_entry.rs |   2 +
 src/exec/job.rs  |  31 +++++-----
 src/walk.rs      | 156 ++++++++++++++++++++++++++++++++++++-----------
 3 files changed, 137 insertions(+), 52 deletions(-)

diff --git a/src/dir_entry.rs b/src/dir_entry.rs
index 3a19d59..f44f2be 100644
--- a/src/dir_entry.rs
+++ b/src/dir_entry.rs
@@ -8,11 +8,13 @@ use lscolors::{Colorable, LsColors, Style};
 use crate::config::Config;
 use crate::filesystem::strip_current_dir;
 
+#[derive(Debug)]
 enum DirEntryInner {
     Normal(ignore::DirEntry),
     BrokenSymlink(PathBuf),
 }
 
+#[derive(Debug)]
 pub struct DirEntry {
     inner: DirEntryInner,
     metadata: OnceCell<Option<Metadata>>,
diff --git a/src/exec/job.rs b/src/exec/job.rs
index af603cc..4864d6d 100644
--- a/src/exec/job.rs
+++ b/src/exec/job.rs
@@ -1,9 +1,6 @@
 use std::sync::Mutex;
 
-use crossbeam_channel::Receiver;
-
 use crate::config::Config;
-use crate::dir_entry::DirEntry;
 use crate::error::print_error;
 use crate::exit_codes::{merge_exitcodes, ExitCode};
 use crate::walk::WorkerResult;
@@ -14,7 +11,7 @@ use super::CommandSet;
 /// generate a command with the supplied command template. The generated command will then
 /// be executed, and this process will continue until the receiver's sender has closed.
 pub fn job(
-    rx: Receiver<WorkerResult>,
+    results: impl IntoIterator<Item = WorkerResult>,
     cmd: &CommandSet,
     out_perm: &Mutex<()>,
     config: &Config,
@@ -22,35 +19,39 @@ pub fn job(
     // Output should be buffered when only running a single thread
     let buffer_output: bool = config.threads > 1;
 
-    let mut results: Vec<ExitCode> = Vec::new();
-    loop {
+    let mut ret = ExitCode::Success;
+    for result in results {
         // Obtain the next result from the receiver, else if the channel
         // has closed, exit from the loop
-        let dir_entry: DirEntry = match rx.recv() {
-            Ok(WorkerResult::Entry(dir_entry)) => dir_entry,
-            Ok(WorkerResult::Error(err)) => {
+        let dir_entry = match result {
+            WorkerResult::Entry(dir_entry) => dir_entry,
+            WorkerResult::Error(err) => {
                 if config.show_filesystem_errors {
                     print_error(err.to_string());
                 }
                 continue;
             }
-            Err(_) => break,
         };
 
         // Generate a command, execute it and store its exit code.
-        results.push(cmd.execute(
+        let code = cmd.execute(
             dir_entry.stripped_path(config),
             config.path_separator.as_deref(),
             out_perm,
             buffer_output,
-        ))
+        );
+        ret = merge_exitcodes([ret, code]);
     }
     // Returns error in case of any error.
-    merge_exitcodes(results)
+    ret
 }
 
-pub fn batch(rx: Receiver<WorkerResult>, cmd: &CommandSet, config: &Config) -> ExitCode {
-    let paths = rx
+pub fn batch(
+    results: impl IntoIterator<Item = WorkerResult>,
+    cmd: &CommandSet,
+    config: &Config,
+) -> ExitCode {
+    let paths = results
         .into_iter()
         .filter_map(|worker_result| match worker_result {
             WorkerResult::Entry(dir_entry) => Some(dir_entry.into_stripped_path(config)),
diff --git a/src/walk.rs b/src/walk.rs
index 691c5d0..d0ecab8 100644
--- a/src/walk.rs
+++ b/src/walk.rs
@@ -4,12 +4,12 @@ use std::io::{self, Write};
 use std::mem;
 use std::path::PathBuf;
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, Mutex, MutexGuard};
 use std::thread;
 use std::time::{Duration, Instant};
 
 use anyhow::{anyhow, Result};
-use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender};
+use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, SendError, Sender};
 use etcetera::BaseStrategy;
 use ignore::overrides::{Override, OverrideBuilder};
 use ignore::{self, WalkBuilder, WalkParallel, WalkState};
@@ -36,6 +36,7 @@ enum ReceiverMode {
 
 /// The Worker threads can result in a valid entry having PathBuf or an error.
 #[allow(clippy::large_enum_variant)]
+#[derive(Debug)]
 pub enum WorkerResult {
     // Errors should be rare, so it's probably better to allow large_enum_variant than
     // to box the Entry variant
@@ -43,6 +44,81 @@ pub enum WorkerResult {
     Error(ignore::Error),
 }
 
+/// A batch of WorkerResults to send over a channel.
+#[derive(Clone)]
+struct Batch {
+    items: Arc<Mutex<Option<Vec<WorkerResult>>>>,
+}
+
+impl Batch {
+    fn new() -> Self {
+        Self {
+            items: Arc::new(Mutex::new(Some(vec![]))),
+        }
+    }
+
+    fn lock(&self) -> MutexGuard<'_, Option<Vec<WorkerResult>>> {
+        self.items.lock().unwrap()
+    }
+}
+
+impl IntoIterator for Batch {
+    type Item = WorkerResult;
+    type IntoIter = std::vec::IntoIter<WorkerResult>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.lock().take().unwrap().into_iter()
+    }
+}
+
+/// Wrapper that sends batches of items at once over a channel.
+struct BatchSender {
+    batch: Batch,
+    tx: Sender<Batch>,
+}
+
+impl BatchSender {
+    fn new(tx: Sender<Batch>) -> Self {
+        Self {
+            batch: Batch::new(),
+            tx,
+        }
+    }
+
+    /// Check if we need to flush a batch.
+    fn needs_flush(batch: Option<&Vec<WorkerResult>>) -> bool {
+        match batch {
+            // Limit the batch size to provide some backpressure
+            Some(vec) => vec.len() >= 0x400,
+            // Batch was already taken by the receiver, so make a new one
+            None => true,
+        }
+    }
+
+    /// Add an item to a batch.
+    fn send(&mut self, item: WorkerResult) -> Result<(), SendError<()>> {
+        let mut batch = self.batch.lock();
+
+        if Self::needs_flush(batch.as_ref()) {
+            drop(batch);
+            self.batch = Batch::new();
+            batch = self.batch.lock();
+        }
+
+        let items = batch.as_mut().unwrap();
+        items.push(item);
+
+        if items.len() == 1 {
+            // New batch, send it over the channel
+            self.tx
+                .send(self.batch.clone())
+                .map_err(|_| SendError(()))?;
+        }
+
+        Ok(())
+    }
+}
+
 /// Maximum size of the output buffer before flushing results to the console
 const MAX_BUFFER_LENGTH: usize = 1000;
 /// Default duration until output buffering switches to streaming.
@@ -57,7 +133,7 @@ struct ReceiverBuffer<'a, W> {
     /// The ^C notifier.
     interrupt_flag: &'a AtomicBool,
     /// Receiver for worker results.
-    rx: Receiver<WorkerResult>,
+    rx: Receiver<Batch>,
     /// Standard output.
     stdout: W,
     /// The current buffer mode.
@@ -72,7 +148,7 @@ struct ReceiverBuffer<'a, W> {
 
 impl<'a, W: Write> ReceiverBuffer<'a, W> {
     /// Create a new receiver buffer.
-    fn new(state: &'a WorkerState, rx: Receiver<WorkerResult>, stdout: W) -> Self {
+    fn new(state: &'a WorkerState, rx: Receiver<Batch>, stdout: W) -> Self {
         let config = &state.config;
         let quit_flag = state.quit_flag.as_ref();
         let interrupt_flag = state.interrupt_flag.as_ref();
@@ -103,7 +179,7 @@ impl<'a, W: Write> ReceiverBuffer<'a, W> {
     }
 
     /// Receive the next worker result.
-    fn recv(&self) -> Result<WorkerResult, RecvTimeoutError> {
+    fn recv(&self) -> Result<Batch, RecvTimeoutError> {
         match self.mode {
             ReceiverMode::Buffering => {
                 // Wait at most until we should switch to streaming
@@ -119,34 +195,40 @@ impl<'a, W: Write> ReceiverBuffer<'a, W> {
     /// Wait for a result or state change.
     fn poll(&mut self) -> Result<(), ExitCode> {
         match self.recv() {
-            Ok(WorkerResult::Entry(dir_entry)) => {
-                if self.config.quiet {
-                    return Err(ExitCode::HasResults(true));
-                }
+            Ok(batch) => {
+                for result in batch {
+                    match result {
+                        WorkerResult::Entry(dir_entry) => {
+                            if self.config.quiet {
+                                return Err(ExitCode::HasResults(true));
+                            }
 
-                match self.mode {
-                    ReceiverMode::Buffering => {
-                        self.buffer.push(dir_entry);
-                        if self.buffer.len() > MAX_BUFFER_LENGTH {
-                            self.stream()?;
-                        }
-                    }
-                    ReceiverMode::Streaming => {
-                        self.print(&dir_entry)?;
-                        self.flush()?;
-                    }
-                }
+                            match self.mode {
+                                ReceiverMode::Buffering => {
+                                    self.buffer.push(dir_entry);
+                                    if self.buffer.len() > MAX_BUFFER_LENGTH {
+                                        self.stream()?;
+                                    }
+                                }
+                                ReceiverMode::Streaming => {
+                                    self.print(&dir_entry)?;
+                                    self.flush()?;
+                                }
+                            }
 
-                self.num_results += 1;
-                if let Some(max_results) = self.config.max_results {
-                    if self.num_results >= max_results {
-                        return self.stop();
-                    }
-                }
-            }
-            Ok(WorkerResult::Error(err)) => {
-                if self.config.show_filesystem_errors {
-                    print_error(err.to_string());
+                            self.num_results += 1;
+                            if let Some(max_results) = self.config.max_results {
+                                if self.num_results >= max_results {
+                                    return self.stop();
+                                }
+                            }
+                        }
+                        WorkerResult::Error(err) => {
+                            if self.config.show_filesystem_errors {
+                                print_error(err.to_string());
+                            }
+                        }
+                    }
                 }
             }
             Err(RecvTimeoutError::Timeout) => {
@@ -319,13 +401,13 @@ impl WorkerState {
 
     /// Run the receiver work, either on this thread or a pool of background
     /// threads (for --exec).
-    fn receive(&self, rx: Receiver<WorkerResult>) -> ExitCode {
+    fn receive(&self, rx: Receiver<Batch>) -> ExitCode {
         let config = &self.config;
 
         // This will be set to `Some` if the `--exec` argument was supplied.
         if let Some(ref cmd) = config.command {
             if cmd.in_batch_mode() {
-                exec::batch(rx, cmd, &config)
+                exec::batch(rx.into_iter().flatten(), cmd, &config)
             } else {
                 let out_perm = Mutex::new(());
 
@@ -337,7 +419,8 @@ impl WorkerState {
                         let rx = rx.clone();
 
                         // Spawn a job thread that will listen for and execute inputs.
-                        let handle = scope.spawn(|| exec::job(rx, cmd, &out_perm, &config));
+                        let handle = scope
+                            .spawn(|| exec::job(rx.into_iter().flatten(), cmd, &out_perm, &config));
 
                         // Push the handle of the spawned thread into the vector for later joining.
                         handles.push(handle);
@@ -355,12 +438,12 @@ impl WorkerState {
     }
 
     /// Spawn the sender threads.
-    fn spawn_senders(&self, walker: WalkParallel, tx: Sender<WorkerResult>) {
+    fn spawn_senders(&self, walker: WalkParallel, tx: Sender<Batch>) {
         walker.run(|| {
             let patterns = &self.patterns;
             let config = &self.config;
             let quit_flag = self.quit_flag.as_ref();
-            let tx = tx.clone();
+            let mut tx = BatchSender::new(tx.clone());
 
             Box::new(move |entry| {
                 if quit_flag.load(Ordering::Relaxed) {
@@ -545,8 +628,7 @@ impl WorkerState {
                 .unwrap();
         }
 
-        // Channel capacity was chosen empircally to perform similarly to an unbounded channel
-        let (tx, rx) = bounded(0x4000 * config.threads);
+        let (tx, rx) = bounded(config.threads);
 
         let exit_code = thread::scope(|scope| {
            // Spawn the receiver thread(s)
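
Editorial note, not part of the patch: the sketch below condenses the Batch / BatchSender pattern added to src/walk.rs into a standalone program so the flow is easier to follow in isolation. It keeps the same idea (the sender keeps appending to a shared, take-able batch and sends the batch handle over the channel only when its first item arrives) but swaps in assumptions for brevity: plain Strings instead of WorkerResult, a batch limit of 4 instead of 0x400, a channel capacity of 2 instead of config.threads, and a local Batch type alias plus new_batch() helper that exist only in this sketch. Only the crossbeam-channel crate is required.

// Illustrative sketch only; names and limits here are local to this example.
use std::sync::{Arc, Mutex};
use std::thread;

use crossbeam_channel::bounded;

// A shared, take-able batch: the sender fills it until the receiver takes the Vec out.
type Batch = Arc<Mutex<Option<Vec<String>>>>;

fn new_batch() -> Batch {
    Arc::new(Mutex::new(Some(Vec::new())))
}

fn main() {
    // A small capacity is enough, since every message is a whole batch.
    let (tx, rx) = bounded::<Batch>(2);

    let sender = thread::spawn(move || {
        let mut batch = new_batch();
        for i in 0..10 {
            let mut guard = batch.lock().unwrap();
            // Start a new batch if the receiver already took this one or it is full.
            if guard.as_ref().map_or(true, |items| items.len() >= 4) {
                drop(guard);
                batch = new_batch();
                guard = batch.lock().unwrap();
            }
            let items = guard.as_mut().unwrap();
            items.push(format!("item {i}"));
            // Send the batch handle exactly once, when its first item arrives.
            if items.len() == 1 {
                tx.send(batch.clone()).unwrap();
            }
        }
    });

    // Receiver: taking the Vec out of the Mutex closes that batch for the sender.
    for batch in rx {
        let items = batch.lock().unwrap().take().unwrap();
        println!("received a batch of {} item(s)", items.len());
    }

    sender.join().unwrap();
}

Because each channel message now carries a whole batch, the patch can shrink the channel capacity from 0x4000 * config.threads to config.threads, while the 0x400 batch limit still provides backpressure, as the needs_flush comment in the diff notes.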