From 9fb0c5d372062d6193798519526cb7b14ea24fcc Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Mon, 28 Feb 2022 00:39:52 -0700 Subject: [PATCH] Group together output from multi exec commands So that if multiple `--exec` options are given, and the commands are run in parallel, the buffered output for related commands will be consecutive. --- src/exec/command.rs | 122 ++++++++++++++++++++++++++++++-------------- src/exec/job.rs | 5 +- src/exec/mod.rs | 38 ++++++-------- src/walk.rs | 8 +-- 4 files changed, 102 insertions(+), 71 deletions(-) diff --git a/src/exec/command.rs b/src/exec/command.rs index 9c2c5e2..b373ca2 100644 --- a/src/exec/command.rs +++ b/src/exec/command.rs @@ -6,47 +6,93 @@ use std::sync::Mutex; use crate::error::print_error; use crate::exit_codes::ExitCode; -/// Executes a command. -pub fn execute_command( - mut cmd: Command, - out_perm: &Mutex<()>, - enable_output_buffering: bool, -) -> ExitCode { - // Spawn the supplied command. - let output = if enable_output_buffering { - cmd.output() - } else { - // If running on only one thread, don't buffer output - // Allows for viewing and interacting with intermediate command output - cmd.spawn().and_then(|c| c.wait_with_output()) - }; +struct Outputs { + stdout: Vec, + stderr: Vec, +} +struct OutputBuf<'a> { + out_perm: &'a Mutex<()>, + outputs: Vec, +} - // Then wait for the command to exit, if it was spawned. - match output { - Ok(output) => { - // While this lock is active, this thread will be the only thread allowed - // to write its outputs. - let _lock = out_perm.lock().unwrap(); - - let stdout = io::stdout(); - let stderr = io::stderr(); - - let _ = stdout.lock().write_all(&output.stdout); - let _ = stderr.lock().write_all(&output.stderr); - - if output.status.code() == Some(0) { - ExitCode::Success - } else { - ExitCode::GeneralError - } +impl<'a> OutputBuf<'a> { + fn new(out_perm: &'a Mutex<()>) -> Self { + Self { + out_perm, + outputs: Vec::new(), } - Err(ref why) if why.kind() == io::ErrorKind::NotFound => { - print_error(format!("Command not found: {:?}", cmd)); - ExitCode::GeneralError + } + + fn push(&mut self, stdout: Vec, stderr: Vec) { + self.outputs.push(Outputs { stdout, stderr }); + } + + fn write(self) { + // avoid taking the lock if there is nothing to do + if self.outputs.is_empty() { + return; } - Err(why) => { - print_error(format!("Problem while executing command: {}", why)); - ExitCode::GeneralError + // While this lock is active, this thread will be the only thread allowed + // to write its outputs. + let _lock = self.out_perm.lock().unwrap(); + + let stdout = io::stdout(); + let stderr = io::stderr(); + + let mut stdout = stdout.lock(); + let mut stderr = stderr.lock(); + + for output in self.outputs.iter() { + let _ = stdout.write_all(&output.stdout); + let _ = stderr.write_all(&output.stderr); } } } + +/// Executes a command. +pub fn execute_commands>( + cmds: I, + out_perm: &Mutex<()>, + enable_output_buffering: bool, +) -> ExitCode { + let mut out_buf = OutputBuf::new(out_perm); + for mut cmd in cmds { + // Spawn the supplied command. + let output = if enable_output_buffering { + cmd.output() + } else { + // If running on only one thread, don't buffer output + // Allows for viewing and interacting with intermediate command output + cmd.spawn().and_then(|c| c.wait_with_output()) + }; + + // Then wait for the command to exit, if it was spawned. + match output { + Ok(output) => { + if enable_output_buffering { + out_buf.push(output.stdout, output.stderr); + } + if output.status.code() != Some(0) { + out_buf.write(); + return ExitCode::GeneralError; + } + } + Err(why) => { + out_buf.write(); + return handle_cmd_error(&cmd, why); + } + } + } + out_buf.write(); + ExitCode::Success +} + +pub fn handle_cmd_error(cmd: &Command, err: io::Error) -> ExitCode { + if err.kind() == io::ErrorKind::NotFound { + print_error(format!("Command not found: {:?}", cmd)); + ExitCode::GeneralError + } else { + print_error(format!("Problem while executing command: {}", err)); + ExitCode::GeneralError + } +} diff --git a/src/exec/job.rs b/src/exec/job.rs index feda262..f192543 100644 --- a/src/exec/job.rs +++ b/src/exec/job.rs @@ -49,7 +49,6 @@ pub fn batch( rx: Receiver, cmd: &CommandSet, show_filesystem_errors: bool, - buffer_output: bool, limit: usize, ) -> ExitCode { let paths = rx.iter().filter_map(|value| match value { @@ -63,14 +62,14 @@ pub fn batch( }); if limit == 0 { // no limit - return cmd.execute_batch(paths, buffer_output); + return cmd.execute_batch(paths); } let mut exit_codes = Vec::new(); let mut peekable = paths.peekable(); while peekable.peek().is_some() { let limited = peekable.by_ref().take(limit); - let exit_code = cmd.execute_batch(limited, buffer_output); + let exit_code = cmd.execute_batch(limited); exit_codes.push(exit_code); } merge_exitcodes(exit_codes) diff --git a/src/exec/mod.rs b/src/exec/mod.rs index 5cf8414..633b44a 100644 --- a/src/exec/mod.rs +++ b/src/exec/mod.rs @@ -15,7 +15,7 @@ use regex::Regex; use crate::exit_codes::ExitCode; -use self::command::execute_command; +use self::command::{execute_commands, handle_cmd_error}; use self::input::{basename, dirname, remove_extension}; pub use self::job::{batch, job}; use self::token::Token; @@ -86,17 +86,14 @@ impl CommandSet { buffer_output: bool, ) -> ExitCode { let path_separator = self.path_separator.as_deref(); - for cmd in &self.commands { - let exit = - cmd.generate_and_execute(input, path_separator, &mut out_perm, buffer_output); - if exit != ExitCode::Success { - return exit; - } - } - ExitCode::Success + let commands = self + .commands + .iter() + .map(|c| c.generate(input, path_separator)); + execute_commands(commands, &mut out_perm, buffer_output) } - pub fn execute_batch(&self, paths: I, buffer_output: bool) -> ExitCode + pub fn execute_batch(&self, paths: I) -> ExitCode where I: Iterator, { @@ -104,7 +101,7 @@ impl CommandSet { let mut paths = paths.collect::>(); paths.sort(); for cmd in &self.commands { - let exit = cmd.generate_and_execute_batch(&paths, path_separator, buffer_output); + let exit = cmd.generate_and_execute_batch(&paths, path_separator); if exit != ExitCode::Success { return exit; } @@ -189,27 +186,19 @@ impl CommandTemplate { /// Generates and executes a command. /// /// Using the internal `args` field, and a supplied `input` variable, a `Command` will be - /// build. Once all arguments have been processed, the command is executed. - fn generate_and_execute( - &self, - input: &Path, - path_separator: Option<&str>, - out_perm: &mut Arc>, - buffer_output: bool, - ) -> ExitCode { + /// build. + fn generate(&self, input: &Path, path_separator: Option<&str>) -> Command { let mut cmd = Command::new(self.args[0].generate(&input, path_separator)); for arg in &self.args[1..] { cmd.arg(arg.generate(&input, path_separator)); } - - execute_command(cmd, out_perm, buffer_output) + cmd } fn generate_and_execute_batch( &self, paths: &[PathBuf], path_separator: Option<&str>, - buffer_output: bool, ) -> ExitCode { let mut cmd = Command::new(self.args[0].generate("", None)); cmd.stdin(Stdio::inherit()); @@ -230,7 +219,10 @@ impl CommandTemplate { } if has_path { - execute_command(cmd, &Mutex::new(()), buffer_output) + match cmd.spawn().and_then(|mut c| c.wait()) { + Ok(_) => ExitCode::Success, + Err(e) => handle_cmd_error(&cmd, e), + } } else { ExitCode::Success } diff --git a/src/walk.rs b/src/walk.rs index 4343bc3..d947ade 100644 --- a/src/walk.rs +++ b/src/walk.rs @@ -350,13 +350,7 @@ fn spawn_receiver( // This will be set to `Some` if the `--exec` argument was supplied. if let Some(ref cmd) = config.command { if cmd.in_batch_mode() { - exec::batch( - rx, - cmd, - show_filesystem_errors, - enable_output_buffering, - config.batch_size, - ) + exec::batch(rx, cmd, show_filesystem_errors, config.batch_size) } else { let shared_rx = Arc::new(Mutex::new(rx));