mirror of
https://github.com/sharkdp/fd.git
synced 2024-11-16 00:48:28 +01:00
Merge pull request #1422 from tavianator/batch
walk: Send WorkerResults in batches
This commit is contained in:
commit
84f032eba8
3 changed files with 147 additions and 52 deletions
|
@ -8,11 +8,13 @@ use lscolors::{Colorable, LsColors, Style};
|
|||
use crate::config::Config;
|
||||
use crate::filesystem::strip_current_dir;
|
||||
|
||||
#[derive(Debug)]
|
||||
enum DirEntryInner {
|
||||
Normal(ignore::DirEntry),
|
||||
BrokenSymlink(PathBuf),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DirEntry {
|
||||
inner: DirEntryInner,
|
||||
metadata: OnceCell<Option<Metadata>>,
|
||||
|
|
|
@ -1,9 +1,6 @@
|
|||
use std::sync::Mutex;
|
||||
|
||||
use crossbeam_channel::Receiver;
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::dir_entry::DirEntry;
|
||||
use crate::error::print_error;
|
||||
use crate::exit_codes::{merge_exitcodes, ExitCode};
|
||||
use crate::walk::WorkerResult;
|
||||
|
@ -14,7 +11,7 @@ use super::CommandSet;
|
|||
/// generate a command with the supplied command template. The generated command will then
|
||||
/// be executed, and this process will continue until the receiver's sender has closed.
|
||||
pub fn job(
|
||||
rx: Receiver<WorkerResult>,
|
||||
results: impl IntoIterator<Item = WorkerResult>,
|
||||
cmd: &CommandSet,
|
||||
out_perm: &Mutex<()>,
|
||||
config: &Config,
|
||||
|
@ -22,35 +19,39 @@ pub fn job(
|
|||
// Output should be buffered when only running a single thread
|
||||
let buffer_output: bool = config.threads > 1;
|
||||
|
||||
let mut results: Vec<ExitCode> = Vec::new();
|
||||
loop {
|
||||
let mut ret = ExitCode::Success;
|
||||
for result in results {
|
||||
// Obtain the next result from the receiver, else if the channel
|
||||
// has closed, exit from the loop
|
||||
let dir_entry: DirEntry = match rx.recv() {
|
||||
Ok(WorkerResult::Entry(dir_entry)) => dir_entry,
|
||||
Ok(WorkerResult::Error(err)) => {
|
||||
let dir_entry = match result {
|
||||
WorkerResult::Entry(dir_entry) => dir_entry,
|
||||
WorkerResult::Error(err) => {
|
||||
if config.show_filesystem_errors {
|
||||
print_error(err.to_string());
|
||||
}
|
||||
continue;
|
||||
}
|
||||
Err(_) => break,
|
||||
};
|
||||
|
||||
// Generate a command, execute it and store its exit code.
|
||||
results.push(cmd.execute(
|
||||
let code = cmd.execute(
|
||||
dir_entry.stripped_path(config),
|
||||
config.path_separator.as_deref(),
|
||||
out_perm,
|
||||
buffer_output,
|
||||
))
|
||||
);
|
||||
ret = merge_exitcodes([ret, code]);
|
||||
}
|
||||
// Returns error in case of any error.
|
||||
merge_exitcodes(results)
|
||||
ret
|
||||
}
|
||||
|
||||
pub fn batch(rx: Receiver<WorkerResult>, cmd: &CommandSet, config: &Config) -> ExitCode {
|
||||
let paths = rx
|
||||
pub fn batch(
|
||||
results: impl IntoIterator<Item = WorkerResult>,
|
||||
cmd: &CommandSet,
|
||||
config: &Config,
|
||||
) -> ExitCode {
|
||||
let paths = results
|
||||
.into_iter()
|
||||
.filter_map(|worker_result| match worker_result {
|
||||
WorkerResult::Entry(dir_entry) => Some(dir_entry.into_stripped_path(config)),
|
||||
|
|
166
src/walk.rs
166
src/walk.rs
|
@ -4,12 +4,12 @@ use std::io::{self, Write};
|
|||
use std::mem;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
use std::thread;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender};
|
||||
use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, SendError, Sender};
|
||||
use etcetera::BaseStrategy;
|
||||
use ignore::overrides::{Override, OverrideBuilder};
|
||||
use ignore::{self, WalkBuilder, WalkParallel, WalkState};
|
||||
|
@ -36,6 +36,7 @@ enum ReceiverMode {
|
|||
|
||||
/// The Worker threads can result in a valid entry having PathBuf or an error.
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
#[derive(Debug)]
|
||||
pub enum WorkerResult {
|
||||
// Errors should be rare, so it's probably better to allow large_enum_variant than
|
||||
// to box the Entry variant
|
||||
|
@ -43,6 +44,83 @@ pub enum WorkerResult {
|
|||
Error(ignore::Error),
|
||||
}
|
||||
|
||||
/// A batch of WorkerResults to send over a channel.
|
||||
#[derive(Clone)]
|
||||
struct Batch {
|
||||
items: Arc<Mutex<Option<Vec<WorkerResult>>>>,
|
||||
}
|
||||
|
||||
impl Batch {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
items: Arc::new(Mutex::new(Some(vec![]))),
|
||||
}
|
||||
}
|
||||
|
||||
fn lock(&self) -> MutexGuard<'_, Option<Vec<WorkerResult>>> {
|
||||
self.items.lock().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl IntoIterator for Batch {
|
||||
type Item = WorkerResult;
|
||||
type IntoIter = std::vec::IntoIter<WorkerResult>;
|
||||
|
||||
fn into_iter(self) -> Self::IntoIter {
|
||||
self.lock().take().unwrap().into_iter()
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrapper that sends batches of items at once over a channel.
|
||||
struct BatchSender {
|
||||
batch: Batch,
|
||||
tx: Sender<Batch>,
|
||||
limit: usize,
|
||||
}
|
||||
|
||||
impl BatchSender {
|
||||
fn new(tx: Sender<Batch>, limit: usize) -> Self {
|
||||
Self {
|
||||
batch: Batch::new(),
|
||||
tx,
|
||||
limit,
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if we need to flush a batch.
|
||||
fn needs_flush(&self, batch: Option<&Vec<WorkerResult>>) -> bool {
|
||||
match batch {
|
||||
// Limit the batch size to provide some backpressure
|
||||
Some(vec) => vec.len() >= self.limit,
|
||||
// Batch was already taken by the receiver, so make a new one
|
||||
None => true,
|
||||
}
|
||||
}
|
||||
|
||||
/// Add an item to a batch.
|
||||
fn send(&mut self, item: WorkerResult) -> Result<(), SendError<()>> {
|
||||
let mut batch = self.batch.lock();
|
||||
|
||||
if self.needs_flush(batch.as_ref()) {
|
||||
drop(batch);
|
||||
self.batch = Batch::new();
|
||||
batch = self.batch.lock();
|
||||
}
|
||||
|
||||
let items = batch.as_mut().unwrap();
|
||||
items.push(item);
|
||||
|
||||
if items.len() == 1 {
|
||||
// New batch, send it over the channel
|
||||
self.tx
|
||||
.send(self.batch.clone())
|
||||
.map_err(|_| SendError(()))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Maximum size of the output buffer before flushing results to the console
|
||||
const MAX_BUFFER_LENGTH: usize = 1000;
|
||||
/// Default duration until output buffering switches to streaming.
|
||||
|
@ -57,7 +135,7 @@ struct ReceiverBuffer<'a, W> {
|
|||
/// The ^C notifier.
|
||||
interrupt_flag: &'a AtomicBool,
|
||||
/// Receiver for worker results.
|
||||
rx: Receiver<WorkerResult>,
|
||||
rx: Receiver<Batch>,
|
||||
/// Standard output.
|
||||
stdout: W,
|
||||
/// The current buffer mode.
|
||||
|
@ -72,7 +150,7 @@ struct ReceiverBuffer<'a, W> {
|
|||
|
||||
impl<'a, W: Write> ReceiverBuffer<'a, W> {
|
||||
/// Create a new receiver buffer.
|
||||
fn new(state: &'a WorkerState, rx: Receiver<WorkerResult>, stdout: W) -> Self {
|
||||
fn new(state: &'a WorkerState, rx: Receiver<Batch>, stdout: W) -> Self {
|
||||
let config = &state.config;
|
||||
let quit_flag = state.quit_flag.as_ref();
|
||||
let interrupt_flag = state.interrupt_flag.as_ref();
|
||||
|
@ -103,7 +181,7 @@ impl<'a, W: Write> ReceiverBuffer<'a, W> {
|
|||
}
|
||||
|
||||
/// Receive the next worker result.
|
||||
fn recv(&self) -> Result<WorkerResult, RecvTimeoutError> {
|
||||
fn recv(&self) -> Result<Batch, RecvTimeoutError> {
|
||||
match self.mode {
|
||||
ReceiverMode::Buffering => {
|
||||
// Wait at most until we should switch to streaming
|
||||
|
@ -119,34 +197,40 @@ impl<'a, W: Write> ReceiverBuffer<'a, W> {
|
|||
/// Wait for a result or state change.
|
||||
fn poll(&mut self) -> Result<(), ExitCode> {
|
||||
match self.recv() {
|
||||
Ok(WorkerResult::Entry(dir_entry)) => {
|
||||
if self.config.quiet {
|
||||
return Err(ExitCode::HasResults(true));
|
||||
}
|
||||
Ok(batch) => {
|
||||
for result in batch {
|
||||
match result {
|
||||
WorkerResult::Entry(dir_entry) => {
|
||||
if self.config.quiet {
|
||||
return Err(ExitCode::HasResults(true));
|
||||
}
|
||||
|
||||
match self.mode {
|
||||
ReceiverMode::Buffering => {
|
||||
self.buffer.push(dir_entry);
|
||||
if self.buffer.len() > MAX_BUFFER_LENGTH {
|
||||
self.stream()?;
|
||||
match self.mode {
|
||||
ReceiverMode::Buffering => {
|
||||
self.buffer.push(dir_entry);
|
||||
if self.buffer.len() > MAX_BUFFER_LENGTH {
|
||||
self.stream()?;
|
||||
}
|
||||
}
|
||||
ReceiverMode::Streaming => {
|
||||
self.print(&dir_entry)?;
|
||||
self.flush()?;
|
||||
}
|
||||
}
|
||||
|
||||
self.num_results += 1;
|
||||
if let Some(max_results) = self.config.max_results {
|
||||
if self.num_results >= max_results {
|
||||
return self.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
WorkerResult::Error(err) => {
|
||||
if self.config.show_filesystem_errors {
|
||||
print_error(err.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
ReceiverMode::Streaming => {
|
||||
self.print(&dir_entry)?;
|
||||
self.flush()?;
|
||||
}
|
||||
}
|
||||
|
||||
self.num_results += 1;
|
||||
if let Some(max_results) = self.config.max_results {
|
||||
if self.num_results >= max_results {
|
||||
return self.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(WorkerResult::Error(err)) => {
|
||||
if self.config.show_filesystem_errors {
|
||||
print_error(err.to_string());
|
||||
}
|
||||
}
|
||||
Err(RecvTimeoutError::Timeout) => {
|
||||
|
@ -319,13 +403,13 @@ impl WorkerState {
|
|||
|
||||
/// Run the receiver work, either on this thread or a pool of background
|
||||
/// threads (for --exec).
|
||||
fn receive(&self, rx: Receiver<WorkerResult>) -> ExitCode {
|
||||
fn receive(&self, rx: Receiver<Batch>) -> ExitCode {
|
||||
let config = &self.config;
|
||||
|
||||
// This will be set to `Some` if the `--exec` argument was supplied.
|
||||
if let Some(ref cmd) = config.command {
|
||||
if cmd.in_batch_mode() {
|
||||
exec::batch(rx, cmd, &config)
|
||||
exec::batch(rx.into_iter().flatten(), cmd, &config)
|
||||
} else {
|
||||
let out_perm = Mutex::new(());
|
||||
|
||||
|
@ -337,7 +421,8 @@ impl WorkerState {
|
|||
let rx = rx.clone();
|
||||
|
||||
// Spawn a job thread that will listen for and execute inputs.
|
||||
let handle = scope.spawn(|| exec::job(rx, cmd, &out_perm, &config));
|
||||
let handle = scope
|
||||
.spawn(|| exec::job(rx.into_iter().flatten(), cmd, &out_perm, &config));
|
||||
|
||||
// Push the handle of the spawned thread into the vector for later joining.
|
||||
handles.push(handle);
|
||||
|
@ -355,12 +440,20 @@ impl WorkerState {
|
|||
}
|
||||
|
||||
/// Spawn the sender threads.
|
||||
fn spawn_senders(&self, walker: WalkParallel, tx: Sender<WorkerResult>) {
|
||||
fn spawn_senders(&self, walker: WalkParallel, tx: Sender<Batch>) {
|
||||
walker.run(|| {
|
||||
let patterns = &self.patterns;
|
||||
let config = &self.config;
|
||||
let quit_flag = self.quit_flag.as_ref();
|
||||
let tx = tx.clone();
|
||||
|
||||
let mut limit = 0x100;
|
||||
if let Some(cmd) = &config.command {
|
||||
if !cmd.in_batch_mode() && config.threads > 1 {
|
||||
// Evenly distribute work between multiple receivers
|
||||
limit = 1;
|
||||
}
|
||||
}
|
||||
let mut tx = BatchSender::new(tx.clone(), limit);
|
||||
|
||||
Box::new(move |entry| {
|
||||
if quit_flag.load(Ordering::Relaxed) {
|
||||
|
@ -545,8 +638,7 @@ impl WorkerState {
|
|||
.unwrap();
|
||||
}
|
||||
|
||||
// Channel capacity was chosen empircally to perform similarly to an unbounded channel
|
||||
let (tx, rx) = bounded(0x4000 * config.threads);
|
||||
let (tx, rx) = bounded(2 * config.threads);
|
||||
|
||||
let exit_code = thread::scope(|scope| {
|
||||
// Spawn the receiver thread(s)
|
||||
|
|
Loading…
Reference in a new issue