mirror of https://github.com/sharkdp/fd.git
Merge pull request #895 from tavianator/receiver-buffer
walk: Encapsulate the buffering behavior in a struct
This commit is contained in:
commit
7fe4bfaacb
|
@ -99,6 +99,16 @@ dependencies = [
|
||||||
"vec_map",
|
"vec_map",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "crossbeam-channel"
|
||||||
|
version = "0.5.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"crossbeam-utils",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "crossbeam-utils"
|
name = "crossbeam-utils"
|
||||||
version = "0.8.5"
|
version = "0.8.5"
|
||||||
|
@ -155,6 +165,7 @@ dependencies = [
|
||||||
"atty",
|
"atty",
|
||||||
"chrono",
|
"chrono",
|
||||||
"clap",
|
"clap",
|
||||||
|
"crossbeam-channel",
|
||||||
"ctrlc",
|
"ctrlc",
|
||||||
"diff",
|
"diff",
|
||||||
"dirs-next",
|
"dirs-next",
|
||||||
|
|
|
@ -49,6 +49,7 @@ dirs-next = "2.0"
|
||||||
normpath = "0.3"
|
normpath = "0.3"
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
once_cell = "1.8.0"
|
once_cell = "1.8.0"
|
||||||
|
crossbeam-channel = "0.5.1"
|
||||||
|
|
||||||
[dependencies.clap]
|
[dependencies.clap]
|
||||||
version = "2.34.0"
|
version = "2.34.0"
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::mpsc::Receiver;
|
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
|
use crossbeam_channel::Receiver;
|
||||||
|
|
||||||
use crate::error::print_error;
|
use crate::error::print_error;
|
||||||
use crate::exit_codes::{merge_exitcodes, ExitCode};
|
use crate::exit_codes::{merge_exitcodes, ExitCode};
|
||||||
use crate::walk::WorkerResult;
|
use crate::walk::WorkerResult;
|
||||||
|
|
245
src/walk.rs
245
src/walk.rs
|
@ -1,15 +1,16 @@
|
||||||
use std::ffi::OsStr;
|
use std::ffi::OsStr;
|
||||||
use std::fs::{FileType, Metadata};
|
use std::fs::{FileType, Metadata};
|
||||||
use std::io;
|
use std::io;
|
||||||
|
use std::mem;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::mpsc::{channel, Receiver, Sender};
|
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time;
|
use std::time::{Duration, Instant};
|
||||||
use std::{borrow::Cow, io::Write};
|
use std::{borrow::Cow, io::Write};
|
||||||
|
|
||||||
use anyhow::{anyhow, Result};
|
use anyhow::{anyhow, Result};
|
||||||
|
use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
|
||||||
use ignore::overrides::OverrideBuilder;
|
use ignore::overrides::OverrideBuilder;
|
||||||
use ignore::{self, WalkBuilder};
|
use ignore::{self, WalkBuilder};
|
||||||
use once_cell::unsync::OnceCell;
|
use once_cell::unsync::OnceCell;
|
||||||
|
@ -23,6 +24,7 @@ use crate::filesystem;
|
||||||
use crate::output;
|
use crate::output;
|
||||||
|
|
||||||
/// The receiver thread can either be buffering results or directly streaming to the console.
|
/// The receiver thread can either be buffering results or directly streaming to the console.
|
||||||
|
#[derive(PartialEq)]
|
||||||
enum ReceiverMode {
|
enum ReceiverMode {
|
||||||
/// Receiver is still buffering in order to sort the results, if the search finishes fast
|
/// Receiver is still buffering in order to sort the results, if the search finishes fast
|
||||||
/// enough.
|
/// enough.
|
||||||
|
@ -41,7 +43,7 @@ pub enum WorkerResult {
|
||||||
/// Maximum size of the output buffer before flushing results to the console
|
/// Maximum size of the output buffer before flushing results to the console
|
||||||
pub const MAX_BUFFER_LENGTH: usize = 1000;
|
pub const MAX_BUFFER_LENGTH: usize = 1000;
|
||||||
/// Default duration until output buffering switches to streaming.
|
/// Default duration until output buffering switches to streaming.
|
||||||
pub const DEFAULT_MAX_BUFFER_TIME: time::Duration = time::Duration::from_millis(100);
|
pub const DEFAULT_MAX_BUFFER_TIME: Duration = Duration::from_millis(100);
|
||||||
|
|
||||||
/// Recursively scan the given search path for files / pathnames matching the pattern.
|
/// Recursively scan the given search path for files / pathnames matching the pattern.
|
||||||
///
|
///
|
||||||
|
@ -53,7 +55,7 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc<Regex>, config: Arc<Config>) -> R
|
||||||
let first_path_buf = path_iter
|
let first_path_buf = path_iter
|
||||||
.next()
|
.next()
|
||||||
.expect("Error: Path vector can not be empty");
|
.expect("Error: Path vector can not be empty");
|
||||||
let (tx, rx) = channel();
|
let (tx, rx) = unbounded();
|
||||||
|
|
||||||
let mut override_builder = OverrideBuilder::new(first_path_buf.as_path());
|
let mut override_builder = OverrideBuilder::new(first_path_buf.as_path());
|
||||||
|
|
||||||
|
@ -160,6 +162,157 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc<Regex>, config: Arc<Config>) -> R
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Wrapper for the receiver thread's buffering behavior.
|
||||||
|
struct ReceiverBuffer<W> {
|
||||||
|
/// The configuration.
|
||||||
|
config: Arc<Config>,
|
||||||
|
/// The ^C notifier.
|
||||||
|
wants_to_quit: Arc<AtomicBool>,
|
||||||
|
/// Receiver for worker results.
|
||||||
|
rx: Receiver<WorkerResult>,
|
||||||
|
/// Standard output.
|
||||||
|
stdout: W,
|
||||||
|
/// The current buffer mode.
|
||||||
|
mode: ReceiverMode,
|
||||||
|
/// The deadline to switch to streaming mode.
|
||||||
|
deadline: Instant,
|
||||||
|
/// The buffer of quickly received paths.
|
||||||
|
buffer: Vec<PathBuf>,
|
||||||
|
/// Result count.
|
||||||
|
num_results: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<W: Write> ReceiverBuffer<W> {
|
||||||
|
/// Create a new receiver buffer.
|
||||||
|
fn new(
|
||||||
|
config: Arc<Config>,
|
||||||
|
wants_to_quit: Arc<AtomicBool>,
|
||||||
|
rx: Receiver<WorkerResult>,
|
||||||
|
stdout: W,
|
||||||
|
) -> Self {
|
||||||
|
let max_buffer_time = config.max_buffer_time.unwrap_or(DEFAULT_MAX_BUFFER_TIME);
|
||||||
|
let deadline = Instant::now() + max_buffer_time;
|
||||||
|
|
||||||
|
Self {
|
||||||
|
config,
|
||||||
|
wants_to_quit,
|
||||||
|
rx,
|
||||||
|
stdout,
|
||||||
|
mode: ReceiverMode::Buffering,
|
||||||
|
deadline,
|
||||||
|
buffer: Vec::with_capacity(MAX_BUFFER_LENGTH),
|
||||||
|
num_results: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Process results until finished.
|
||||||
|
fn process(&mut self) -> ExitCode {
|
||||||
|
loop {
|
||||||
|
if let Err(ec) = self.poll() {
|
||||||
|
return ec;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Receive the next worker result.
|
||||||
|
fn recv(&self) -> Result<WorkerResult, RecvTimeoutError> {
|
||||||
|
match self.mode {
|
||||||
|
ReceiverMode::Buffering => {
|
||||||
|
// Wait at most until we should switch to streaming
|
||||||
|
self.rx.recv_deadline(self.deadline)
|
||||||
|
}
|
||||||
|
ReceiverMode::Streaming => {
|
||||||
|
// Wait however long it takes for a result
|
||||||
|
Ok(self.rx.recv()?)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wait for a result or state change.
|
||||||
|
fn poll(&mut self) -> Result<(), ExitCode> {
|
||||||
|
match self.recv() {
|
||||||
|
Ok(WorkerResult::Entry(path)) => {
|
||||||
|
if self.config.quiet {
|
||||||
|
return Err(ExitCode::HasResults(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
match self.mode {
|
||||||
|
ReceiverMode::Buffering => {
|
||||||
|
self.buffer.push(path);
|
||||||
|
if self.buffer.len() > MAX_BUFFER_LENGTH {
|
||||||
|
self.stream()?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ReceiverMode::Streaming => {
|
||||||
|
self.print(&path);
|
||||||
|
self.flush()?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.num_results += 1;
|
||||||
|
if let Some(max_results) = self.config.max_results {
|
||||||
|
if self.num_results >= max_results {
|
||||||
|
return self.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(WorkerResult::Error(err)) => {
|
||||||
|
if self.config.show_filesystem_errors {
|
||||||
|
print_error(err.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(RecvTimeoutError::Timeout) => {
|
||||||
|
self.stream()?;
|
||||||
|
}
|
||||||
|
Err(RecvTimeoutError::Disconnected) => {
|
||||||
|
return self.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Output a path.
|
||||||
|
fn print(&mut self, path: &Path) {
|
||||||
|
output::print_entry(&mut self.stdout, path, &self.config, &self.wants_to_quit)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Switch ourselves into streaming mode.
|
||||||
|
fn stream(&mut self) -> Result<(), ExitCode> {
|
||||||
|
self.mode = ReceiverMode::Streaming;
|
||||||
|
|
||||||
|
let buffer = mem::take(&mut self.buffer);
|
||||||
|
for path in buffer {
|
||||||
|
self.print(&path);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.flush()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Stop looping.
|
||||||
|
fn stop(&mut self) -> Result<(), ExitCode> {
|
||||||
|
if self.mode == ReceiverMode::Buffering {
|
||||||
|
self.buffer.sort();
|
||||||
|
self.stream()?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.config.quiet {
|
||||||
|
Err(ExitCode::HasResults(self.num_results > 0))
|
||||||
|
} else {
|
||||||
|
Err(ExitCode::Success)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Flush stdout if necessary.
|
||||||
|
fn flush(&mut self) -> Result<(), ExitCode> {
|
||||||
|
if self.config.interactive_terminal && self.stdout.flush().is_err() {
|
||||||
|
// Probably a broken pipe. Exit gracefully.
|
||||||
|
return Err(ExitCode::GeneralError);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn spawn_receiver(
|
fn spawn_receiver(
|
||||||
config: &Arc<Config>,
|
config: &Arc<Config>,
|
||||||
wants_to_quit: &Arc<AtomicBool>,
|
wants_to_quit: &Arc<AtomicBool>,
|
||||||
|
@ -218,90 +371,12 @@ fn spawn_receiver(
|
||||||
merge_exitcodes(exit_codes)
|
merge_exitcodes(exit_codes)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
let start = time::Instant::now();
|
|
||||||
|
|
||||||
// Start in buffering mode
|
|
||||||
let mut mode = ReceiverMode::Buffering;
|
|
||||||
|
|
||||||
// Maximum time to wait before we start streaming to the console.
|
|
||||||
let max_buffer_time = config.max_buffer_time.unwrap_or(DEFAULT_MAX_BUFFER_TIME);
|
|
||||||
|
|
||||||
let stdout = io::stdout();
|
let stdout = io::stdout();
|
||||||
let stdout = stdout.lock();
|
let stdout = stdout.lock();
|
||||||
let mut stdout = io::BufWriter::new(stdout);
|
let stdout = io::BufWriter::new(stdout);
|
||||||
|
|
||||||
let mut num_results = 0;
|
let mut rxbuffer = ReceiverBuffer::new(config, wants_to_quit, rx, stdout);
|
||||||
let is_interactive = config.interactive_terminal;
|
rxbuffer.process()
|
||||||
let mut buffer = Vec::with_capacity(MAX_BUFFER_LENGTH);
|
|
||||||
for worker_result in rx {
|
|
||||||
match worker_result {
|
|
||||||
WorkerResult::Entry(path) => {
|
|
||||||
if config.quiet {
|
|
||||||
return ExitCode::HasResults(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
match mode {
|
|
||||||
ReceiverMode::Buffering => {
|
|
||||||
buffer.push(path);
|
|
||||||
|
|
||||||
// Have we reached the maximum buffer size or maximum buffering time?
|
|
||||||
if buffer.len() > MAX_BUFFER_LENGTH
|
|
||||||
|| start.elapsed() > max_buffer_time
|
|
||||||
{
|
|
||||||
// Flush the buffer
|
|
||||||
for path in &buffer {
|
|
||||||
output::print_entry(
|
|
||||||
&mut stdout,
|
|
||||||
path,
|
|
||||||
&config,
|
|
||||||
&wants_to_quit,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
buffer.clear();
|
|
||||||
if is_interactive && stdout.flush().is_err() {
|
|
||||||
// Probably a broken pipe. Exit gracefully.
|
|
||||||
return ExitCode::GeneralError;
|
|
||||||
}
|
|
||||||
// Start streaming
|
|
||||||
mode = ReceiverMode::Streaming;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ReceiverMode::Streaming => {
|
|
||||||
output::print_entry(&mut stdout, &path, &config, &wants_to_quit);
|
|
||||||
if is_interactive && stdout.flush().is_err() {
|
|
||||||
// Probably a broken pipe. Exit gracefully.
|
|
||||||
return ExitCode::GeneralError;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
num_results += 1;
|
|
||||||
if let Some(max_results) = config.max_results {
|
|
||||||
if num_results >= max_results {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
WorkerResult::Error(err) => {
|
|
||||||
if show_filesystem_errors {
|
|
||||||
print_error(err.to_string());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we have finished fast enough (faster than max_buffer_time), we haven't streamed
|
|
||||||
// anything to the console, yet. In this case, sort the results and print them:
|
|
||||||
buffer.sort();
|
|
||||||
for value in buffer {
|
|
||||||
output::print_entry(&mut stdout, &value, &config, &wants_to_quit);
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.quiet {
|
|
||||||
ExitCode::HasResults(false)
|
|
||||||
} else {
|
|
||||||
ExitCode::Success
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue