split spawn_receiver(..) and spawn_senders(..) from scan(..).

This is just a split commit, refraining from renaming too much.

The drop(tx) call is no longer necessary, as the first sender
is dropped at the end of spawn_senders(..)
This commit is contained in:
Alexandru Macovei 2019-01-26 02:01:01 +02:00 committed by David Peter
parent d8bd5f9d51
commit 5ade72a5e1
1 changed files with 38 additions and 17 deletions

View File

@ -17,7 +17,7 @@ use std::io;
use std::path::PathBuf;
use std::process;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time;
@ -54,7 +54,6 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc<Regex>, config: Arc<FdOptions>) {
.expect("Error: Path vector can not be empty");
let (tx, rx) = channel();
let threads = config.threads;
let show_filesystem_errors = config.show_filesystem_errors;
let mut override_builder = OverrideBuilder::new(first_path_buf.as_path());
@ -121,8 +120,34 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc<Regex>, config: Arc<FdOptions>) {
}
// Spawn the thread that receives all results through the channel.
let rx_config = Arc::clone(&config);
let receiver_thread = thread::spawn(move || {
let receiver_thread = spawn_receiver(Arc::clone(&config), receiver_wtq, rx);
// Spawn the sender threads.
spawn_senders(
Arc::clone(&config),
pattern,
sender_wtq,
parallel_walker,
tx,
);
// Wait for the receiver thread to print out all results.
receiver_thread.join().unwrap();
if wants_to_quit.load(Ordering::Relaxed) {
process::exit(ExitCode::KilledBySigint.into());
}
}
fn spawn_receiver(
rx_config: Arc<FdOptions>,
receiver_wtq: Arc<AtomicBool>,
rx: Receiver<WorkerResult>,
) -> thread::JoinHandle<()> {
let show_filesystem_errors = rx_config.show_filesystem_errors;
let threads = rx_config.threads;
thread::spawn(move || {
// This will be set to `Some` if the `--exec` argument was supplied.
if let Some(ref cmd) = rx_config.command {
if cmd.in_batch_mode() {
@ -222,9 +247,16 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc<Regex>, config: Arc<FdOptions>) {
}
}
}
});
})
}
// Spawn the sender threads.
fn spawn_senders(
config: Arc<FdOptions>,
pattern: Arc<Regex>,
sender_wtq: Arc<AtomicBool>,
parallel_walker: ignore::WalkParallel,
tx: Sender<WorkerResult>,
) {
parallel_walker.run(|| {
let config = Arc::clone(&config);
let pattern = Arc::clone(&pattern);
@ -349,15 +381,4 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc<Regex>, config: Arc<FdOptions>) {
ignore::WalkState::Continue
})
});
// Drop the initial sender. If we don't do this, the receiver will block even
// if all threads have finished, since there is still one sender around.
drop(tx);
// Wait for the receiver thread to print out all results.
receiver_thread.join().unwrap();
if wants_to_quit.load(Ordering::Relaxed) {
process::exit(ExitCode::KilledBySigint.into());
}
}