From 977e0ac694d9f228ddabcee6fc22313b4b3225d3 Mon Sep 17 00:00:00 2001 From: sharkdp Date: Sat, 9 Sep 2017 10:04:13 +0200 Subject: [PATCH] Spawn a separate thread for the receiver --- src/main.rs | 99 +++++++++++++++++++++++++++++++++-------------------- 1 file changed, 62 insertions(+), 37 deletions(-) diff --git a/src/main.rs b/src/main.rs index e977415..bc7d33b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,6 +19,7 @@ use std::path::{Path, PathBuf, Component}; use std::process; use std::sync::Arc; use std::sync::mpsc::channel; +use std::thread; use std::time; use clap::{App, AppSettings, Arg}; @@ -81,6 +82,16 @@ struct FdOptions { ls_colors: Option } +/// The receiver thread can either be buffering results or directly streaming to the console. +enum ReceiverMode { + /// Receiver is still buffering in order to sort the results, if the search finishes fast + /// enough. + Buffering, + + /// Receiver is directly printing results to the output. + Streaming +} + /// Root directory static ROOT_DIR : &'static str = "/"; @@ -201,6 +212,55 @@ fn scan(root: &Path, pattern: Arc, base: &Path, config: Arc) { .threads(config.threads.unwrap_or(0)) .build_parallel(); + // Spawn the thread that receives all results through the channel. + let rx_config = Arc::clone(&config); + let rx_base = base.to_owned(); + let receiver_thread = thread::spawn(move || { + let start = time::Instant::now(); + + let mut buffer = vec!(); + + // Start in buffering mode + let mut mode = ReceiverMode::Buffering; + + // Maximum time to wait before we start streaming to the console. + let max_buffer_time = rx_config.max_buffer_time + .unwrap_or_else(|| time::Duration::from_millis(100)); + + for value in rx { + match mode { + ReceiverMode::Buffering => { + buffer.push(value); + + // Have we reached the maximum time? + if time::Instant::now() - start > max_buffer_time { + // Flush the buffer + for v in &buffer { + print_entry(&rx_base, v, &rx_config); + } + buffer.clear(); + + // Start streaming + mode = ReceiverMode::Streaming; + } + }, + ReceiverMode::Streaming => { + print_entry(&rx_base, &value, &rx_config); + } + } + } + + // If we have finished fast enough (faster than max_buffer_time), we haven't streamed + // anything to the console, yet. In this case, sort the results and print them: + if !buffer.is_empty() { + buffer.sort(); + for value in buffer { + print_entry(&rx_base, &value, &rx_config); + } + } + }); + + // Spawn the sender threads. walker.run(|| { let base = base.to_owned(); let config = config.clone(); @@ -241,43 +301,8 @@ fn scan(root: &Path, pattern: Arc, base: &Path, config: Arc) { // if all threads have finished, since there is still one sender around. drop(tx); - let start = time::Instant::now(); - - let mut buffer = vec!(); - - // Are we still buffering (or streaming to the console)? - let mut buffering = true; - - // Maximum time to wait before we start streaming to the console. - let max_buffer_time = config.max_buffer_time - .unwrap_or_else(|| time::Duration::from_millis(100)); - - for value in rx { - if buffering { - buffer.push(value); - - // Have we reached the maximum time? - if time::Instant::now() - start > max_buffer_time { - // Flush the buffer - for v in &buffer { - print_entry(base, v, &config); - } - buffer.clear(); - - // Start streaming - buffering = false; - } - } else { - print_entry(base, &value, &config); - } - } - - // If we have finished fast enough (faster than max_buffer_time), we haven't streamed - // anything to the console, yet. In this case, sort the output and print it: - buffer.sort(); - for value in buffer { - print_entry(base, &value, &config); - } + // Wait for the receiver thread to print out all results. + receiver_thread.join().unwrap(); } /// Print error message to stderr and exit with status `1`.