From 5ade72a5e1277d98f1cc2c05842a86416ce1fe51 Mon Sep 17 00:00:00 2001 From: Alexandru Macovei Date: Sat, 26 Jan 2019 02:01:01 +0200 Subject: [PATCH] split spawn_receiver(..) and spawn_senders(..) from scan(..). This is just a split commit, refraining from renaming too much. The drop(tx) call is no longer necessary, as the first sender is dropped at the end of spawn_senders(..) --- src/walk.rs | 55 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 38 insertions(+), 17 deletions(-) diff --git a/src/walk.rs b/src/walk.rs index cda2e9d..812c697 100644 --- a/src/walk.rs +++ b/src/walk.rs @@ -17,7 +17,7 @@ use std::io; use std::path::PathBuf; use std::process; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::channel; +use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Mutex}; use std::thread; use std::time; @@ -54,7 +54,6 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc, config: Arc) { .expect("Error: Path vector can not be empty"); let (tx, rx) = channel(); let threads = config.threads; - let show_filesystem_errors = config.show_filesystem_errors; let mut override_builder = OverrideBuilder::new(first_path_buf.as_path()); @@ -121,8 +120,34 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc, config: Arc) { } // Spawn the thread that receives all results through the channel. - let rx_config = Arc::clone(&config); - let receiver_thread = thread::spawn(move || { + let receiver_thread = spawn_receiver(Arc::clone(&config), receiver_wtq, rx); + + // Spawn the sender threads. + spawn_senders( + Arc::clone(&config), + pattern, + sender_wtq, + parallel_walker, + tx, + ); + + // Wait for the receiver thread to print out all results. + receiver_thread.join().unwrap(); + + if wants_to_quit.load(Ordering::Relaxed) { + process::exit(ExitCode::KilledBySigint.into()); + } +} + +fn spawn_receiver( + rx_config: Arc, + receiver_wtq: Arc, + rx: Receiver, +) -> thread::JoinHandle<()> { + let show_filesystem_errors = rx_config.show_filesystem_errors; + let threads = rx_config.threads; + + thread::spawn(move || { // This will be set to `Some` if the `--exec` argument was supplied. if let Some(ref cmd) = rx_config.command { if cmd.in_batch_mode() { @@ -222,9 +247,16 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc, config: Arc) { } } } - }); + }) +} - // Spawn the sender threads. +fn spawn_senders( + config: Arc, + pattern: Arc, + sender_wtq: Arc, + parallel_walker: ignore::WalkParallel, + tx: Sender, +) { parallel_walker.run(|| { let config = Arc::clone(&config); let pattern = Arc::clone(&pattern); @@ -349,15 +381,4 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc, config: Arc) { ignore::WalkState::Continue }) }); - - // Drop the initial sender. If we don't do this, the receiver will block even - // if all threads have finished, since there is still one sender around. - drop(tx); - - // Wait for the receiver thread to print out all results. - receiver_thread.join().unwrap(); - - if wants_to_quit.load(Ordering::Relaxed) { - process::exit(ExitCode::KilledBySigint.into()); - } }