Group together output from multi exec commands

So that if multiple `--exec` options are given, and the commands are run
in parallel, the buffered output for related commands will be
consecutive.
This commit is contained in:
Thayne McCombs 2022-02-28 00:39:52 -07:00 committed by David Peter
parent e54e352035
commit 9fb0c5d372
4 changed files with 102 additions and 71 deletions

View File

@ -6,12 +6,57 @@ use std::sync::Mutex;
use crate::error::print_error; use crate::error::print_error;
use crate::exit_codes::ExitCode; use crate::exit_codes::ExitCode;
struct Outputs {
stdout: Vec<u8>,
stderr: Vec<u8>,
}
struct OutputBuf<'a> {
out_perm: &'a Mutex<()>,
outputs: Vec<Outputs>,
}
impl<'a> OutputBuf<'a> {
fn new(out_perm: &'a Mutex<()>) -> Self {
Self {
out_perm,
outputs: Vec::new(),
}
}
fn push(&mut self, stdout: Vec<u8>, stderr: Vec<u8>) {
self.outputs.push(Outputs { stdout, stderr });
}
fn write(self) {
// avoid taking the lock if there is nothing to do
if self.outputs.is_empty() {
return;
}
// While this lock is active, this thread will be the only thread allowed
// to write its outputs.
let _lock = self.out_perm.lock().unwrap();
let stdout = io::stdout();
let stderr = io::stderr();
let mut stdout = stdout.lock();
let mut stderr = stderr.lock();
for output in self.outputs.iter() {
let _ = stdout.write_all(&output.stdout);
let _ = stderr.write_all(&output.stderr);
}
}
}
/// Executes a command. /// Executes a command.
pub fn execute_command( pub fn execute_commands<I: Iterator<Item = Command>>(
mut cmd: Command, cmds: I,
out_perm: &Mutex<()>, out_perm: &Mutex<()>,
enable_output_buffering: bool, enable_output_buffering: bool,
) -> ExitCode { ) -> ExitCode {
let mut out_buf = OutputBuf::new(out_perm);
for mut cmd in cmds {
// Spawn the supplied command. // Spawn the supplied command.
let output = if enable_output_buffering { let output = if enable_output_buffering {
cmd.output() cmd.output()
@ -24,29 +69,30 @@ pub fn execute_command(
// Then wait for the command to exit, if it was spawned. // Then wait for the command to exit, if it was spawned.
match output { match output {
Ok(output) => { Ok(output) => {
// While this lock is active, this thread will be the only thread allowed if enable_output_buffering {
// to write its outputs. out_buf.push(output.stdout, output.stderr);
let _lock = out_perm.lock().unwrap();
let stdout = io::stdout();
let stderr = io::stderr();
let _ = stdout.lock().write_all(&output.stdout);
let _ = stderr.lock().write_all(&output.stderr);
if output.status.code() == Some(0) {
ExitCode::Success
} else {
ExitCode::GeneralError
} }
if output.status.code() != Some(0) {
out_buf.write();
return ExitCode::GeneralError;
} }
Err(ref why) if why.kind() == io::ErrorKind::NotFound => {
print_error(format!("Command not found: {:?}", cmd));
ExitCode::GeneralError
} }
Err(why) => { Err(why) => {
print_error(format!("Problem while executing command: {}", why)); out_buf.write();
return handle_cmd_error(&cmd, why);
}
}
}
out_buf.write();
ExitCode::Success
}
pub fn handle_cmd_error(cmd: &Command, err: io::Error) -> ExitCode {
if err.kind() == io::ErrorKind::NotFound {
print_error(format!("Command not found: {:?}", cmd));
ExitCode::GeneralError
} else {
print_error(format!("Problem while executing command: {}", err));
ExitCode::GeneralError ExitCode::GeneralError
} }
}
} }

View File

@ -49,7 +49,6 @@ pub fn batch(
rx: Receiver<WorkerResult>, rx: Receiver<WorkerResult>,
cmd: &CommandSet, cmd: &CommandSet,
show_filesystem_errors: bool, show_filesystem_errors: bool,
buffer_output: bool,
limit: usize, limit: usize,
) -> ExitCode { ) -> ExitCode {
let paths = rx.iter().filter_map(|value| match value { let paths = rx.iter().filter_map(|value| match value {
@ -63,14 +62,14 @@ pub fn batch(
}); });
if limit == 0 { if limit == 0 {
// no limit // no limit
return cmd.execute_batch(paths, buffer_output); return cmd.execute_batch(paths);
} }
let mut exit_codes = Vec::new(); let mut exit_codes = Vec::new();
let mut peekable = paths.peekable(); let mut peekable = paths.peekable();
while peekable.peek().is_some() { while peekable.peek().is_some() {
let limited = peekable.by_ref().take(limit); let limited = peekable.by_ref().take(limit);
let exit_code = cmd.execute_batch(limited, buffer_output); let exit_code = cmd.execute_batch(limited);
exit_codes.push(exit_code); exit_codes.push(exit_code);
} }
merge_exitcodes(exit_codes) merge_exitcodes(exit_codes)

View File

@ -15,7 +15,7 @@ use regex::Regex;
use crate::exit_codes::ExitCode; use crate::exit_codes::ExitCode;
use self::command::execute_command; use self::command::{execute_commands, handle_cmd_error};
use self::input::{basename, dirname, remove_extension}; use self::input::{basename, dirname, remove_extension};
pub use self::job::{batch, job}; pub use self::job::{batch, job};
use self::token::Token; use self::token::Token;
@ -86,17 +86,14 @@ impl CommandSet {
buffer_output: bool, buffer_output: bool,
) -> ExitCode { ) -> ExitCode {
let path_separator = self.path_separator.as_deref(); let path_separator = self.path_separator.as_deref();
for cmd in &self.commands { let commands = self
let exit = .commands
cmd.generate_and_execute(input, path_separator, &mut out_perm, buffer_output); .iter()
if exit != ExitCode::Success { .map(|c| c.generate(input, path_separator));
return exit; execute_commands(commands, &mut out_perm, buffer_output)
}
}
ExitCode::Success
} }
pub fn execute_batch<I>(&self, paths: I, buffer_output: bool) -> ExitCode pub fn execute_batch<I>(&self, paths: I) -> ExitCode
where where
I: Iterator<Item = PathBuf>, I: Iterator<Item = PathBuf>,
{ {
@ -104,7 +101,7 @@ impl CommandSet {
let mut paths = paths.collect::<Vec<_>>(); let mut paths = paths.collect::<Vec<_>>();
paths.sort(); paths.sort();
for cmd in &self.commands { for cmd in &self.commands {
let exit = cmd.generate_and_execute_batch(&paths, path_separator, buffer_output); let exit = cmd.generate_and_execute_batch(&paths, path_separator);
if exit != ExitCode::Success { if exit != ExitCode::Success {
return exit; return exit;
} }
@ -189,27 +186,19 @@ impl CommandTemplate {
/// Generates and executes a command. /// Generates and executes a command.
/// ///
/// Using the internal `args` field, and a supplied `input` variable, a `Command` will be /// Using the internal `args` field, and a supplied `input` variable, a `Command` will be
/// build. Once all arguments have been processed, the command is executed. /// build.
fn generate_and_execute( fn generate(&self, input: &Path, path_separator: Option<&str>) -> Command {
&self,
input: &Path,
path_separator: Option<&str>,
out_perm: &mut Arc<Mutex<()>>,
buffer_output: bool,
) -> ExitCode {
let mut cmd = Command::new(self.args[0].generate(&input, path_separator)); let mut cmd = Command::new(self.args[0].generate(&input, path_separator));
for arg in &self.args[1..] { for arg in &self.args[1..] {
cmd.arg(arg.generate(&input, path_separator)); cmd.arg(arg.generate(&input, path_separator));
} }
cmd
execute_command(cmd, out_perm, buffer_output)
} }
fn generate_and_execute_batch( fn generate_and_execute_batch(
&self, &self,
paths: &[PathBuf], paths: &[PathBuf],
path_separator: Option<&str>, path_separator: Option<&str>,
buffer_output: bool,
) -> ExitCode { ) -> ExitCode {
let mut cmd = Command::new(self.args[0].generate("", None)); let mut cmd = Command::new(self.args[0].generate("", None));
cmd.stdin(Stdio::inherit()); cmd.stdin(Stdio::inherit());
@ -230,7 +219,10 @@ impl CommandTemplate {
} }
if has_path { if has_path {
execute_command(cmd, &Mutex::new(()), buffer_output) match cmd.spawn().and_then(|mut c| c.wait()) {
Ok(_) => ExitCode::Success,
Err(e) => handle_cmd_error(&cmd, e),
}
} else { } else {
ExitCode::Success ExitCode::Success
} }

View File

@ -350,13 +350,7 @@ fn spawn_receiver(
// This will be set to `Some` if the `--exec` argument was supplied. // This will be set to `Some` if the `--exec` argument was supplied.
if let Some(ref cmd) = config.command { if let Some(ref cmd) = config.command {
if cmd.in_batch_mode() { if cmd.in_batch_mode() {
exec::batch( exec::batch(rx, cmd, show_filesystem_errors, config.batch_size)
rx,
cmd,
show_filesystem_errors,
enable_output_buffering,
config.batch_size,
)
} else { } else {
let shared_rx = Arc::new(Mutex::new(rx)); let shared_rx = Arc::new(Mutex::new(rx));