Implement short-time buffering for sorted output

This commit is contained in:
sharkdp 2017-06-16 10:37:40 +02:00 committed by David Peter
parent e06189e654
commit 550e3b3572
1 changed files with 87 additions and 10 deletions

View File

@ -15,9 +15,11 @@ use std::io::Write;
use std::ops::Deref;
#[cfg(target_family = "unix")]
use std::os::unix::fs::PermissionsExt;
use std::path::{Path, Component};
use std::path::{Path, PathBuf, Component};
use std::process;
use std::sync::Arc;
use std::sync::mpsc::channel;
use std::time;
use clap::{App, AppSettings, Arg};
use atty::Stream;
@ -63,6 +65,14 @@ struct FdOptions {
/// all files under subdirectories of the current directory, etc.
max_depth: Option<usize>,
/// The number of threads to use.
threads: Option<usize>,
/// Time to buffer results internally before streaming to the console. This is useful to
/// provide a sorted output, in case the total execution time is shorter than
/// `max_buffer_time`.
max_buffer_time: Option<time::Duration>,
/// Display results as relative or absolute path.
path_display: PathDisplay,
@ -78,7 +88,7 @@ static ROOT_DIR : &'static str = "/";
static PARENT_DIR : &'static str = "..";
/// Print a search result to the console.
fn print_entry(base: &Path, entry: &Path, config: &FdOptions) {
fn print_entry(base: &Path, entry: &PathBuf, config: &FdOptions) {
let path_full = base.join(entry);
let path_str = entry.to_string_lossy();
@ -143,17 +153,22 @@ fn print_entry(base: &Path, entry: &Path, config: &FdOptions) {
}
};
write!(handle, "{}", style.paint(comp_str));
write!(handle, "{}", style.paint(comp_str)).ok();
if component_path.is_dir() && component_path != path_full {
let sep = std::path::MAIN_SEPARATOR.to_string();
write!(handle, "{}", style.paint(sep));
write!(handle, "{}", style.paint(sep)).ok();
}
}
if config.null_separator {
writeln!(handle, "{}", '\0');
let r = if config.null_separator {
write!(handle, "{}", '\0')
} else {
writeln!(handle);
writeln!(handle, "")
};
if r.is_err() {
// Probably a broken pipe. Exit gracefully.
process::exit(0);
}
} else {
// Uncolorized output
@ -161,7 +176,7 @@ fn print_entry(base: &Path, entry: &Path, config: &FdOptions) {
let prefix = if config.path_display == PathDisplay::Absolute { ROOT_DIR } else { "" };
let separator = if config.null_separator { "\0" } else { "\n" };
let r = writeln!(&mut std::io::stdout(), "{}{}{}", prefix, path_str, separator);
let r = write!(&mut std::io::stdout(), "{}{}{}", prefix, path_str, separator);
if r.is_err() {
// Probably a broken pipe. Exit gracefully.
@ -172,6 +187,8 @@ fn print_entry(base: &Path, entry: &Path, config: &FdOptions) {
/// Recursively scan the given search path and search for files / pathnames matching the pattern.
fn scan(root: &Path, pattern: Arc<Regex>, base: &Path, config: Arc<FdOptions>) {
let (tx, rx) = channel();
let walker = WalkBuilder::new(root)
.hidden(config.ignore_hidden)
.ignore(config.read_ignore)
@ -181,12 +198,14 @@ fn scan(root: &Path, pattern: Arc<Regex>, base: &Path, config: Arc<FdOptions>) {
.git_exclude(config.read_ignore)
.follow_links(config.follow_links)
.max_depth(config.max_depth)
.threads(config.threads.unwrap_or(0))
.build_parallel();
walker.run(|| {
let base = base.to_owned();
let config = config.clone();
let pattern = pattern.clone();
let tx_thread = tx.clone();
Box::new(move |entry_o| {
let entry = match entry_o {
@ -209,13 +228,56 @@ fn scan(root: &Path, pattern: Arc<Regex>, base: &Path, config: Arc<FdOptions>) {
};
if let Some(search_str) = search_str_o {
// TODO: take care of the unwrap call
pattern.find(&*search_str)
.map(|_| print_entry(&base, path_rel, &config));
.map(|_| tx_thread.send(path_rel_buf.to_owned()).unwrap());
}
ignore::WalkState::Continue
})
});
// Drop the initial sender. If we don't do this, the receiver will block even
// if all threads have finished, since there is still one sender around.
drop(tx);
let start = time::Instant::now();
let mut buffer = vec!();
// Are we still buffering (or streaming to the console)?
let mut buffering = true;
// Maximum time to wait before we start streaming to the console.
let max_buffer_time = config.max_buffer_time
.unwrap_or_else(|| time::Duration::from_millis(100));
for value in rx {
if buffering {
buffer.push(value);
// Have we reached the maximum time?
if time::Instant::now() - start > max_buffer_time {
// Flush the buffer
for v in &buffer {
print_entry(base, v, &config);
}
buffer.clear();
// Start streaming
buffering = false;
}
} else {
print_entry(base, &value, &config);
}
}
// If we have finished fast enough (faster than max_buffer_time), we haven't streamed
// anything to the console, yet. In this case, sort the output and print it:
buffer.sort();
for value in buffer {
print_entry(base, &value, &config);
}
}
/// Print error message to stderr and exit with status `1`.
@ -269,6 +331,16 @@ fn main() {
.short("d")
.takes_value(true)
.help("Set maximum search depth (default: none)"))
.arg(Arg::with_name("threads")
.long("threads")
.short("j")
.takes_value(true)
.help("the number of threads to use"))
.arg(Arg::with_name("max-buffer-time")
.long("max-buffer-time")
.takes_value(true)
.help("the time to buffer in milliseconds before streaming to the console")
.hidden(true))
.arg(Arg::with_name("pattern")
.help("the search pattern, a regular expression (optional)"))
.arg(Arg::with_name("path")
@ -334,7 +406,12 @@ fn main() {
follow_links: matches.is_present("follow"),
null_separator: matches.is_present("null_separator"),
max_depth: matches.value_of("depth")
.and_then(|ds| usize::from_str_radix(ds, 10).ok()),
.and_then(|n| usize::from_str_radix(n, 10).ok()),
threads: matches.value_of("threads")
.and_then(|n| usize::from_str_radix(n, 10).ok()),
max_buffer_time: matches.value_of("max-buffer-time")
.and_then(|n| u64::from_str_radix(n, 10).ok())
.map(time::Duration::from_millis),
path_display: if matches.is_present("absolute-path") || root_dir_is_absolute {
PathDisplay::Absolute
} else {