From 16ae03c3b4d02052ce602bb079e0c3812b2e4ea3 Mon Sep 17 00:00:00 2001 From: Tavian Barnes Date: Fri, 26 Nov 2021 17:43:43 -0500 Subject: [PATCH 1/2] walk: Encapsulate the buffering behavior in a struct The new ReceiverBuffer struct allows us to factor out the receiver implementation into a number of helper methods. The new implementation uses rx.{recv,recv_timeout} instead of a for loop, which enables us to switch to streaming mode at the right time without waiting for more results. Fixes #868. --- src/walk.rs | 247 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 163 insertions(+), 84 deletions(-) diff --git a/src/walk.rs b/src/walk.rs index 8a1f291..244604b 100644 --- a/src/walk.rs +++ b/src/walk.rs @@ -1,12 +1,13 @@ use std::ffi::OsStr; use std::fs::{FileType, Metadata}; use std::io; +use std::mem; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, Mutex}; use std::thread; -use std::time; +use std::time::{Duration, Instant}; use std::{borrow::Cow, io::Write}; use anyhow::{anyhow, Result}; @@ -23,6 +24,7 @@ use crate::filesystem; use crate::output; /// The receiver thread can either be buffering results or directly streaming to the console. +#[derive(PartialEq)] enum ReceiverMode { /// Receiver is still buffering in order to sort the results, if the search finishes fast /// enough. @@ -41,7 +43,7 @@ pub enum WorkerResult { /// Maximum size of the output buffer before flushing results to the console pub const MAX_BUFFER_LENGTH: usize = 1000; /// Default duration until output buffering switches to streaming. -pub const DEFAULT_MAX_BUFFER_TIME: time::Duration = time::Duration::from_millis(100); +pub const DEFAULT_MAX_BUFFER_TIME: Duration = Duration::from_millis(100); /// Recursively scan the given search path for files / pathnames matching the pattern. /// @@ -160,6 +162,161 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc, config: Arc) -> R } } +/// Wrapper for the receiver thread's buffering behavior. +struct ReceiverBuffer { + /// The configuration. + config: Arc, + /// The ^C notifier. + wants_to_quit: Arc, + /// Receiver for worker results. + rx: Receiver, + /// Standard output. + stdout: W, + /// The current buffer mode. + mode: ReceiverMode, + /// The deadline to switch to streaming mode. + deadline: Instant, + /// The buffer of quickly received paths. + buffer: Vec, + /// Result count. + num_results: usize, +} + +impl ReceiverBuffer { + /// Create a new receiver buffer. + fn new( + config: Arc, + wants_to_quit: Arc, + rx: Receiver, + stdout: W, + ) -> Self { + let max_buffer_time = config.max_buffer_time.unwrap_or(DEFAULT_MAX_BUFFER_TIME); + let deadline = Instant::now() + max_buffer_time; + + Self { + config, + wants_to_quit, + rx, + stdout, + mode: ReceiverMode::Buffering, + deadline, + buffer: Vec::with_capacity(MAX_BUFFER_LENGTH), + num_results: 0, + } + } + + /// Process results until finished. + fn process(&mut self) -> ExitCode { + loop { + if let Err(ec) = self.poll() { + return ec; + } + } + } + + /// Receive the next worker result. + fn recv(&self) -> Result { + match self.mode { + ReceiverMode::Buffering => { + // Wait at most until we should switch to streaming + let now = Instant::now(); + self.deadline + .checked_duration_since(now) + .ok_or(RecvTimeoutError::Timeout) + .and_then(|t| self.rx.recv_timeout(t)) + } + ReceiverMode::Streaming => { + // Wait however long it takes for a result + Ok(self.rx.recv()?) + } + } + } + + /// Wait for a result or state change. + fn poll(&mut self) -> Result<(), ExitCode> { + match self.recv() { + Ok(WorkerResult::Entry(path)) => { + if self.config.quiet { + return Err(ExitCode::HasResults(true)); + } + + match self.mode { + ReceiverMode::Buffering => { + self.buffer.push(path); + if self.buffer.len() > MAX_BUFFER_LENGTH { + self.stream()?; + } + } + ReceiverMode::Streaming => { + self.print(&path); + self.flush()?; + } + } + + self.num_results += 1; + if let Some(max_results) = self.config.max_results { + if self.num_results >= max_results { + return self.stop(); + } + } + } + Ok(WorkerResult::Error(err)) => { + if self.config.show_filesystem_errors { + print_error(err.to_string()); + } + } + Err(RecvTimeoutError::Timeout) => { + self.stream()?; + } + Err(RecvTimeoutError::Disconnected) => { + return self.stop(); + } + } + + Ok(()) + } + + /// Output a path. + fn print(&mut self, path: &Path) { + output::print_entry(&mut self.stdout, path, &self.config, &self.wants_to_quit) + } + + /// Switch ourselves into streaming mode. + fn stream(&mut self) -> Result<(), ExitCode> { + self.mode = ReceiverMode::Streaming; + + let buffer = mem::take(&mut self.buffer); + for path in buffer { + self.print(&path); + } + + self.flush() + } + + /// Stop looping. + fn stop(&mut self) -> Result<(), ExitCode> { + if self.mode == ReceiverMode::Buffering { + self.buffer.sort(); + self.stream()?; + } + + if self.config.quiet { + Err(ExitCode::HasResults(self.num_results > 0)) + } else { + Err(ExitCode::Success) + } + } + + /// Flush stdout if necessary. + fn flush(&mut self) -> Result<(), ExitCode> { + if self.config.interactive_terminal && self.stdout.flush().is_err() { + // Probably a broken pipe. Exit gracefully. + return Err(ExitCode::GeneralError); + } + Ok(()) + } +} + fn spawn_receiver( config: &Arc, wants_to_quit: &Arc, @@ -218,90 +375,12 @@ fn spawn_receiver( merge_exitcodes(exit_codes) } } else { - let start = time::Instant::now(); - - // Start in buffering mode - let mut mode = ReceiverMode::Buffering; - - // Maximum time to wait before we start streaming to the console. - let max_buffer_time = config.max_buffer_time.unwrap_or(DEFAULT_MAX_BUFFER_TIME); - let stdout = io::stdout(); let stdout = stdout.lock(); - let mut stdout = io::BufWriter::new(stdout); + let stdout = io::BufWriter::new(stdout); - let mut num_results = 0; - let is_interactive = config.interactive_terminal; - let mut buffer = Vec::with_capacity(MAX_BUFFER_LENGTH); - for worker_result in rx { - match worker_result { - WorkerResult::Entry(path) => { - if config.quiet { - return ExitCode::HasResults(true); - } - - match mode { - ReceiverMode::Buffering => { - buffer.push(path); - - // Have we reached the maximum buffer size or maximum buffering time? - if buffer.len() > MAX_BUFFER_LENGTH - || start.elapsed() > max_buffer_time - { - // Flush the buffer - for path in &buffer { - output::print_entry( - &mut stdout, - path, - &config, - &wants_to_quit, - ); - } - buffer.clear(); - if is_interactive && stdout.flush().is_err() { - // Probably a broken pipe. Exit gracefully. - return ExitCode::GeneralError; - } - // Start streaming - mode = ReceiverMode::Streaming; - } - } - ReceiverMode::Streaming => { - output::print_entry(&mut stdout, &path, &config, &wants_to_quit); - if is_interactive && stdout.flush().is_err() { - // Probably a broken pipe. Exit gracefully. - return ExitCode::GeneralError; - } - } - } - - num_results += 1; - if let Some(max_results) = config.max_results { - if num_results >= max_results { - break; - } - } - } - WorkerResult::Error(err) => { - if show_filesystem_errors { - print_error(err.to_string()); - } - } - } - } - - // If we have finished fast enough (faster than max_buffer_time), we haven't streamed - // anything to the console, yet. In this case, sort the results and print them: - buffer.sort(); - for value in buffer { - output::print_entry(&mut stdout, &value, &config, &wants_to_quit); - } - - if config.quiet { - ExitCode::HasResults(false) - } else { - ExitCode::Success - } + let mut rxbuffer = ReceiverBuffer::new(config, wants_to_quit, rx, stdout); + rxbuffer.process() } }) } From a4bb734482c7bf3fd4d30081e7d3ce7786a3775e Mon Sep 17 00:00:00 2001 From: Tavian Barnes Date: Wed, 1 Dec 2021 10:24:58 -0500 Subject: [PATCH 2/2] Switch from std::sync::mpsc to crossbeam-channel This lets us avoid https://github.com/rust-lang/rust/issues/39364, which could potentially be seen now that we're using recv_timeout(). --- Cargo.lock | 11 +++++++++++ Cargo.toml | 1 + src/exec/job.rs | 3 ++- src/walk.rs | 10 +++------- 4 files changed, 17 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4cc7fcf..045040a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -108,6 +108,16 @@ dependencies = [ "vec_map", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.5" @@ -164,6 +174,7 @@ dependencies = [ "atty", "chrono", "clap", + "crossbeam-channel", "ctrlc", "diff", "dirs-next", diff --git a/Cargo.toml b/Cargo.toml index e8f2129..e648b9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ dirs-next = "2.0" normpath = "0.3" chrono = "0.4" once_cell = "1.8.0" +crossbeam-channel = "0.5.1" [dependencies.clap] version = "2.31.3" diff --git a/src/exec/job.rs b/src/exec/job.rs index 85b30f1..0effc44 100644 --- a/src/exec/job.rs +++ b/src/exec/job.rs @@ -1,7 +1,8 @@ use std::path::PathBuf; -use std::sync::mpsc::Receiver; use std::sync::{Arc, Mutex}; +use crossbeam_channel::Receiver; + use crate::error::print_error; use crate::exit_codes::{merge_exitcodes, ExitCode}; use crate::walk::WorkerResult; diff --git a/src/walk.rs b/src/walk.rs index 244604b..151d568 100644 --- a/src/walk.rs +++ b/src/walk.rs @@ -4,13 +4,13 @@ use std::io; use std::mem; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::{Duration, Instant}; use std::{borrow::Cow, io::Write}; use anyhow::{anyhow, Result}; +use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}; use ignore::overrides::OverrideBuilder; use ignore::{self, WalkBuilder}; use once_cell::unsync::OnceCell; @@ -55,7 +55,7 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc, config: Arc) -> R let first_path_buf = path_iter .next() .expect("Error: Path vector can not be empty"); - let (tx, rx) = channel(); + let (tx, rx) = unbounded(); let mut override_builder = OverrideBuilder::new(first_path_buf.as_path()); @@ -219,11 +219,7 @@ impl ReceiverBuffer { match self.mode { ReceiverMode::Buffering => { // Wait at most until we should switch to streaming - let now = Instant::now(); - self.deadline - .checked_duration_since(now) - .ok_or(RecvTimeoutError::Timeout) - .and_then(|t| self.rx.recv_timeout(t)) + self.rx.recv_deadline(self.deadline) } ReceiverMode::Streaming => { // Wait however long it takes for a result