diff --git a/src/main.rs b/src/main.rs index b661f67..a5f77b8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -103,7 +103,7 @@ fn run() -> Result { .map(|pat| build_regex(pat, &config)) .collect::>>()?; - walk::scan(&search_paths, Arc::new(regexps), Arc::new(config)) + walk::scan(&search_paths, regexps, config) } #[cfg(feature = "completions")] diff --git a/src/walk.rs b/src/walk.rs index 01fe38c..691c5d0 100644 --- a/src/walk.rs +++ b/src/walk.rs @@ -1,18 +1,18 @@ +use std::borrow::Cow; use std::ffi::OsStr; -use std::io; +use std::io::{self, Write}; use std::mem; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::{Duration, Instant}; -use std::{borrow::Cow, io::Write}; use anyhow::{anyhow, Result}; use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender}; use etcetera::BaseStrategy; -use ignore::overrides::OverrideBuilder; -use ignore::{self, WalkBuilder}; +use ignore::overrides::{Override, OverrideBuilder}; +use ignore::{self, WalkBuilder, WalkParallel, WalkState}; use regex::bytes::Regex; use crate::config::Config; @@ -44,135 +44,18 @@ pub enum WorkerResult { } /// Maximum size of the output buffer before flushing results to the console -pub const MAX_BUFFER_LENGTH: usize = 1000; +const MAX_BUFFER_LENGTH: usize = 1000; /// Default duration until output buffering switches to streaming. -pub const DEFAULT_MAX_BUFFER_TIME: Duration = Duration::from_millis(100); - -/// Recursively scan the given search path for files / pathnames matching the patterns. -/// -/// If the `--exec` argument was supplied, this will create a thread pool for executing -/// jobs in parallel from a given command line and the discovered paths. Otherwise, each -/// path will simply be written to standard output. 
-pub fn scan(paths: &[PathBuf], patterns: Arc>, config: Arc) -> Result { - let first_path = &paths[0]; - - // Channel capacity was chosen empircally to perform similarly to an unbounded channel - let (tx, rx) = bounded(0x4000 * config.threads); - - let mut override_builder = OverrideBuilder::new(first_path); - - for pattern in &config.exclude_patterns { - override_builder - .add(pattern) - .map_err(|e| anyhow!("Malformed exclude pattern: {}", e))?; - } - - if config.read_vcsignore { - override_builder - .add("!.git/") - .expect("Invalid exclude pattern"); - } - - let overrides = override_builder - .build() - .map_err(|_| anyhow!("Mismatch in exclude patterns"))?; - - let mut walker = WalkBuilder::new(first_path); - walker - .hidden(config.ignore_hidden) - .ignore(config.read_fdignore) - .parents(config.read_parent_ignore && (config.read_fdignore || config.read_vcsignore)) - .git_ignore(config.read_vcsignore) - .git_global(config.read_vcsignore) - .git_exclude(config.read_vcsignore) - .require_git(config.require_git_to_read_vcsignore) - .overrides(overrides) - .follow_links(config.follow_links) - // No need to check for supported platforms, option is unavailable on unsupported ones - .same_file_system(config.one_file_system) - .max_depth(config.max_depth); - - if config.read_fdignore { - walker.add_custom_ignore_filename(".fdignore"); - } - - if config.read_global_ignore { - if let Ok(basedirs) = etcetera::choose_base_strategy() { - let global_ignore_file = basedirs.config_dir().join("fd").join("ignore"); - if global_ignore_file.is_file() { - let result = walker.add_ignore(global_ignore_file); - match result { - Some(ignore::Error::Partial(_)) => (), - Some(err) => { - print_error(format!("Malformed pattern in global ignore file. {}.", err)); - } - None => (), - } - } - } - } - - for ignore_file in &config.ignore_files { - let result = walker.add_ignore(ignore_file); - match result { - Some(ignore::Error::Partial(_)) => (), - Some(err) => { - print_error(format!("Malformed pattern in custom ignore file. {}.", err)); - } - None => (), - } - } - - for path in &paths[1..] { - walker.add(path); - } - - let parallel_walker = walker.threads(config.threads).build_parallel(); - - // Flag for cleanly shutting down the parallel walk - let quit_flag = Arc::new(AtomicBool::new(false)); - // Flag specifically for quitting due to ^C - let interrupt_flag = Arc::new(AtomicBool::new(false)); - - if config.ls_colors.is_some() && config.is_printing() { - let quit_flag = Arc::clone(&quit_flag); - let interrupt_flag = Arc::clone(&interrupt_flag); - - ctrlc::set_handler(move || { - quit_flag.store(true, Ordering::Relaxed); - - if interrupt_flag.fetch_or(true, Ordering::Relaxed) { - // Ctrl-C has been pressed twice, exit NOW - ExitCode::KilledBySigint.exit(); - } - }) - .unwrap(); - } - - // Spawn the thread that receives all results through the channel. - let receiver_thread = spawn_receiver(&config, &quit_flag, &interrupt_flag, rx); - - // Spawn the sender threads. - spawn_senders(&config, &quit_flag, patterns, parallel_walker, tx); - - // Wait for the receiver thread to print out all results. - let exit_code = receiver_thread.join().unwrap(); - - if interrupt_flag.load(Ordering::Relaxed) { - Ok(ExitCode::KilledBySigint) - } else { - Ok(exit_code) - } -} +const DEFAULT_MAX_BUFFER_TIME: Duration = Duration::from_millis(100); /// Wrapper for the receiver thread's buffering behavior. -struct ReceiverBuffer { +struct ReceiverBuffer<'a, W> { /// The configuration. 
- config: Arc, + config: &'a Config, /// For shutting down the senders. - quit_flag: Arc, + quit_flag: &'a AtomicBool, /// The ^C notifier. - interrupt_flag: Arc, + interrupt_flag: &'a AtomicBool, /// Receiver for worker results. rx: Receiver, /// Standard output. @@ -187,15 +70,12 @@ struct ReceiverBuffer { num_results: usize, } -impl ReceiverBuffer { +impl<'a, W: Write> ReceiverBuffer<'a, W> { /// Create a new receiver buffer. - fn new( - config: Arc, - quit_flag: Arc, - interrupt_flag: Arc, - rx: Receiver, - stdout: W, - ) -> Self { + fn new(state: &'a WorkerState, rx: Receiver, stdout: W) -> Self { + let config = &state.config; + let quit_flag = state.quit_flag.as_ref(); + let interrupt_flag = state.interrupt_flag.as_ref(); let max_buffer_time = config.max_buffer_time.unwrap_or(DEFAULT_MAX_BUFFER_TIME); let deadline = Instant::now() + max_buffer_time; @@ -329,18 +209,119 @@ impl ReceiverBuffer { } } -fn spawn_receiver( - config: &Arc, - quit_flag: &Arc, - interrupt_flag: &Arc, - rx: Receiver, -) -> thread::JoinHandle { - let config = Arc::clone(config); - let quit_flag = Arc::clone(quit_flag); - let interrupt_flag = Arc::clone(interrupt_flag); +/// State shared by the sender and receiver threads. +struct WorkerState { + /// The search patterns. + patterns: Vec, + /// The command line configuration. + config: Config, + /// Flag for cleanly shutting down the parallel walk + quit_flag: Arc, + /// Flag specifically for quitting due to ^C + interrupt_flag: Arc, +} + +impl WorkerState { + fn new(patterns: Vec, config: Config) -> Self { + let quit_flag = Arc::new(AtomicBool::new(false)); + let interrupt_flag = Arc::new(AtomicBool::new(false)); + + Self { + patterns, + config, + quit_flag, + interrupt_flag, + } + } + + fn build_overrides(&self, paths: &[PathBuf]) -> Result { + let first_path = &paths[0]; + let config = &self.config; + + let mut builder = OverrideBuilder::new(first_path); + + for pattern in &config.exclude_patterns { + builder + .add(pattern) + .map_err(|e| anyhow!("Malformed exclude pattern: {}", e))?; + } + + if config.read_vcsignore { + builder.add("!.git/").expect("Invalid exclude pattern"); + } + + builder + .build() + .map_err(|_| anyhow!("Mismatch in exclude patterns")) + } + + fn build_walker(&self, paths: &[PathBuf]) -> Result { + let first_path = &paths[0]; + let config = &self.config; + let overrides = self.build_overrides(paths)?; + + let mut builder = WalkBuilder::new(first_path); + builder + .hidden(config.ignore_hidden) + .ignore(config.read_fdignore) + .parents(config.read_parent_ignore && (config.read_fdignore || config.read_vcsignore)) + .git_ignore(config.read_vcsignore) + .git_global(config.read_vcsignore) + .git_exclude(config.read_vcsignore) + .require_git(config.require_git_to_read_vcsignore) + .overrides(overrides) + .follow_links(config.follow_links) + // No need to check for supported platforms, option is unavailable on unsupported ones + .same_file_system(config.one_file_system) + .max_depth(config.max_depth); + + if config.read_fdignore { + builder.add_custom_ignore_filename(".fdignore"); + } + + if config.read_global_ignore { + if let Ok(basedirs) = etcetera::choose_base_strategy() { + let global_ignore_file = basedirs.config_dir().join("fd").join("ignore"); + if global_ignore_file.is_file() { + let result = builder.add_ignore(global_ignore_file); + match result { + Some(ignore::Error::Partial(_)) => (), + Some(err) => { + print_error(format!( + "Malformed pattern in global ignore file. 
{}.", + err + )); + } + None => (), + } + } + } + } + + for ignore_file in &config.ignore_files { + let result = builder.add_ignore(ignore_file); + match result { + Some(ignore::Error::Partial(_)) => (), + Some(err) => { + print_error(format!("Malformed pattern in custom ignore file. {}.", err)); + } + None => (), + } + } + + for path in &paths[1..] { + builder.add(path); + } + + let walker = builder.threads(config.threads).build_parallel(); + Ok(walker) + } + + /// Run the receiver work, either on this thread or a pool of background + /// threads (for --exec). + fn receive(&self, rx: Receiver) -> ExitCode { + let config = &self.config; - let threads = config.threads; - thread::spawn(move || { // This will be set to `Some` if the `--exec` argument was supplied. if let Some(ref cmd) = config.command { if cmd.in_batch_mode() { @@ -349,7 +330,8 @@ fn spawn_receiver( let out_perm = Mutex::new(()); thread::scope(|scope| { - // Each spawned job will store it's thread handle in here. + // Each spawned job will store its thread handle in here. + let threads = config.threads; let mut handles = Vec::with_capacity(threads); for _ in 0..threads { let rx = rx.clone(); @@ -365,188 +347,230 @@ fn spawn_receiver( }) } } else { - let stdout = io::stdout(); - let stdout = stdout.lock(); + let stdout = io::stdout().lock(); let stdout = io::BufWriter::new(stdout); - let mut rxbuffer = ReceiverBuffer::new(config, quit_flag, interrupt_flag, rx, stdout); - rxbuffer.process() + ReceiverBuffer::new(self, rx, stdout).process() } - }) -} + } -fn spawn_senders( - config: &Arc, - quit_flag: &Arc, - patterns: Arc>, - parallel_walker: ignore::WalkParallel, - tx: Sender, -) { - parallel_walker.run(|| { - let config = Arc::clone(config); - let patterns = Arc::clone(&patterns); - let tx_thread = tx.clone(); - let quit_flag = Arc::clone(quit_flag); + /// Spawn the sender threads. + fn spawn_senders(&self, walker: WalkParallel, tx: Sender) { + walker.run(|| { + let patterns = &self.patterns; + let config = &self.config; + let quit_flag = self.quit_flag.as_ref(); + let tx = tx.clone(); - Box::new(move |entry_o| { - if quit_flag.load(Ordering::Relaxed) { - return ignore::WalkState::Quit; - } - - let entry = match entry_o { - Ok(ref e) if e.depth() == 0 => { - // Skip the root directory entry. - return ignore::WalkState::Continue; + Box::new(move |entry| { + if quit_flag.load(Ordering::Relaxed) { + return WalkState::Quit; } - Ok(e) => DirEntry::normal(e), - Err(ignore::Error::WithPath { - path, - err: inner_err, - }) => match inner_err.as_ref() { - ignore::Error::Io(io_error) - if io_error.kind() == io::ErrorKind::NotFound - && path - .symlink_metadata() - .ok() - .map_or(false, |m| m.file_type().is_symlink()) => - { - DirEntry::broken_symlink(path) + + let entry = match entry { + Ok(ref e) if e.depth() == 0 => { + // Skip the root directory entry. 
+ return WalkState::Continue; } - _ => { - return match tx_thread.send(WorkerResult::Error(ignore::Error::WithPath { - path, - err: inner_err, - })) { - Ok(_) => ignore::WalkState::Continue, - Err(_) => ignore::WalkState::Quit, - } - } - }, - Err(err) => { - return match tx_thread.send(WorkerResult::Error(err)) { - Ok(_) => ignore::WalkState::Continue, - Err(_) => ignore::WalkState::Quit, - } - } - }; - - if let Some(min_depth) = config.min_depth { - if entry.depth().map_or(true, |d| d < min_depth) { - return ignore::WalkState::Continue; - } - } - - // Check the name first, since it doesn't require metadata - let entry_path = entry.path(); - - let search_str: Cow = if config.search_full_path { - let path_abs_buf = filesystem::path_absolute_form(entry_path) - .expect("Retrieving absolute path succeeds"); - Cow::Owned(path_abs_buf.as_os_str().to_os_string()) - } else { - match entry_path.file_name() { - Some(filename) => Cow::Borrowed(filename), - None => unreachable!( - "Encountered file system entry without a file name. This should only \ - happen for paths like 'foo/bar/..' or '/' which are not supposed to \ - appear in a file system traversal." - ), - } - }; - - if !patterns - .iter() - .all(|pat| pat.is_match(&filesystem::osstr_to_bytes(search_str.as_ref()))) - { - return ignore::WalkState::Continue; - } - - // Filter out unwanted extensions. - if let Some(ref exts_regex) = config.extensions { - if let Some(path_str) = entry_path.file_name() { - if !exts_regex.is_match(&filesystem::osstr_to_bytes(path_str)) { - return ignore::WalkState::Continue; - } - } else { - return ignore::WalkState::Continue; - } - } - - // Filter out unwanted file types. - if let Some(ref file_types) = config.file_types { - if file_types.should_ignore(&entry) { - return ignore::WalkState::Continue; - } - } - - #[cfg(unix)] - { - if let Some(ref owner_constraint) = config.owner_constraint { - if let Some(metadata) = entry.metadata() { - if !owner_constraint.matches(metadata) { - return ignore::WalkState::Continue; - } - } else { - return ignore::WalkState::Continue; - } - } - } - - // Filter out unwanted sizes if it is a file and we have been given size constraints. 
- if !config.size_constraints.is_empty() { - if entry_path.is_file() { - if let Some(metadata) = entry.metadata() { - let file_size = metadata.len(); - if config - .size_constraints - .iter() - .any(|sc| !sc.is_within(file_size)) + Ok(e) => DirEntry::normal(e), + Err(ignore::Error::WithPath { + path, + err: inner_err, + }) => match inner_err.as_ref() { + ignore::Error::Io(io_error) + if io_error.kind() == io::ErrorKind::NotFound + && path + .symlink_metadata() + .ok() + .map_or(false, |m| m.file_type().is_symlink()) => { - return ignore::WalkState::Continue; + DirEntry::broken_symlink(path) + } + _ => { + return match tx.send(WorkerResult::Error(ignore::Error::WithPath { + path, + err: inner_err, + })) { + Ok(_) => WalkState::Continue, + Err(_) => WalkState::Quit, + } + } + }, + Err(err) => { + return match tx.send(WorkerResult::Error(err)) { + Ok(_) => WalkState::Continue, + Err(_) => WalkState::Quit, + } + } + }; + + if let Some(min_depth) = config.min_depth { + if entry.depth().map_or(true, |d| d < min_depth) { + return WalkState::Continue; + } + } + + // Check the name first, since it doesn't require metadata + let entry_path = entry.path(); + + let search_str: Cow = if config.search_full_path { + let path_abs_buf = filesystem::path_absolute_form(entry_path) + .expect("Retrieving absolute path succeeds"); + Cow::Owned(path_abs_buf.as_os_str().to_os_string()) + } else { + match entry_path.file_name() { + Some(filename) => Cow::Borrowed(filename), + None => unreachable!( + "Encountered file system entry without a file name. This should only \ + happen for paths like 'foo/bar/..' or '/' which are not supposed to \ + appear in a file system traversal." + ), + } + }; + + if !patterns + .iter() + .all(|pat| pat.is_match(&filesystem::osstr_to_bytes(search_str.as_ref()))) + { + return WalkState::Continue; + } + + // Filter out unwanted extensions. + if let Some(ref exts_regex) = config.extensions { + if let Some(path_str) = entry_path.file_name() { + if !exts_regex.is_match(&filesystem::osstr_to_bytes(path_str)) { + return WalkState::Continue; } } else { - return ignore::WalkState::Continue; - } - } else { - return ignore::WalkState::Continue; - } - } - - // Filter out unwanted modification times - if !config.time_constraints.is_empty() { - let mut matched = false; - if let Some(metadata) = entry.metadata() { - if let Ok(modified) = metadata.modified() { - matched = config - .time_constraints - .iter() - .all(|tf| tf.applies_to(&modified)); + return WalkState::Continue; } } - if !matched { - return ignore::WalkState::Continue; + + // Filter out unwanted file types. + if let Some(ref file_types) = config.file_types { + if file_types.should_ignore(&entry) { + return WalkState::Continue; + } } - } - if config.is_printing() { - if let Some(ls_colors) = &config.ls_colors { - // Compute colors in parallel - entry.style(ls_colors); + #[cfg(unix)] + { + if let Some(ref owner_constraint) = config.owner_constraint { + if let Some(metadata) = entry.metadata() { + if !owner_constraint.matches(metadata) { + return WalkState::Continue; + } + } else { + return WalkState::Continue; + } + } } - } - let send_result = tx_thread.send(WorkerResult::Entry(entry)); + // Filter out unwanted sizes if it is a file and we have been given size constraints. 
+ if !config.size_constraints.is_empty() { + if entry_path.is_file() { + if let Some(metadata) = entry.metadata() { + let file_size = metadata.len(); + if config + .size_constraints + .iter() + .any(|sc| !sc.is_within(file_size)) + { + return WalkState::Continue; + } + } else { + return WalkState::Continue; + } + } else { + return WalkState::Continue; + } + } - if send_result.is_err() { - return ignore::WalkState::Quit; - } + // Filter out unwanted modification times + if !config.time_constraints.is_empty() { + let mut matched = false; + if let Some(metadata) = entry.metadata() { + if let Ok(modified) = metadata.modified() { + matched = config + .time_constraints + .iter() + .all(|tf| tf.applies_to(&modified)); + } + } + if !matched { + return WalkState::Continue; + } + } - // Apply pruning. - if config.prune { - return ignore::WalkState::Skip; - } + if config.is_printing() { + if let Some(ls_colors) = &config.ls_colors { + // Compute colors in parallel + entry.style(ls_colors); + } + } - ignore::WalkState::Continue - }) - }); + let send_result = tx.send(WorkerResult::Entry(entry)); + + if send_result.is_err() { + return WalkState::Quit; + } + + // Apply pruning. + if config.prune { + return WalkState::Skip; + } + + WalkState::Continue + }) + }); + } + + /// Perform the recursive scan. + fn scan(&self, paths: &[PathBuf]) -> Result { + let config = &self.config; + let walker = self.build_walker(paths)?; + + if config.ls_colors.is_some() && config.is_printing() { + let quit_flag = Arc::clone(&self.quit_flag); + let interrupt_flag = Arc::clone(&self.interrupt_flag); + + ctrlc::set_handler(move || { + quit_flag.store(true, Ordering::Relaxed); + + if interrupt_flag.fetch_or(true, Ordering::Relaxed) { + // Ctrl-C has been pressed twice, exit NOW + ExitCode::KilledBySigint.exit(); + } + }) + .unwrap(); + } + + // Channel capacity was chosen empircally to perform similarly to an unbounded channel + let (tx, rx) = bounded(0x4000 * config.threads); + + let exit_code = thread::scope(|scope| { + // Spawn the receiver thread(s) + let receiver = scope.spawn(|| self.receive(rx)); + + // Spawn the sender threads. + self.spawn_senders(walker, tx); + + receiver.join().unwrap() + }); + + if self.interrupt_flag.load(Ordering::Relaxed) { + Ok(ExitCode::KilledBySigint) + } else { + Ok(exit_code) + } + } +} + +/// Recursively scan the given search path for files / pathnames matching the patterns. +/// +/// If the `--exec` argument was supplied, this will create a thread pool for executing +/// jobs in parallel from a given command line and the discovered paths. Otherwise, each +/// path will simply be written to standard output. +pub fn scan(paths: &[PathBuf], patterns: Vec, config: Config) -> Result { + WorkerState::new(patterns, config).scan(paths) }
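
Note: the core of this refactor is that the walk now owns its Config and patterns inside a WorkerState, and the sender/receiver threads borrow that state through std::thread::scope instead of cloning Arc<Config> / Arc<Vec<Regex>> handles, which is why the public walk::scan signature (and its call site in main.rs) drops the Arc wrappers. The sketch below is a minimal, self-contained illustration of that ownership pattern only, not fd's actual code: Config, the string "patterns", the result payloads, and std's mpsc channel are simplified stand-ins (fd uses crossbeam_channel and its own types).

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::mpsc;
    use std::thread;

    // Hypothetical stand-ins for fd's Config and WorkerState; fields are illustrative.
    struct Config {
        threads: usize,
    }

    struct WorkerState {
        patterns: Vec<String>,
        config: Config,
        quit_flag: AtomicBool,
    }

    impl WorkerState {
        fn scan(&self) -> usize {
            let (tx, rx) = mpsc::channel();

            // `thread::scope` joins every spawned thread before it returns, so the
            // threads may borrow `self` directly; no Arc<...> cloning is required.
            thread::scope(|scope| {
                // Receiver: counts results until every sender has hung up.
                let receiver = scope.spawn(move || rx.iter().count());

                // Senders: one per pattern here (fd spawns walker threads instead),
                // borrowing `self.patterns` and `self.quit_flag` for the scope.
                for pattern in &self.patterns {
                    let tx = tx.clone();
                    scope.spawn(move || {
                        if !self.quit_flag.load(Ordering::Relaxed) {
                            let _ = tx.send(pattern.len());
                        }
                    });
                }
                drop(tx); // close the channel so the receiver's iterator terminates

                receiver.join().unwrap()
            })
        }
    }

    fn main() {
        let state = WorkerState {
            patterns: vec!["foo".into(), "bar".into()],
            config: Config { threads: 4 },
            quit_flag: AtomicBool::new(false),
        };
        println!("threads = {}, results = {}", state.config.threads, state.scan());
    }

Because the scope guarantees the borrows of `self` cannot outlive the data, the same state can be shared by reference from ReceiverBuffer<'a, W> and the sender closures, matching the lifetime changes in this diff.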
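
Note: spawn_senders drives the traversal through the ignore crate's parallel walker: run() is handed a factory closure that builds one boxed visitor per walker thread, and each visitor steers the walk by returning WalkState::Continue, Skip, or Quit. A minimal usage sketch of that API, independent of fd's filtering logic (the starting path and thread count here are arbitrary):

    use ignore::{DirEntry, Error, WalkBuilder, WalkState};

    fn main() {
        WalkBuilder::new("./")
            .threads(4)
            .build_parallel()
            .run(|| {
                // The factory runs once per walker thread; each visitor receives the
                // entries discovered by that thread.
                Box::new(|result: Result<DirEntry, Error>| {
                    match result {
                        Ok(entry) => println!("{}", entry.path().display()),
                        Err(err) => eprintln!("walk error: {}", err),
                    }
                    WalkState::Continue
                })
            });
    }

Returning Quit from any visitor (as the diff does when the channel send fails or quit_flag is set) stops the whole parallel walk, and Skip prunes the current directory, which is how --prune is implemented.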
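
Note: the Ctrl-C handler installed in scan() distinguishes a first interrupt (request a clean shutdown via quit_flag) from a second one (exit immediately) with a single atomic fetch_or, which sets the flag and returns the value it previously held. A small sketch of just that mechanism:

    use std::sync::atomic::{AtomicBool, Ordering};

    /// True only for the second (and any later) interrupt: `fetch_or` sets the
    /// flag and returns the value it held *before* the call.
    fn is_second_interrupt(interrupt_flag: &AtomicBool) -> bool {
        interrupt_flag.fetch_or(true, Ordering::Relaxed)
    }

    fn main() {
        let flag = AtomicBool::new(false);
        assert!(!is_second_interrupt(&flag)); // first ^C: begin graceful shutdown
        assert!(is_second_interrupt(&flag));  // second ^C: exit immediately
    }

Because fetch_or is a single atomic read-modify-write, two racing invocations can never both observe false.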