walk: New WorkerState struct shared by the worker threads

This commit is contained in:
Tavian Barnes 2023-10-27 16:43:06 -04:00 committed by David Peter
parent cd32a3827d
commit 8bbbd7679b
2 changed files with 337 additions and 313 deletions

View File

@ -103,7 +103,7 @@ fn run() -> Result<ExitCode> {
.map(|pat| build_regex(pat, &config)) .map(|pat| build_regex(pat, &config))
.collect::<Result<Vec<Regex>>>()?; .collect::<Result<Vec<Regex>>>()?;
walk::scan(&search_paths, Arc::new(regexps), Arc::new(config)) walk::scan(&search_paths, regexps, config)
} }
#[cfg(feature = "completions")] #[cfg(feature = "completions")]

View File

@ -1,18 +1,18 @@
use std::borrow::Cow;
use std::ffi::OsStr; use std::ffi::OsStr;
use std::io; use std::io::{self, Write};
use std::mem; use std::mem;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread; use std::thread;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::{borrow::Cow, io::Write};
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender}; use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender};
use etcetera::BaseStrategy; use etcetera::BaseStrategy;
use ignore::overrides::OverrideBuilder; use ignore::overrides::{Override, OverrideBuilder};
use ignore::{self, WalkBuilder}; use ignore::{self, WalkBuilder, WalkParallel, WalkState};
use regex::bytes::Regex; use regex::bytes::Regex;
use crate::config::Config; use crate::config::Config;
@ -44,135 +44,18 @@ pub enum WorkerResult {
} }
/// Maximum size of the output buffer before flushing results to the console /// Maximum size of the output buffer before flushing results to the console
pub const MAX_BUFFER_LENGTH: usize = 1000; const MAX_BUFFER_LENGTH: usize = 1000;
/// Default duration until output buffering switches to streaming. /// Default duration until output buffering switches to streaming.
pub const DEFAULT_MAX_BUFFER_TIME: Duration = Duration::from_millis(100); const DEFAULT_MAX_BUFFER_TIME: Duration = Duration::from_millis(100);
/// Recursively scan the given search path for files / pathnames matching the patterns.
///
/// If the `--exec` argument was supplied, this will create a thread pool for executing
/// jobs in parallel from a given command line and the discovered paths. Otherwise, each
/// path will simply be written to standard output.
///
/// The first entry of `paths` seeds both the override builder and the walker; the
/// remaining entries are added to the walker as additional roots.
///
/// # Errors
///
/// Returns an error if an exclude pattern cannot be added to the override builder or if
/// the overrides cannot be built.
///
/// # Panics
///
/// Panics if `paths` is empty (it is indexed with `paths[0]`), if installing the Ctrl-C
/// handler fails, or if the receiver thread panicked.
pub fn scan(paths: &[PathBuf], patterns: Arc<Vec<Regex>>, config: Arc<Config>) -> Result<ExitCode> {
let first_path = &paths[0];
// Channel capacity was chosen empirically to perform similarly to an unbounded channel
let (tx, rx) = bounded(0x4000 * config.threads);
let mut override_builder = OverrideBuilder::new(first_path);
// User-supplied exclude patterns are fallible input, so failures are reported as errors.
for pattern in &config.exclude_patterns {
override_builder
.add(pattern)
.map_err(|e| anyhow!("Malformed exclude pattern: {}", e))?;
}
// The `.git/` override is a fixed literal, so failing to add it is treated as a bug.
if config.read_vcsignore {
override_builder
.add("!.git/")
.expect("Invalid exclude pattern");
}
let overrides = override_builder
.build()
.map_err(|_| anyhow!("Mismatch in exclude patterns"))?;
let mut walker = WalkBuilder::new(first_path);
walker
.hidden(config.ignore_hidden)
.ignore(config.read_fdignore)
.parents(config.read_parent_ignore && (config.read_fdignore || config.read_vcsignore))
.git_ignore(config.read_vcsignore)
.git_global(config.read_vcsignore)
.git_exclude(config.read_vcsignore)
.require_git(config.require_git_to_read_vcsignore)
.overrides(overrides)
.follow_links(config.follow_links)
// No need to check for supported platforms, option is unavailable on unsupported ones
.same_file_system(config.one_file_system)
.max_depth(config.max_depth);
if config.read_fdignore {
walker.add_custom_ignore_filename(".fdignore");
}
if config.read_global_ignore {
if let Ok(basedirs) = etcetera::choose_base_strategy() {
let global_ignore_file = basedirs.config_dir().join("fd").join("ignore");
if global_ignore_file.is_file() {
let result = walker.add_ignore(global_ignore_file);
// Partial errors are ignored silently; any other error is printed but does
// not abort the scan.
match result {
Some(ignore::Error::Partial(_)) => (),
Some(err) => {
print_error(format!("Malformed pattern in global ignore file. {}.", err));
}
None => (),
}
}
}
}
for ignore_file in &config.ignore_files {
let result = walker.add_ignore(ignore_file);
// Same policy as for the global ignore file: report non-partial errors and carry on.
match result {
Some(ignore::Error::Partial(_)) => (),
Some(err) => {
print_error(format!("Malformed pattern in custom ignore file. {}.", err));
}
None => (),
}
}
// The walker was created from the first path; register the remaining ones.
for path in &paths[1..] {
walker.add(path);
}
let parallel_walker = walker.threads(config.threads).build_parallel();
// Flag for cleanly shutting down the parallel walk
let quit_flag = Arc::new(AtomicBool::new(false));
// Flag specifically for quitting due to ^C
let interrupt_flag = Arc::new(AtomicBool::new(false));
// The Ctrl-C handler is only installed when colorized output is being printed.
if config.ls_colors.is_some() && config.is_printing() {
let quit_flag = Arc::clone(&quit_flag);
let interrupt_flag = Arc::clone(&interrupt_flag);
ctrlc::set_handler(move || {
quit_flag.store(true, Ordering::Relaxed);
// `fetch_or` returns the previous value, so this is true only from the second ^C on.
if interrupt_flag.fetch_or(true, Ordering::Relaxed) {
// Ctrl-C has been pressed twice, exit NOW
ExitCode::KilledBySigint.exit();
}
})
.unwrap();
}
// Spawn the thread that receives all results through the channel.
let receiver_thread = spawn_receiver(&config, &quit_flag, &interrupt_flag, rx);
// Spawn the sender threads.
spawn_senders(&config, &quit_flag, patterns, parallel_walker, tx);
// Wait for the receiver thread to print out all results.
let exit_code = receiver_thread.join().unwrap();
// An interrupt takes precedence over whatever exit code the receiver produced.
if interrupt_flag.load(Ordering::Relaxed) {
Ok(ExitCode::KilledBySigint)
} else {
Ok(exit_code)
}
}
/// Wrapper for the receiver thread's buffering behavior. /// Wrapper for the receiver thread's buffering behavior.
struct ReceiverBuffer<W> { struct ReceiverBuffer<'a, W> {
/// The configuration. /// The configuration.
config: Arc<Config>, config: &'a Config,
/// For shutting down the senders. /// For shutting down the senders.
quit_flag: Arc<AtomicBool>, quit_flag: &'a AtomicBool,
/// The ^C notifier. /// The ^C notifier.
interrupt_flag: Arc<AtomicBool>, interrupt_flag: &'a AtomicBool,
/// Receiver for worker results. /// Receiver for worker results.
rx: Receiver<WorkerResult>, rx: Receiver<WorkerResult>,
/// Standard output. /// Standard output.
@ -187,15 +70,12 @@ struct ReceiverBuffer<W> {
num_results: usize, num_results: usize,
} }
impl<W: Write> ReceiverBuffer<W> { impl<'a, W: Write> ReceiverBuffer<'a, W> {
/// Create a new receiver buffer. /// Create a new receiver buffer.
fn new( fn new(state: &'a WorkerState, rx: Receiver<WorkerResult>, stdout: W) -> Self {
config: Arc<Config>, let config = &state.config;
quit_flag: Arc<AtomicBool>, let quit_flag = state.quit_flag.as_ref();
interrupt_flag: Arc<AtomicBool>, let interrupt_flag = state.interrupt_flag.as_ref();
rx: Receiver<WorkerResult>,
stdout: W,
) -> Self {
let max_buffer_time = config.max_buffer_time.unwrap_or(DEFAULT_MAX_BUFFER_TIME); let max_buffer_time = config.max_buffer_time.unwrap_or(DEFAULT_MAX_BUFFER_TIME);
let deadline = Instant::now() + max_buffer_time; let deadline = Instant::now() + max_buffer_time;
@ -329,18 +209,119 @@ impl<W: Write> ReceiverBuffer<W> {
} }
} }
fn spawn_receiver( /// State shared by the sender and receiver threads.
config: &Arc<Config>, struct WorkerState {
quit_flag: &Arc<AtomicBool>, /// The search patterns.
interrupt_flag: &Arc<AtomicBool>, patterns: Vec<Regex>,
rx: Receiver<WorkerResult>, /// The command line configuration.
) -> thread::JoinHandle<ExitCode> { config: Config,
let config = Arc::clone(config); /// Flag for cleanly shutting down the parallel walk
let quit_flag = Arc::clone(quit_flag); quit_flag: Arc<AtomicBool>,
let interrupt_flag = Arc::clone(interrupt_flag); /// Flag specifically for quitting due to ^C
interrupt_flag: Arc<AtomicBool>,
}
impl WorkerState {
/// Create the shared worker state from the search patterns and the configuration.
///
/// Both the quit flag and the interrupt flag start out as `false`.
fn new(patterns: Vec<Regex>, config: Config) -> Self {
    Self {
        patterns,
        config,
        quit_flag: Arc::new(AtomicBool::new(false)),
        interrupt_flag: Arc::new(AtomicBool::new(false)),
    }
}
/// Build the override matcher from the configured exclude patterns.
///
/// The builder is rooted at the first entry of `paths`. When VCS ignore files are
/// read, an extra `!.git/` override is added.
///
/// # Errors
///
/// Returns an error if an exclude pattern is rejected by the builder or if the final
/// override set cannot be built.
///
/// # Panics
///
/// Panics if `paths` is empty.
fn build_overrides(&self, paths: &[PathBuf]) -> Result<Override> {
    let mut overrides = OverrideBuilder::new(&paths[0]);

    for exclude in &self.config.exclude_patterns {
        if let Err(e) = overrides.add(exclude) {
            return Err(anyhow!("Malformed exclude pattern: {}", e));
        }
    }

    if self.config.read_vcsignore {
        // Fixed literal pattern: a failure here is a programming error.
        overrides.add("!.git/").expect("Invalid exclude pattern");
    }

    match overrides.build() {
        Ok(built) => Ok(built),
        Err(_) => Err(anyhow!("Mismatch in exclude patterns")),
    }
}
fn build_walker(&self, paths: &[PathBuf]) -> Result<WalkParallel> {
let first_path = &paths[0];
let config = &self.config;
let overrides = self.build_overrides(paths)?;
let mut builder = WalkBuilder::new(first_path);
builder
.hidden(config.ignore_hidden)
.ignore(config.read_fdignore)
.parents(config.read_parent_ignore && (config.read_fdignore || config.read_vcsignore))
.git_ignore(config.read_vcsignore)
.git_global(config.read_vcsignore)
.git_exclude(config.read_vcsignore)
.require_git(config.require_git_to_read_vcsignore)
.overrides(overrides)
.follow_links(config.follow_links)
// No need to check for supported platforms, option is unavailable on unsupported ones
.same_file_system(config.one_file_system)
.max_depth(config.max_depth);
if config.read_fdignore {
builder.add_custom_ignore_filename(".fdignore");
}
if config.read_global_ignore {
if let Ok(basedirs) = etcetera::choose_base_strategy() {
let global_ignore_file = basedirs.config_dir().join("fd").join("ignore");
if global_ignore_file.is_file() {
let result = builder.add_ignore(global_ignore_file);
match result {
Some(ignore::Error::Partial(_)) => (),
Some(err) => {
print_error(format!(
"Malformed pattern in global ignore file. {}.",
err
));
}
None => (),
}
}
}
}
for ignore_file in &config.ignore_files {
let result = builder.add_ignore(ignore_file);
match result {
Some(ignore::Error::Partial(_)) => (),
Some(err) => {
print_error(format!("Malformed pattern in custom ignore file. {}.", err));
}
None => (),
}
}
for path in &paths[1..] {
builder.add(path);
}
let walker = builder.threads(config.threads).build_parallel();
Ok(walker)
}
/// Run the receiver work, either on this thread or a pool of background
/// threads (for --exec).
fn receive(&self, rx: Receiver<WorkerResult>) -> ExitCode {
let config = &self.config;
let threads = config.threads;
thread::spawn(move || {
// This will be set to `Some` if the `--exec` argument was supplied. // This will be set to `Some` if the `--exec` argument was supplied.
if let Some(ref cmd) = config.command { if let Some(ref cmd) = config.command {
if cmd.in_batch_mode() { if cmd.in_batch_mode() {
@ -349,7 +330,8 @@ fn spawn_receiver(
let out_perm = Mutex::new(()); let out_perm = Mutex::new(());
thread::scope(|scope| { thread::scope(|scope| {
// Each spawned job will store it's thread handle in here. // Each spawned job will store its thread handle in here.
let threads = config.threads;
let mut handles = Vec::with_capacity(threads); let mut handles = Vec::with_capacity(threads);
for _ in 0..threads { for _ in 0..threads {
let rx = rx.clone(); let rx = rx.clone();
@ -365,188 +347,230 @@ fn spawn_receiver(
}) })
} }
} else { } else {
let stdout = io::stdout(); let stdout = io::stdout().lock();
let stdout = stdout.lock();
let stdout = io::BufWriter::new(stdout); let stdout = io::BufWriter::new(stdout);
let mut rxbuffer = ReceiverBuffer::new(config, quit_flag, interrupt_flag, rx, stdout); ReceiverBuffer::new(self, rx, stdout).process()
rxbuffer.process()
} }
}) }
}
fn spawn_senders( /// Spawn the sender threads.
config: &Arc<Config>, fn spawn_senders(&self, walker: WalkParallel, tx: Sender<WorkerResult>) {
quit_flag: &Arc<AtomicBool>, walker.run(|| {
patterns: Arc<Vec<Regex>>, let patterns = &self.patterns;
parallel_walker: ignore::WalkParallel, let config = &self.config;
tx: Sender<WorkerResult>, let quit_flag = self.quit_flag.as_ref();
) { let tx = tx.clone();
parallel_walker.run(|| {
let config = Arc::clone(config);
let patterns = Arc::clone(&patterns);
let tx_thread = tx.clone();
let quit_flag = Arc::clone(quit_flag);
Box::new(move |entry_o| { Box::new(move |entry| {
if quit_flag.load(Ordering::Relaxed) { if quit_flag.load(Ordering::Relaxed) {
return ignore::WalkState::Quit; return WalkState::Quit;
}
let entry = match entry_o {
Ok(ref e) if e.depth() == 0 => {
// Skip the root directory entry.
return ignore::WalkState::Continue;
} }
Ok(e) => DirEntry::normal(e),
Err(ignore::Error::WithPath { let entry = match entry {
path, Ok(ref e) if e.depth() == 0 => {
err: inner_err, // Skip the root directory entry.
}) => match inner_err.as_ref() { return WalkState::Continue;
ignore::Error::Io(io_error)
if io_error.kind() == io::ErrorKind::NotFound
&& path
.symlink_metadata()
.ok()
.map_or(false, |m| m.file_type().is_symlink()) =>
{
DirEntry::broken_symlink(path)
} }
_ => { Ok(e) => DirEntry::normal(e),
return match tx_thread.send(WorkerResult::Error(ignore::Error::WithPath { Err(ignore::Error::WithPath {
path, path,
err: inner_err, err: inner_err,
})) { }) => match inner_err.as_ref() {
Ok(_) => ignore::WalkState::Continue, ignore::Error::Io(io_error)
Err(_) => ignore::WalkState::Quit, if io_error.kind() == io::ErrorKind::NotFound
} && path
} .symlink_metadata()
}, .ok()
Err(err) => { .map_or(false, |m| m.file_type().is_symlink()) =>
return match tx_thread.send(WorkerResult::Error(err)) {
Ok(_) => ignore::WalkState::Continue,
Err(_) => ignore::WalkState::Quit,
}
}
};
if let Some(min_depth) = config.min_depth {
if entry.depth().map_or(true, |d| d < min_depth) {
return ignore::WalkState::Continue;
}
}
// Check the name first, since it doesn't require metadata
let entry_path = entry.path();
let search_str: Cow<OsStr> = if config.search_full_path {
let path_abs_buf = filesystem::path_absolute_form(entry_path)
.expect("Retrieving absolute path succeeds");
Cow::Owned(path_abs_buf.as_os_str().to_os_string())
} else {
match entry_path.file_name() {
Some(filename) => Cow::Borrowed(filename),
None => unreachable!(
"Encountered file system entry without a file name. This should only \
happen for paths like 'foo/bar/..' or '/' which are not supposed to \
appear in a file system traversal."
),
}
};
if !patterns
.iter()
.all(|pat| pat.is_match(&filesystem::osstr_to_bytes(search_str.as_ref())))
{
return ignore::WalkState::Continue;
}
// Filter out unwanted extensions.
if let Some(ref exts_regex) = config.extensions {
if let Some(path_str) = entry_path.file_name() {
if !exts_regex.is_match(&filesystem::osstr_to_bytes(path_str)) {
return ignore::WalkState::Continue;
}
} else {
return ignore::WalkState::Continue;
}
}
// Filter out unwanted file types.
if let Some(ref file_types) = config.file_types {
if file_types.should_ignore(&entry) {
return ignore::WalkState::Continue;
}
}
#[cfg(unix)]
{
if let Some(ref owner_constraint) = config.owner_constraint {
if let Some(metadata) = entry.metadata() {
if !owner_constraint.matches(metadata) {
return ignore::WalkState::Continue;
}
} else {
return ignore::WalkState::Continue;
}
}
}
// Filter out unwanted sizes if it is a file and we have been given size constraints.
if !config.size_constraints.is_empty() {
if entry_path.is_file() {
if let Some(metadata) = entry.metadata() {
let file_size = metadata.len();
if config
.size_constraints
.iter()
.any(|sc| !sc.is_within(file_size))
{ {
return ignore::WalkState::Continue; DirEntry::broken_symlink(path)
}
_ => {
return match tx.send(WorkerResult::Error(ignore::Error::WithPath {
path,
err: inner_err,
})) {
Ok(_) => WalkState::Continue,
Err(_) => WalkState::Quit,
}
}
},
Err(err) => {
return match tx.send(WorkerResult::Error(err)) {
Ok(_) => WalkState::Continue,
Err(_) => WalkState::Quit,
}
}
};
if let Some(min_depth) = config.min_depth {
if entry.depth().map_or(true, |d| d < min_depth) {
return WalkState::Continue;
}
}
// Check the name first, since it doesn't require metadata
let entry_path = entry.path();
let search_str: Cow<OsStr> = if config.search_full_path {
let path_abs_buf = filesystem::path_absolute_form(entry_path)
.expect("Retrieving absolute path succeeds");
Cow::Owned(path_abs_buf.as_os_str().to_os_string())
} else {
match entry_path.file_name() {
Some(filename) => Cow::Borrowed(filename),
None => unreachable!(
"Encountered file system entry without a file name. This should only \
happen for paths like 'foo/bar/..' or '/' which are not supposed to \
appear in a file system traversal."
),
}
};
if !patterns
.iter()
.all(|pat| pat.is_match(&filesystem::osstr_to_bytes(search_str.as_ref())))
{
return WalkState::Continue;
}
// Filter out unwanted extensions.
if let Some(ref exts_regex) = config.extensions {
if let Some(path_str) = entry_path.file_name() {
if !exts_regex.is_match(&filesystem::osstr_to_bytes(path_str)) {
return WalkState::Continue;
} }
} else { } else {
return ignore::WalkState::Continue; return WalkState::Continue;
}
} else {
return ignore::WalkState::Continue;
}
}
// Filter out unwanted modification times
if !config.time_constraints.is_empty() {
let mut matched = false;
if let Some(metadata) = entry.metadata() {
if let Ok(modified) = metadata.modified() {
matched = config
.time_constraints
.iter()
.all(|tf| tf.applies_to(&modified));
} }
} }
if !matched {
return ignore::WalkState::Continue; // Filter out unwanted file types.
if let Some(ref file_types) = config.file_types {
if file_types.should_ignore(&entry) {
return WalkState::Continue;
}
} }
}
if config.is_printing() { #[cfg(unix)]
if let Some(ls_colors) = &config.ls_colors { {
// Compute colors in parallel if let Some(ref owner_constraint) = config.owner_constraint {
entry.style(ls_colors); if let Some(metadata) = entry.metadata() {
if !owner_constraint.matches(metadata) {
return WalkState::Continue;
}
} else {
return WalkState::Continue;
}
}
} }
}
let send_result = tx_thread.send(WorkerResult::Entry(entry)); // Filter out unwanted sizes if it is a file and we have been given size constraints.
if !config.size_constraints.is_empty() {
if entry_path.is_file() {
if let Some(metadata) = entry.metadata() {
let file_size = metadata.len();
if config
.size_constraints
.iter()
.any(|sc| !sc.is_within(file_size))
{
return WalkState::Continue;
}
} else {
return WalkState::Continue;
}
} else {
return WalkState::Continue;
}
}
if send_result.is_err() { // Filter out unwanted modification times
return ignore::WalkState::Quit; if !config.time_constraints.is_empty() {
} let mut matched = false;
if let Some(metadata) = entry.metadata() {
if let Ok(modified) = metadata.modified() {
matched = config
.time_constraints
.iter()
.all(|tf| tf.applies_to(&modified));
}
}
if !matched {
return WalkState::Continue;
}
}
// Apply pruning. if config.is_printing() {
if config.prune { if let Some(ls_colors) = &config.ls_colors {
return ignore::WalkState::Skip; // Compute colors in parallel
} entry.style(ls_colors);
}
}
ignore::WalkState::Continue let send_result = tx.send(WorkerResult::Entry(entry));
})
}); if send_result.is_err() {
return WalkState::Quit;
}
// Apply pruning.
if config.prune {
return WalkState::Skip;
}
WalkState::Continue
})
});
}
/// Perform the recursive scan.
///
/// Builds the parallel walker for `paths`, optionally installs a Ctrl-C handler, then
/// runs the receiver on a scoped thread while the senders are driven from this thread.
///
/// Returns `ExitCode::KilledBySigint` if the interrupt flag was set, otherwise the exit
/// code produced by the receiver.
///
/// # Errors
///
/// Returns an error if the walker cannot be built.
///
/// # Panics
///
/// Panics if installing the Ctrl-C handler fails or if the receiver thread panicked.
fn scan(&self, paths: &[PathBuf]) -> Result<ExitCode> {
let config = &self.config;
let walker = self.build_walker(paths)?;
// The Ctrl-C handler is only installed when colorized output is being printed.
if config.ls_colors.is_some() && config.is_printing() {
// The handler closure is `move`, so it gets its own handles to the shared flags.
let quit_flag = Arc::clone(&self.quit_flag);
let interrupt_flag = Arc::clone(&self.interrupt_flag);
ctrlc::set_handler(move || {
quit_flag.store(true, Ordering::Relaxed);
// `fetch_or` returns the previous value, so this is true only from the second ^C on.
if interrupt_flag.fetch_or(true, Ordering::Relaxed) {
// Ctrl-C has been pressed twice, exit NOW
ExitCode::KilledBySigint.exit();
}
})
.unwrap();
}
// Channel capacity was chosen empirically to perform similarly to an unbounded channel
let (tx, rx) = bounded(0x4000 * config.threads);
// A scoped thread lets the receiver borrow `self` instead of requiring `'static` data.
let exit_code = thread::scope(|scope| {
// Spawn the receiver thread(s)
let receiver = scope.spawn(|| self.receive(rx));
// Spawn the sender threads.
self.spawn_senders(walker, tx);
receiver.join().unwrap()
});
// An interrupt takes precedence over whatever exit code the receiver produced.
if self.interrupt_flag.load(Ordering::Relaxed) {
Ok(ExitCode::KilledBySigint)
} else {
Ok(exit_code)
}
}
}
/// Recursively scan the given search path for files / pathnames matching the patterns.
///
/// If the `--exec` argument was supplied, this will create a thread pool for executing
/// jobs in parallel from a given command line and the discovered paths. Otherwise, each
/// path will simply be written to standard output.
pub fn scan(paths: &[PathBuf], patterns: Vec<Regex>, config: Config) -> Result<ExitCode> {
WorkerState::new(patterns, config).scan(paths)
} }