2023-10-27 22:43:06 +02:00
|
|
|
use std::borrow::Cow;
|
2019-09-15 14:42:48 +02:00
|
|
|
use std::ffi::OsStr;
|
2023-10-27 22:43:06 +02:00
|
|
|
use std::io::{self, Write};
|
2021-11-26 23:43:43 +01:00
|
|
|
use std::mem;
|
2021-11-09 11:01:09 +01:00
|
|
|
use std::path::PathBuf;
|
2017-11-22 23:05:09 +01:00
|
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
2023-11-05 20:55:37 +01:00
|
|
|
use std::sync::{Arc, Mutex, MutexGuard};
|
2017-10-10 08:01:17 +02:00
|
|
|
use std::thread;
|
2021-11-26 23:43:43 +01:00
|
|
|
use std::time::{Duration, Instant};
|
2017-10-10 08:01:17 +02:00
|
|
|
|
2020-04-03 21:18:54 +02:00
|
|
|
use anyhow::{anyhow, Result};
|
2023-11-05 20:55:37 +01:00
|
|
|
use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, SendError, Sender};
|
2023-04-22 04:26:23 +02:00
|
|
|
use etcetera::BaseStrategy;
|
2023-10-27 22:43:06 +02:00
|
|
|
use ignore::overrides::{Override, OverrideBuilder};
|
2024-03-13 08:37:24 +01:00
|
|
|
use ignore::{WalkBuilder, WalkParallel, WalkState};
|
2019-09-15 14:42:48 +02:00
|
|
|
use regex::bytes::Regex;
|
2017-10-10 08:01:17 +02:00
|
|
|
|
2021-08-23 13:31:01 +02:00
|
|
|
use crate::config::Config;
|
2021-11-30 08:51:16 +01:00
|
|
|
use crate::dir_entry::DirEntry;
|
2020-04-03 21:18:54 +02:00
|
|
|
use crate::error::print_error;
|
|
|
|
use crate::exec;
|
|
|
|
use crate::exit_codes::{merge_exitcodes, ExitCode};
|
|
|
|
use crate::filesystem;
|
|
|
|
use crate::output;
|
|
|
|
|
2017-10-10 08:01:17 +02:00
|
|
|
/// The receiver thread can either be buffering results or directly streaming to the console.
///
/// This is a trivially copyable two-variant flag, so `Clone`/`Copy` are derived to make it
/// cheap to pass around, and `Eq` accompanies `PartialEq` (clippy:
/// `derive_partial_eq_without_eq`) since equality on a fieldless enum is total.
/// `Debug` is derived for diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReceiverMode {
    /// Receiver is still buffering in order to sort the results, if the search finishes fast
    /// enough.
    Buffering,

    /// Receiver is directly printing results to the output.
    Streaming,
}
|
|
|
|
|
2018-09-30 15:01:23 +02:00
|
|
|
/// The Worker threads can result in a valid entry having PathBuf or an error.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum WorkerResult {
    // Errors should be rare, so it's probably better to allow large_enum_variant than
    // to box the Entry variant
    /// A directory entry that passed all filters and should be processed.
    Entry(DirEntry),
    /// A traversal error (e.g. permission denied), forwarded so the receiver can
    /// optionally display it.
    Error(ignore::Error),
}
|
|
|
|
|
2023-11-05 20:55:37 +01:00
|
|
|
/// A batch of WorkerResults to send over a channel.
///
/// The shared `Arc<Mutex<..>>` lets the sending side keep appending items to a batch
/// that has already been handed to the receiver; the receiver `take()`s the whole
/// `Vec` out when it consumes the batch.
#[derive(Clone)]
struct Batch {
    // `None` once the receiving side has taken the items out of the slot.
    items: Arc<Mutex<Option<Vec<WorkerResult>>>>,
}
|
|
|
|
|
|
|
|
impl Batch {
|
|
|
|
fn new() -> Self {
|
|
|
|
Self {
|
|
|
|
items: Arc::new(Mutex::new(Some(vec![]))),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn lock(&self) -> MutexGuard<'_, Option<Vec<WorkerResult>>> {
|
|
|
|
self.items.lock().unwrap()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl IntoIterator for Batch {
|
|
|
|
type Item = WorkerResult;
|
|
|
|
type IntoIter = std::vec::IntoIter<WorkerResult>;
|
|
|
|
|
|
|
|
fn into_iter(self) -> Self::IntoIter {
|
|
|
|
self.lock().take().unwrap().into_iter()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Wrapper that sends batches of items at once over a channel.
struct BatchSender {
    /// The batch currently being filled; shared with the receiving side.
    batch: Batch,
    /// Channel on which each freshly started batch is handed to the receiver.
    tx: Sender<Batch>,
    /// Maximum number of items per batch before a new batch is started.
    limit: usize,
}
|
|
|
|
|
|
|
|
impl BatchSender {
|
2023-11-08 16:24:00 +01:00
|
|
|
fn new(tx: Sender<Batch>, limit: usize) -> Self {
|
2023-11-05 20:55:37 +01:00
|
|
|
Self {
|
|
|
|
batch: Batch::new(),
|
|
|
|
tx,
|
2023-11-08 16:24:00 +01:00
|
|
|
limit,
|
2023-11-05 20:55:37 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Check if we need to flush a batch.
|
2023-11-08 16:24:00 +01:00
|
|
|
fn needs_flush(&self, batch: Option<&Vec<WorkerResult>>) -> bool {
|
2023-11-05 20:55:37 +01:00
|
|
|
match batch {
|
|
|
|
// Limit the batch size to provide some backpressure
|
2023-11-08 16:24:00 +01:00
|
|
|
Some(vec) => vec.len() >= self.limit,
|
2023-11-05 20:55:37 +01:00
|
|
|
// Batch was already taken by the receiver, so make a new one
|
|
|
|
None => true,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Add an item to a batch.
|
|
|
|
fn send(&mut self, item: WorkerResult) -> Result<(), SendError<()>> {
|
|
|
|
let mut batch = self.batch.lock();
|
|
|
|
|
2023-11-08 16:24:00 +01:00
|
|
|
if self.needs_flush(batch.as_ref()) {
|
2023-11-05 20:55:37 +01:00
|
|
|
drop(batch);
|
|
|
|
self.batch = Batch::new();
|
|
|
|
batch = self.batch.lock();
|
|
|
|
}
|
|
|
|
|
|
|
|
let items = batch.as_mut().unwrap();
|
|
|
|
items.push(item);
|
|
|
|
|
|
|
|
if items.len() == 1 {
|
|
|
|
// New batch, send it over the channel
|
|
|
|
self.tx
|
|
|
|
.send(self.batch.clone())
|
|
|
|
.map_err(|_| SendError(()))?;
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-04-03 11:51:50 +02:00
|
|
|
/// Maximum size of the output buffer before flushing results to the console
// Also used as the initial capacity of the receiver's sort buffer.
const MAX_BUFFER_LENGTH: usize = 1000;

/// Default duration until output buffering switches to streaming.
// Used when the configuration does not specify `max_buffer_time`.
const DEFAULT_MAX_BUFFER_TIME: Duration = Duration::from_millis(100);
|
2019-01-26 01:01:01 +01:00
|
|
|
|
2021-11-26 23:43:43 +01:00
|
|
|
/// Wrapper for the receiver thread's buffering behavior.
///
/// `W` is the output sink (in practice a buffered, locked stdout handle).
struct ReceiverBuffer<'a, W> {
    /// The configuration.
    config: &'a Config,
    /// For shutting down the senders.
    quit_flag: &'a AtomicBool,
    /// The ^C notifier.
    interrupt_flag: &'a AtomicBool,
    /// Receiver for worker results.
    rx: Receiver<Batch>,
    /// Standard output.
    stdout: W,
    /// The current buffer mode.
    mode: ReceiverMode,
    /// The deadline to switch to streaming mode.
    deadline: Instant,
    /// The buffer of quickly received paths.
    buffer: Vec<DirEntry>,
    /// Result count.
    num_results: usize,
}
|
|
|
|
|
2023-10-27 22:43:06 +02:00
|
|
|
impl<'a, W: Write> ReceiverBuffer<'a, W> {
    /// Create a new receiver buffer.
    ///
    /// The deadline for switching from buffering to streaming is fixed here, at
    /// construction time, from `max_buffer_time` (or the default).
    fn new(state: &'a WorkerState, rx: Receiver<Batch>, stdout: W) -> Self {
        let config = &state.config;
        let quit_flag = state.quit_flag.as_ref();
        let interrupt_flag = state.interrupt_flag.as_ref();
        let max_buffer_time = config.max_buffer_time.unwrap_or(DEFAULT_MAX_BUFFER_TIME);
        let deadline = Instant::now() + max_buffer_time;

        Self {
            config,
            quit_flag,
            interrupt_flag,
            rx,
            stdout,
            mode: ReceiverMode::Buffering,
            deadline,
            buffer: Vec::with_capacity(MAX_BUFFER_LENGTH),
            num_results: 0,
        }
    }

    /// Process results until finished.
    ///
    /// `poll()` signals completion (or failure) through its `Err` variant; when it
    /// does, the quit flag is raised so the sender threads stop walking.
    fn process(&mut self) -> ExitCode {
        loop {
            if let Err(ec) = self.poll() {
                self.quit_flag.store(true, Ordering::Relaxed);
                return ec;
            }
        }
    }

    /// Receive the next worker result.
    fn recv(&self) -> Result<Batch, RecvTimeoutError> {
        match self.mode {
            ReceiverMode::Buffering => {
                // Wait at most until we should switch to streaming
                self.rx.recv_deadline(self.deadline)
            }
            ReceiverMode::Streaming => {
                // Wait however long it takes for a result
                Ok(self.rx.recv()?)
            }
        }
    }

    /// Wait for a result or state change.
    ///
    /// Returns `Err` both for real errors and for normal termination (`stop()`),
    /// so the caller can bubble the final exit code out of its loop.
    fn poll(&mut self) -> Result<(), ExitCode> {
        match self.recv() {
            Ok(batch) => {
                for result in batch {
                    match result {
                        WorkerResult::Entry(dir_entry) => {
                            // --quiet only needs to know whether anything matched.
                            if self.config.quiet {
                                return Err(ExitCode::HasResults(true));
                            }

                            match self.mode {
                                ReceiverMode::Buffering => {
                                    self.buffer.push(dir_entry);
                                    // Too many results to keep buffering: give up on
                                    // sorting and start streaming.
                                    if self.buffer.len() > MAX_BUFFER_LENGTH {
                                        self.stream()?;
                                    }
                                }
                                ReceiverMode::Streaming => {
                                    self.print(&dir_entry)?;
                                }
                            }

                            self.num_results += 1;
                            if let Some(max_results) = self.config.max_results {
                                if self.num_results >= max_results {
                                    return self.stop();
                                }
                            }
                        }
                        WorkerResult::Error(err) => {
                            if self.config.show_filesystem_errors {
                                print_error(err.to_string());
                            }
                        }
                    }
                }

                // If we don't have another batch ready, flush before waiting
                // (flushing only when the channel is empty keeps output timely
                // without flushing after every single result).
                if self.mode == ReceiverMode::Streaming && self.rx.is_empty() {
                    self.flush()?;
                }
            }
            Err(RecvTimeoutError::Timeout) => {
                // Buffering deadline passed without the search finishing: stream.
                self.stream()?;
            }
            Err(RecvTimeoutError::Disconnected) => {
                // All senders are gone; the walk is complete.
                return self.stop();
            }
        }

        Ok(())
    }

    /// Output a path.
    fn print(&mut self, entry: &DirEntry) -> Result<(), ExitCode> {
        output::print_entry(&mut self.stdout, entry, self.config);

        if self.interrupt_flag.load(Ordering::Relaxed) {
            // Ignore any errors on flush, because we're about to exit anyway
            let _ = self.flush();
            return Err(ExitCode::KilledBySigint);
        }

        Ok(())
    }

    /// Switch ourselves into streaming mode.
    ///
    /// Drains the buffered entries to the output first, then flushes.
    fn stream(&mut self) -> Result<(), ExitCode> {
        self.mode = ReceiverMode::Streaming;

        let buffer = mem::take(&mut self.buffer);
        for path in buffer {
            self.print(&path)?;
        }

        self.flush()
    }

    /// Stop looping.
    fn stop(&mut self) -> Result<(), ExitCode> {
        if self.mode == ReceiverMode::Buffering {
            // The search finished before the deadline: sorted output is possible.
            self.buffer.sort();
            self.stream()?;
        }

        if self.config.quiet {
            Err(ExitCode::HasResults(self.num_results > 0))
        } else {
            Err(ExitCode::Success)
        }
    }

    /// Flush stdout if necessary.
    fn flush(&mut self) -> Result<(), ExitCode> {
        if self.stdout.flush().is_err() {
            // Probably a broken pipe. Exit gracefully.
            return Err(ExitCode::GeneralError);
        }
        Ok(())
    }
}
|
|
|
|
|
2023-10-27 22:43:06 +02:00
|
|
|
/// State shared by the sender and receiver threads.
struct WorkerState {
    /// The search patterns.
    patterns: Vec<Regex>,
    /// The command line configuration.
    config: Config,
    /// Flag for cleanly shutting down the parallel walk
    quit_flag: Arc<AtomicBool>,
    /// Flag specifically for quitting due to ^C
    interrupt_flag: Arc<AtomicBool>,
}
|
|
|
|
|
|
|
|
impl WorkerState {
    /// Create worker state with both shutdown flags initially unset.
    fn new(patterns: Vec<Regex>, config: Config) -> Self {
        let quit_flag = Arc::new(AtomicBool::new(false));
        let interrupt_flag = Arc::new(AtomicBool::new(false));

        Self {
            patterns,
            config,
            quit_flag,
            interrupt_flag,
        }
    }

    /// Build the exclude-pattern overrides for the walk.
    ///
    /// The overrides are rooted at the first search path. Panics if `paths` is
    /// empty (callers are expected to pass at least one path).
    fn build_overrides(&self, paths: &[PathBuf]) -> Result<Override> {
        let first_path = &paths[0];
        let config = &self.config;

        let mut builder = OverrideBuilder::new(first_path);

        for pattern in &config.exclude_patterns {
            builder
                .add(pattern)
                .map_err(|e| anyhow!("Malformed exclude pattern: {}", e))?;
        }

        if config.read_vcsignore {
            // Never yield the .git directory itself when VCS ignore files are honored.
            builder.add("!.git/").expect("Invalid exclude pattern");
        }

        builder
            .build()
            .map_err(|_| anyhow!("Mismatch in exclude patterns"))
    }

    /// Configure the parallel directory walker from the search paths and config.
    ///
    /// The first path seeds the builder; any additional paths are added afterwards.
    fn build_walker(&self, paths: &[PathBuf]) -> Result<WalkParallel> {
        let first_path = &paths[0];
        let config = &self.config;
        let overrides = self.build_overrides(paths)?;

        let mut builder = WalkBuilder::new(first_path);
        builder
            .hidden(config.ignore_hidden)
            .ignore(config.read_fdignore)
            .parents(config.read_parent_ignore && (config.read_fdignore || config.read_vcsignore))
            .git_ignore(config.read_vcsignore)
            .git_global(config.read_vcsignore)
            .git_exclude(config.read_vcsignore)
            .require_git(config.require_git_to_read_vcsignore)
            .overrides(overrides)
            .follow_links(config.follow_links)
            // No need to check for supported platforms, option is unavailable on unsupported ones
            .same_file_system(config.one_file_system)
            .max_depth(config.max_depth);

        if config.read_fdignore {
            builder.add_custom_ignore_filename(".fdignore");
        }

        if config.read_global_ignore {
            // Global ignore file lives under the platform config dir, e.g. ~/.config/fd/ignore.
            if let Ok(basedirs) = etcetera::choose_base_strategy() {
                let global_ignore_file = basedirs.config_dir().join("fd").join("ignore");
                if global_ignore_file.is_file() {
                    let result = builder.add_ignore(global_ignore_file);
                    match result {
                        // Partial errors are tolerated: the valid patterns still apply.
                        Some(ignore::Error::Partial(_)) => (),
                        Some(err) => {
                            print_error(format!(
                                "Malformed pattern in global ignore file. {}.",
                                err
                            ));
                        }
                        None => (),
                    }
                }
            }
        }

        for ignore_file in &config.ignore_files {
            let result = builder.add_ignore(ignore_file);
            match result {
                Some(ignore::Error::Partial(_)) => (),
                Some(err) => {
                    print_error(format!("Malformed pattern in custom ignore file. {}.", err));
                }
                None => (),
            }
        }

        for path in &paths[1..] {
            builder.add(path);
        }

        let walker = builder.threads(config.threads).build_parallel();
        Ok(walker)
    }

    /// Run the receiver work, either on this thread or a pool of background
    /// threads (for --exec).
    fn receive(&self, rx: Receiver<Batch>) -> ExitCode {
        let config = &self.config;

        // This will be set to `Some` if the `--exec` argument was supplied.
        if let Some(ref cmd) = config.command {
            if cmd.in_batch_mode() {
                // --exec-batch: collect all results, run the command once.
                exec::batch(rx.into_iter().flatten(), cmd, config)
            } else {
                // --exec: run the command per result on a pool of job threads,
                // serializing their output through `out_perm`.
                let out_perm = Mutex::new(());

                thread::scope(|scope| {
                    // Each spawned job will store its thread handle in here.
                    let threads = config.threads;
                    let mut handles = Vec::with_capacity(threads);
                    for _ in 0..threads {
                        let rx = rx.clone();

                        // Spawn a job thread that will listen for and execute inputs.
                        let handle = scope
                            .spawn(|| exec::job(rx.into_iter().flatten(), cmd, &out_perm, config));

                        // Push the handle of the spawned thread into the vector for later joining.
                        handles.push(handle);
                    }
                    let exit_codes = handles.into_iter().map(|handle| handle.join().unwrap());
                    merge_exitcodes(exit_codes)
                })
            }
        } else {
            // No --exec: print results on this thread through a buffered stdout.
            let stdout = io::stdout().lock();
            let stdout = io::BufWriter::new(stdout);

            ReceiverBuffer::new(self, rx, stdout).process()
        }
    }

    /// Spawn the sender threads.
    ///
    /// The closure passed to `walker.run` is invoked once per walker thread to
    /// produce that thread's per-entry visitor; all filters below run in parallel
    /// on the walker threads, keeping the receiver lightweight.
    fn spawn_senders(&self, walker: WalkParallel, tx: Sender<Batch>) {
        walker.run(|| {
            let patterns = &self.patterns;
            let config = &self.config;
            let quit_flag = self.quit_flag.as_ref();

            let mut limit = 0x100;
            if let Some(cmd) = &config.command {
                if !cmd.in_batch_mode() && config.threads > 1 {
                    // Evenly distribute work between multiple receivers
                    limit = 1;
                }
            }
            let mut tx = BatchSender::new(tx.clone(), limit);

            Box::new(move |entry| {
                if quit_flag.load(Ordering::Relaxed) {
                    return WalkState::Quit;
                }

                let entry = match entry {
                    Ok(ref e) if e.depth() == 0 => {
                        // Skip the root directory entry.
                        return WalkState::Continue;
                    }
                    Ok(e) => DirEntry::normal(e),
                    Err(ignore::Error::WithPath {
                        path,
                        err: inner_err,
                    }) => match inner_err.as_ref() {
                        // A NotFound on a path that is a symlink means the link is
                        // broken; report it as an entry rather than an error.
                        ignore::Error::Io(io_error)
                            if io_error.kind() == io::ErrorKind::NotFound
                                && path
                                    .symlink_metadata()
                                    .ok()
                                    .map_or(false, |m| m.file_type().is_symlink()) =>
                        {
                            DirEntry::broken_symlink(path)
                        }
                        _ => {
                            return match tx.send(WorkerResult::Error(ignore::Error::WithPath {
                                path,
                                err: inner_err,
                            })) {
                                Ok(_) => WalkState::Continue,
                                Err(_) => WalkState::Quit,
                            }
                        }
                    },
                    Err(err) => {
                        return match tx.send(WorkerResult::Error(err)) {
                            Ok(_) => WalkState::Continue,
                            Err(_) => WalkState::Quit,
                        }
                    }
                };

                if let Some(min_depth) = config.min_depth {
                    if entry.depth().map_or(true, |d| d < min_depth) {
                        return WalkState::Continue;
                    }
                }

                // Check the name first, since it doesn't require metadata
                let entry_path = entry.path();

                let search_str: Cow<OsStr> = if config.search_full_path {
                    let path_abs_buf = filesystem::path_absolute_form(entry_path)
                        .expect("Retrieving absolute path succeeds");
                    Cow::Owned(path_abs_buf.as_os_str().to_os_string())
                } else {
                    match entry_path.file_name() {
                        Some(filename) => Cow::Borrowed(filename),
                        None => unreachable!(
                            "Encountered file system entry without a file name. This should only \
                             happen for paths like 'foo/bar/..' or '/' which are not supposed to \
                             appear in a file system traversal."
                        ),
                    }
                };

                // Every pattern must match (patterns are AND-ed together).
                if !patterns
                    .iter()
                    .all(|pat| pat.is_match(&filesystem::osstr_to_bytes(search_str.as_ref())))
                {
                    return WalkState::Continue;
                }

                // Filter out unwanted extensions.
                if let Some(ref exts_regex) = config.extensions {
                    if let Some(path_str) = entry_path.file_name() {
                        if !exts_regex.is_match(&filesystem::osstr_to_bytes(path_str)) {
                            return WalkState::Continue;
                        }
                    } else {
                        return WalkState::Continue;
                    }
                }

                // Filter out unwanted file types.
                if let Some(ref file_types) = config.file_types {
                    if file_types.should_ignore(&entry) {
                        return WalkState::Continue;
                    }
                }

                #[cfg(unix)]
                {
                    if let Some(ref owner_constraint) = config.owner_constraint {
                        if let Some(metadata) = entry.metadata() {
                            if !owner_constraint.matches(metadata) {
                                return WalkState::Continue;
                            }
                        } else {
                            // Metadata unavailable: cannot verify ownership, so skip.
                            return WalkState::Continue;
                        }
                    }
                }

                // Filter out unwanted sizes if it is a file and we have been given size constraints.
                if !config.size_constraints.is_empty() {
                    if entry_path.is_file() {
                        if let Some(metadata) = entry.metadata() {
                            let file_size = metadata.len();
                            if config
                                .size_constraints
                                .iter()
                                .any(|sc| !sc.is_within(file_size))
                            {
                                return WalkState::Continue;
                            }
                        } else {
                            return WalkState::Continue;
                        }
                    } else {
                        // Size constraints only apply to regular files.
                        return WalkState::Continue;
                    }
                }

                // Filter out unwanted modification times
                if !config.time_constraints.is_empty() {
                    let mut matched = false;
                    if let Some(metadata) = entry.metadata() {
                        if let Ok(modified) = metadata.modified() {
                            matched = config
                                .time_constraints
                                .iter()
                                .all(|tf| tf.applies_to(&modified));
                        }
                    }
                    if !matched {
                        return WalkState::Continue;
                    }
                }

                if config.is_printing() {
                    if let Some(ls_colors) = &config.ls_colors {
                        // Compute colors in parallel
                        entry.style(ls_colors);
                    }
                }

                let send_result = tx.send(WorkerResult::Entry(entry));

                if send_result.is_err() {
                    // Receiver is gone (e.g. --max-results reached); stop walking.
                    return WalkState::Quit;
                }

                // Apply pruning.
                if config.prune {
                    return WalkState::Skip;
                }

                WalkState::Continue
            })
        });
    }

    /// Perform the recursive scan.
    fn scan(&self, paths: &[PathBuf]) -> Result<ExitCode> {
        let config = &self.config;
        let walker = self.build_walker(paths)?;

        if config.ls_colors.is_some() && config.is_printing() {
            let quit_flag = Arc::clone(&self.quit_flag);
            let interrupt_flag = Arc::clone(&self.interrupt_flag);

            // First ^C requests a clean shutdown; a second one exits immediately.
            ctrlc::set_handler(move || {
                quit_flag.store(true, Ordering::Relaxed);

                if interrupt_flag.fetch_or(true, Ordering::Relaxed) {
                    // Ctrl-C has been pressed twice, exit NOW
                    ExitCode::KilledBySigint.exit();
                }
            })
            .unwrap();
        }

        // Bounded channel provides backpressure between senders and receiver.
        let (tx, rx) = bounded(2 * config.threads);

        let exit_code = thread::scope(|scope| {
            // Spawn the receiver thread(s)
            let receiver = scope.spawn(|| self.receive(rx));

            // Spawn the sender threads.
            self.spawn_senders(walker, tx);

            receiver.join().unwrap()
        });

        if self.interrupt_flag.load(Ordering::Relaxed) {
            Ok(ExitCode::KilledBySigint)
        } else {
            Ok(exit_code)
        }
    }
}
|
|
|
|
|
|
|
|
/// Recursively scan the given search path for files / pathnames matching the patterns.
|
|
|
|
///
|
|
|
|
/// If the `--exec` argument was supplied, this will create a thread pool for executing
|
|
|
|
/// jobs in parallel from a given command line and the discovered paths. Otherwise, each
|
|
|
|
/// path will simply be written to standard output.
|
|
|
|
pub fn scan(paths: &[PathBuf], patterns: Vec<Regex>, config: Config) -> Result<ExitCode> {
|
|
|
|
WorkerState::new(patterns, config).scan(paths)
|
2017-10-10 08:01:17 +02:00
|
|
|
}
|