Spawn a separate thread for the receiver

This commit is contained in:
sharkdp 2017-09-09 10:04:13 +02:00 committed by David Peter
parent 550e3b3572
commit 977e0ac694
1 changed files with 62 additions and 37 deletions

View File

@ -19,6 +19,7 @@ use std::path::{Path, PathBuf, Component};
use std::process;
use std::sync::Arc;
use std::sync::mpsc::channel;
use std::thread;
use std::time;
use clap::{App, AppSettings, Arg};
@ -81,6 +82,16 @@ struct FdOptions {
ls_colors: Option<LsColors>
}
/// The receiver thread can either be buffering results or directly streaming to the console.
enum ReceiverMode {
/// Receiver is still buffering in order to sort the results, if the search finishes fast
/// enough.
Buffering,
/// Receiver is directly printing results to the output.
Streaming
}
/// Root directory
static ROOT_DIR : &'static str = "/";
@ -201,6 +212,55 @@ fn scan(root: &Path, pattern: Arc<Regex>, base: &Path, config: Arc<FdOptions>) {
.threads(config.threads.unwrap_or(0))
.build_parallel();
// Spawn the thread that receives all results through the channel.
let rx_config = Arc::clone(&config);
let rx_base = base.to_owned();
let receiver_thread = thread::spawn(move || {
let start = time::Instant::now();
let mut buffer = vec!();
// Start in buffering mode
let mut mode = ReceiverMode::Buffering;
// Maximum time to wait before we start streaming to the console.
let max_buffer_time = rx_config.max_buffer_time
.unwrap_or_else(|| time::Duration::from_millis(100));
for value in rx {
match mode {
ReceiverMode::Buffering => {
buffer.push(value);
// Have we reached the maximum time?
if time::Instant::now() - start > max_buffer_time {
// Flush the buffer
for v in &buffer {
print_entry(&rx_base, v, &rx_config);
}
buffer.clear();
// Start streaming
mode = ReceiverMode::Streaming;
}
},
ReceiverMode::Streaming => {
print_entry(&rx_base, &value, &rx_config);
}
}
}
// If we have finished fast enough (faster than max_buffer_time), we haven't streamed
// anything to the console, yet. In this case, sort the results and print them:
if !buffer.is_empty() {
buffer.sort();
for value in buffer {
print_entry(&rx_base, &value, &rx_config);
}
}
});
// Spawn the sender threads.
walker.run(|| {
let base = base.to_owned();
let config = config.clone();
@ -241,43 +301,8 @@ fn scan(root: &Path, pattern: Arc<Regex>, base: &Path, config: Arc<FdOptions>) {
// if all threads have finished, since there is still one sender around.
drop(tx);
let start = time::Instant::now();
let mut buffer = vec!();
// Are we still buffering (or streaming to the console)?
let mut buffering = true;
// Maximum time to wait before we start streaming to the console.
let max_buffer_time = config.max_buffer_time
.unwrap_or_else(|| time::Duration::from_millis(100));
for value in rx {
if buffering {
buffer.push(value);
// Have we reached the maximum time?
if time::Instant::now() - start > max_buffer_time {
// Flush the buffer
for v in &buffer {
print_entry(base, v, &config);
}
buffer.clear();
// Start streaming
buffering = false;
}
} else {
print_entry(base, &value, &config);
}
}
// If we have finished fast enough (faster than max_buffer_time), we haven't streamed
// anything to the console, yet. In this case, sort the output and print it:
buffer.sort();
for value in buffer {
print_entry(base, &value, &config);
}
// Wait for the receiver thread to print out all results.
receiver_thread.join().unwrap();
}
/// Print error message to stderr and exit with status `1`.