diff --git a/src/main.rs b/src/main.rs index e7320c8..e977415 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,9 +15,11 @@ use std::io::Write; use std::ops::Deref; #[cfg(target_family = "unix")] use std::os::unix::fs::PermissionsExt; -use std::path::{Path, Component}; +use std::path::{Path, PathBuf, Component}; use std::process; use std::sync::Arc; +use std::sync::mpsc::channel; +use std::time; use clap::{App, AppSettings, Arg}; use atty::Stream; @@ -63,6 +65,14 @@ struct FdOptions { /// all files under subdirectories of the current directory, etc. max_depth: Option, + /// The number of threads to use. + threads: Option, + + /// Time to buffer results internally before streaming to the console. This is useful to + /// provide a sorted output, in case the total execution time is shorter than + /// `max_buffer_time`. + max_buffer_time: Option, + /// Display results as relative or absolute path. path_display: PathDisplay, @@ -78,7 +88,7 @@ static ROOT_DIR : &'static str = "/"; static PARENT_DIR : &'static str = ".."; /// Print a search result to the console. -fn print_entry(base: &Path, entry: &Path, config: &FdOptions) { +fn print_entry(base: &Path, entry: &PathBuf, config: &FdOptions) { let path_full = base.join(entry); let path_str = entry.to_string_lossy(); @@ -143,17 +153,22 @@ fn print_entry(base: &Path, entry: &Path, config: &FdOptions) { } }; - write!(handle, "{}", style.paint(comp_str)); + write!(handle, "{}", style.paint(comp_str)).ok(); if component_path.is_dir() && component_path != path_full { let sep = std::path::MAIN_SEPARATOR.to_string(); - write!(handle, "{}", style.paint(sep)); + write!(handle, "{}", style.paint(sep)).ok(); } } - if config.null_separator { - writeln!(handle, "{}", '\0'); + + let r = if config.null_separator { + write!(handle, "{}", '\0') } else { - writeln!(handle); + writeln!(handle, "") + }; + if r.is_err() { + // Probably a broken pipe. Exit gracefully. + process::exit(0); } } else { // Uncolorized output @@ -161,7 +176,7 @@ fn print_entry(base: &Path, entry: &Path, config: &FdOptions) { let prefix = if config.path_display == PathDisplay::Absolute { ROOT_DIR } else { "" }; let separator = if config.null_separator { "\0" } else { "\n" }; - let r = writeln!(&mut std::io::stdout(), "{}{}{}", prefix, path_str, separator); + let r = write!(&mut std::io::stdout(), "{}{}{}", prefix, path_str, separator); if r.is_err() { // Probably a broken pipe. Exit gracefully. @@ -172,6 +187,8 @@ fn print_entry(base: &Path, entry: &Path, config: &FdOptions) { /// Recursively scan the given search path and search for files / pathnames matching the pattern. fn scan(root: &Path, pattern: Arc, base: &Path, config: Arc) { + let (tx, rx) = channel(); + let walker = WalkBuilder::new(root) .hidden(config.ignore_hidden) .ignore(config.read_ignore) @@ -181,12 +198,14 @@ fn scan(root: &Path, pattern: Arc, base: &Path, config: Arc) { .git_exclude(config.read_ignore) .follow_links(config.follow_links) .max_depth(config.max_depth) + .threads(config.threads.unwrap_or(0)) .build_parallel(); walker.run(|| { let base = base.to_owned(); let config = config.clone(); let pattern = pattern.clone(); + let tx_thread = tx.clone(); Box::new(move |entry_o| { let entry = match entry_o { @@ -209,13 +228,56 @@ fn scan(root: &Path, pattern: Arc, base: &Path, config: Arc) { }; if let Some(search_str) = search_str_o { + // TODO: take care of the unwrap call pattern.find(&*search_str) - .map(|_| print_entry(&base, path_rel, &config)); + .map(|_| tx_thread.send(path_rel_buf.to_owned()).unwrap()); } ignore::WalkState::Continue }) }); + + // Drop the initial sender. If we don't do this, the receiver will block even + // if all threads have finished, since there is still one sender around. + drop(tx); + + let start = time::Instant::now(); + + let mut buffer = vec!(); + + // Are we still buffering (or streaming to the console)? + let mut buffering = true; + + // Maximum time to wait before we start streaming to the console. + let max_buffer_time = config.max_buffer_time + .unwrap_or_else(|| time::Duration::from_millis(100)); + + for value in rx { + if buffering { + buffer.push(value); + + // Have we reached the maximum time? + if time::Instant::now() - start > max_buffer_time { + // Flush the buffer + for v in &buffer { + print_entry(base, v, &config); + } + buffer.clear(); + + // Start streaming + buffering = false; + } + } else { + print_entry(base, &value, &config); + } + } + + // If we have finished fast enough (faster than max_buffer_time), we haven't streamed + // anything to the console, yet. In this case, sort the output and print it: + buffer.sort(); + for value in buffer { + print_entry(base, &value, &config); + } } /// Print error message to stderr and exit with status `1`. @@ -269,6 +331,16 @@ fn main() { .short("d") .takes_value(true) .help("Set maximum search depth (default: none)")) + .arg(Arg::with_name("threads") + .long("threads") + .short("j") + .takes_value(true) + .help("the number of threads to use")) + .arg(Arg::with_name("max-buffer-time") + .long("max-buffer-time") + .takes_value(true) + .help("the time to buffer in milliseconds before streaming to the console") + .hidden(true)) .arg(Arg::with_name("pattern") .help("the search pattern, a regular expression (optional)")) .arg(Arg::with_name("path") @@ -334,7 +406,12 @@ fn main() { follow_links: matches.is_present("follow"), null_separator: matches.is_present("null_separator"), max_depth: matches.value_of("depth") - .and_then(|ds| usize::from_str_radix(ds, 10).ok()), + .and_then(|n| usize::from_str_radix(n, 10).ok()), + threads: matches.value_of("threads") + .and_then(|n| usize::from_str_radix(n, 10).ok()), + max_buffer_time: matches.value_of("max-buffer-time") + .and_then(|n| u64::from_str_radix(n, 10).ok()) + .map(time::Duration::from_millis), path_display: if matches.is_present("absolute-path") || root_dir_is_absolute { PathDisplay::Absolute } else {