Merge pull request #116 from mmstick/exec

Implement the --exec flag
This commit is contained in:
David Peter 2017-10-15 21:36:51 +02:00 committed by GitHub
commit e0eab07881
13 changed files with 671 additions and 71 deletions

View file

@ -44,13 +44,13 @@ matrix:
env: TARGET=x86_64-apple-darwin
# Minimum Rust supported channel.
- os: linux
rust: 1.16.0
rust: 1.19.0
env: TARGET=x86_64-unknown-linux-gnu
- os: linux
rust: 1.16.0
rust: 1.19.0
env: TARGET=x86_64-unknown-linux-musl
- os: osx
rust: 1.16.0
rust: 1.19.0
env: TARGET=x86_64-apple-darwin
notifications:

2
Cargo.lock generated
View file

@ -7,6 +7,8 @@ dependencies = [
"clap 2.26.2 (registry+https://github.com/rust-lang/crates.io-index)",
"diff 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"ignore 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.31 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
"regex 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"regex-syntax 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",

View file

@ -1,37 +1,48 @@
[package]
name = "fd-find"
version = "4.0.0"
authors = ["David Peter <mail@david-peter.de>"]
description = "fd is a simple, fast and user-friendly alternative to find."
homepage = "https://github.com/sharkdp/fd"
repository = "https://github.com/sharkdp/fd"
readme = "README.md"
keywords = ["search", "find", "file", "filesystem", "tool"]
categories = ["command-line-utilities"]
license = "MIT"
exclude = ["/benchmarks/*"]
build = "build.rs"
[badges]
appveyor = { repository = "sharkdp/fd" }
travis-ci = { repository = "sharkdp/fd" }
categories = ["command-line-utilities"]
description = "fd is a simple, fast and user-friendly alternative to find."
exclude = ["/benchmarks/*"]
homepage = "https://github.com/sharkdp/fd"
keywords = [
"search",
"find",
"file",
"filesystem",
"tool",
]
license = "MIT"
name = "fd-find"
readme = "README.md"
repository = "https://github.com/sharkdp/fd"
version = "4.0.0"
[[bin]]
name = "fd"
path = "src/main.rs"
[badges.appveyor]
repository = "sharkdp/fd"
[dependencies]
ansi_term = "0.9"
clap = "2.26.0"
atty = "0.2"
regex = "0.2"
regex-syntax = "0.4"
ignore = "0.2"
num_cpus = "1.6.2"
[badges.travis-ci]
repository = "sharkdp/fd"
[build-dependencies]
clap = "2.26.0"
[dependencies]
ansi_term = "0.9"
atty = "0.2"
clap = "2.26.0"
ignore = "0.2"
lazy_static = "0.2.9"
num_cpus = "1.6.2"
regex = "0.2"
regex-syntax = "0.4"
[target.'cfg(all(unix, not(target_os = "redox")))'.dependencies]
libc = "0.2"
[dev-dependencies]
diff = "0.1"
tempdir = "0.3"

View file

@ -22,6 +22,7 @@ While it does not seek to mirror all of *find*'s powerful functionality, it prov
* Unicode-awareness.
* The command name is *50%* shorter[\*](https://github.com/ggreer/the_silver_searcher) than
`find` :-).
* Parallel command execution with a syntax similar to GNU Parallel.
## Demo
@ -96,6 +97,31 @@ complete (and more colorful) variants, see
[here](https://github.com/seebi/dircolors-solarized) or
[here](https://github.com/trapd00r/LS_COLORS).
## Parallel Command Execution
If the `--exec` flag is specified alongside a command template, a job pool will be created for
generating and executing commands in parallel with each discovered path as the inputs. The syntax
for generating commands is similar to that of GNU Parallel:
- **{}**: A placeholder token that will be replaced with the discovered path.
- **{.}**: Removes the extension from the path.
- **{/}**: Uses the basename of the discovered path.
- **{//}**: Uses the parent of the discovered path.
- **{/.}**: Uses the basename, with the extension removed.
```sh
# Demonstration of parallel job execution
fd -e flac --exec 'sleep 1; echo $\{SHELL}: {}'
# This also works, because `SHELL` is not a valid token
fd -e flac --exec 'sleep 1; echo ${SHELL}: {}'
# The token is optional -- it gets added at the end by default.
fd -e flac --exec 'echo'
# Real world example of converting flac files into opus files.
fd -e flac --type f --exec 'ffmpeg -i "{}" -c:a libopus "{.}.opus"'
```
## Install
With Rust's package manager [cargo](https://github.com/rust-lang/cargo), you can install *fd* via:
```

View file

@ -95,6 +95,7 @@ pub fn build_app() -> App<'static, 'static> {
.takes_value(true)
.hidden(true),
)
.arg(arg("exec").long("exec").short("x").takes_value(true))
.arg(arg("pattern"))
.arg(arg("path"))
}
@ -143,6 +144,17 @@ fn usage() -> HashMap<&'static str, Help> {
'f' or 'file': regular files\n \
'd' or 'directory': directories\n \
'l' or 'symlink': symbolic links");
doc!(h, "exec"
, "Execute each discovered path using the argument that follows as the command expression."
, "Execute each discovered path using the argument that follows as the command \
expression.\n \
The following are valid tokens that can be used within the expression for generating \
commands:\n \
'{}': places the input in the location of this token\n \
'{.}': removes the extension from the input\n \
'{/}': places the basename of the input\n \
'{//}': places the parent of the input\n \
'{/.}': places the basename of the input, without the extension\n");
doc!(h, "extension"
, "Filter by file extension"
, "(Additionally) filter search results by their file extension.");
@ -153,8 +165,9 @@ fn usage() -> HashMap<&'static str, Help> {
'never': do not use colorized output\n \
'always': always use colorized output");
doc!(h, "threads"
, "Set number of threads to use for searching"
, "Set number of threads to use for searching (default: number of available CPU cores)");
, "Set number of threads to use for searching & executing"
, "Set number of threads to use for searching & executing (default: number of available \
CPU cores)");
doc!(h, "max-buffer-time"
, "the time (in ms) to buffer, before streaming to the console"
, "Amount of time in milliseconds to buffer, before streaming the search results to\

42
src/exec/job.rs Normal file
View file

@ -0,0 +1,42 @@
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::Receiver;
use super::TokenizedCommand;
/// An event loop that listens for inputs from the `rx` receiver. Each received input will
/// generate a command with the supplied command template. The generated command will then
/// be executed, and this process will continue until the receiver's sender has closed.
pub fn job(
rx: Arc<Mutex<Receiver<PathBuf>>>,
base: Arc<Option<PathBuf>>,
cmd: Arc<TokenizedCommand>,
out_perm: Arc<Mutex<()>>,
) {
// A string buffer that will be re-used in each iteration.
let buffer = &mut String::with_capacity(256);
loop {
// Create a lock on the shared receiver for this thread.
let lock = rx.lock().unwrap();
// Obtain the next path from the receiver, else if the channel
// has closed, exit from the loop
let value: PathBuf = match lock.recv() {
Ok(value) => {
match *base {
Some(ref base) => base.join(&value),
None => value,
}
}
Err(_) => break,
};
// Drop the lock so that other threads can read from the the receiver.
drop(lock);
// Generate a command to store within the buffer, and execute the command.
// Note that the `then_execute()` method will clear the buffer for us.
cmd.generate(buffer, &value, out_perm.clone())
.then_execute();
}
}

163
src/exec/mod.rs Normal file
View file

@ -0,0 +1,163 @@
// TODO: Possible optimization could avoid pushing characters on a buffer.
mod ticket;
mod token;
mod job;
mod paths;
use std::path::Path;
use std::sync::{Arc, Mutex};
use self::paths::{basename, dirname, remove_extension};
use self::ticket::CommandTicket;
use self::token::Token;
pub use self::job::job;
/// Signifies that a placeholder token was found
const PLACE: u8 = 1;
/// Signifies that the '{' character was found.
const OPEN: u8 = 2;
/// Contains a collection of `Token`'s that are utilized to generate command strings.
///
/// The tokens are a represntation of the supplied command template, and are meant to be coupled
/// with an input in order to generate a command. The `generate()` method will be used to
/// generate a command and obtain a ticket for executing that command.
#[derive(Debug, Clone, PartialEq)]
pub struct TokenizedCommand {
pub tokens: Vec<Token>,
}
impl TokenizedCommand {
pub fn new(input: &str) -> TokenizedCommand {
let mut tokens = Vec::new();
let mut start = 0;
let mut flags = 0;
let mut chars = input.char_indices();
let mut text = String::new();
while let Some((id, character)) = chars.next() {
match character {
// Backslashes are useful in cases where we want to use the '{' character
// without having all occurrences of it to collect placeholder tokens.
'\\' => {
if let Some((_, nchar)) = chars.next() {
if nchar != '{' {
text.push(character);
}
text.push(nchar);
}
}
// When a raw '{' is discovered, we will note it's position, and use that for a
// later comparison against valid placeholder tokens.
'{' if flags & OPEN == 0 => {
flags |= OPEN;
start = id;
if !text.is_empty() {
append(&mut tokens, &text);
text.clear();
}
}
// If the `OPEN` bit is set, we will compare the contents between the discovered
// '{' and '}' characters against a list of valid tokens, then pushing the
// corresponding token onto the `tokens` vector.
'}' if flags & OPEN != 0 => {
flags ^= OPEN;
match &input[start + 1..id] {
"" => tokens.push(Token::Placeholder),
"." => tokens.push(Token::NoExt),
"/" => tokens.push(Token::Basename),
"//" => tokens.push(Token::Parent),
"/." => tokens.push(Token::BasenameNoExt),
_ => {
append(&mut tokens, &input[start..id + 1]);
continue;
}
}
flags |= PLACE;
}
// We aren't collecting characters for a text string if the `OPEN` bit is set.
_ if flags & OPEN != 0 => (),
// Push the character onto the text buffer
_ => text.push(character),
}
}
// Take care of any stragglers left behind.
if !text.is_empty() {
append(&mut tokens, &text);
}
// If a placeholder token was not supplied, append one at the end of the command.
if flags & PLACE == 0 {
append(&mut tokens, " ");
tokens.push(Token::Placeholder)
}
TokenizedCommand { tokens: tokens }
}
/// Generates a ticket that is required to execute the generated command.
///
/// Using the internal `tokens` field, and a supplied `input` variable, commands will be
/// written into the `command` buffer. Once all tokens have been processed, the mutable
/// reference of the `command` will be wrapped within a `CommandTicket`, which will be
/// responsible for executing the command and clearing the buffer.
pub fn generate<'a>(
&self,
command: &'a mut String,
input: &Path,
out_perm: Arc<Mutex<()>>,
) -> CommandTicket<'a> {
for token in &self.tokens {
match *token {
Token::Basename => *command += basename(&input.to_string_lossy()),
Token::BasenameNoExt => {
*command += remove_extension(basename(&input.to_string_lossy()))
}
Token::NoExt => *command += remove_extension(&input.to_string_lossy()),
Token::Parent => *command += dirname(&input.to_string_lossy()),
Token::Placeholder => *command += &input.to_string_lossy(),
Token::Text(ref string) => *command += string,
}
}
CommandTicket::new(command, out_perm)
}
}
/// If the last token is a text token, append to that token. Otherwise, create a new token.
fn append(tokens: &mut Vec<Token>, elem: &str) {
// Useful to avoid a borrowing issue with the tokens vector.
let mut append_text = false;
// If the last token is a `Text` token, simply the `elem` at the end.
match tokens.last_mut() {
Some(&mut Token::Text(ref mut string)) => *string += elem,
_ => append_text = true,
};
// Otherwise, we will need to add a new `Text` token that contains the `elem`
if append_text {
tokens.push(Token::Text(String::from(elem)));
}
}
#[cfg(test)]
mod tests {
use super::{TokenizedCommand, Token};
#[test]
fn tokens() {
let expected = TokenizedCommand {
tokens: vec![Token::Text("echo ${SHELL}: ".into()), Token::Placeholder],
};
assert_eq!(TokenizedCommand::new("echo $\\{SHELL}: {}"), expected);
assert_eq!(TokenizedCommand::new("echo ${SHELL}:"), expected);
assert_eq!(
TokenizedCommand::new("echo {.}"),
TokenizedCommand { tokens: vec![Token::Text("echo ".into()), Token::NoExt] }
);
}
}

128
src/exec/paths.rs Normal file
View file

@ -0,0 +1,128 @@
use std::path::MAIN_SEPARATOR;
pub fn basename(input: &str) -> &str {
let mut index = 0;
for (id, character) in input.char_indices() {
if character == MAIN_SEPARATOR {
index = id;
}
}
if index == 0 {
input
} else {
&input[index + 1..]
}
}
/// Removes the extension of a given input
pub fn remove_extension(input: &str) -> &str {
let mut dir_index = 0;
let mut ext_index = 0;
for (id, character) in input.char_indices() {
if character == MAIN_SEPARATOR {
dir_index = id;
}
if character == '.' {
ext_index = id;
}
}
// Account for hidden files and directories
if ext_index == 0 || dir_index + 2 > ext_index {
input
} else {
&input[0..ext_index]
}
}
pub fn dirname(input: &str) -> &str {
let mut index = 0;
for (id, character) in input.char_indices() {
if character == MAIN_SEPARATOR {
index = id;
}
}
if index == 0 { "." } else { &input[0..index] }
}
#[cfg(test)]
mod tests {
use super::*;
fn correct(input: &str) -> String {
let mut sep = String::new();
sep.push(MAIN_SEPARATOR);
input.replace('/', &sep)
}
#[test]
fn path_remove_ext_simple() {
assert_eq!(remove_extension("foo.txt"), "foo");
}
#[test]
fn path_remove_ext_dir() {
assert_eq!(
remove_extension(&correct("dir/foo.txt")),
&correct("dir/foo")
);
}
#[test]
fn path_hidden() {
assert_eq!(remove_extension(".foo"), ".foo")
}
#[test]
fn path_remove_ext_utf8() {
assert_eq!(remove_extension("💖.txt"), "💖");
}
#[test]
fn path_remove_ext_empty() {
assert_eq!(remove_extension(""), "");
}
#[test]
fn path_basename_simple() {
assert_eq!(basename("foo.txt"), "foo.txt");
}
#[test]
fn path_basename_dir() {
assert_eq!(basename(&correct("dir/foo.txt")), "foo.txt");
}
#[test]
fn path_basename_empty() {
assert_eq!(basename(""), "");
}
#[test]
fn path_basename_utf8() {
assert_eq!(basename(&correct("💖/foo.txt")), "foo.txt");
assert_eq!(basename(&correct("dir/💖.txt")), "💖.txt");
}
#[test]
fn path_dirname_simple() {
assert_eq!(dirname("foo.txt"), ".");
}
#[test]
fn path_dirname_dir() {
assert_eq!(dirname(&correct("dir/foo.txt")), "dir");
}
#[test]
fn path_dirname_utf8() {
assert_eq!(dirname(&correct("💖/foo.txt")), "💖");
assert_eq!(dirname(&correct("dir/💖.txt")), "dir");
}
#[test]
fn path_dirname_empty() {
assert_eq!(dirname(""), ".");
}
}

133
src/exec/ticket.rs Normal file
View file

@ -0,0 +1,133 @@
use std::env;
use std::process::Command;
use std::sync::{Arc, Mutex};
use std::io;
lazy_static! {
/// On non-Windows systems, the `SHELL` environment variable will be used to determine the
/// preferred shell of choice for execution. Windows will simply use `cmd`.
static ref COMMAND: (String, &'static str) = if cfg!(target_os = "windows") {
("cmd".into(), "/C")
} else {
(env::var("SHELL").unwrap_or("/bin/sh".into()), "-c")
};
}
/// A state that offers access to executing a generated command.
///
/// The ticket holds a mutable reference to a string that contains the command to be executed.
/// After execution of the the command via the `then_execute()` method, the string will be
/// cleared so that a new command can be written to the string in the future.
pub struct CommandTicket<'a> {
command: &'a mut String,
out_perm: Arc<Mutex<()>>,
}
impl<'a> CommandTicket<'a> {
pub fn new(command: &'a mut String, out_perm: Arc<Mutex<()>>) -> CommandTicket<'a> {
CommandTicket { command, out_perm }
}
/// Executes the command stored within the ticket, and
/// clearing the command's buffer when finished.'
#[cfg(not(all(unix, not(target_os = "redox"))))]
pub fn then_execute(self) {
use std::process::Stdio;
use std::io::Write;
// Spawn a shell with the supplied command.
let cmd = Command::new(COMMAND.0.as_str())
.arg(COMMAND.1)
.arg(&self.command)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.output();
// Then wait for the command to exit, if it was spawned.
match cmd {
Ok(output) => {
// While this lock is active, this thread will be the only thread allowed
// to write it's outputs.
let _lock = self.out_perm.lock().unwrap();
let stdout = io::stdout();
let stderr = io::stderr();
let _ = stdout.lock().write_all(&output.stdout);
let _ = stderr.lock().write_all(&output.stderr);
}
Err(why) => eprintln!("fd: exec error: {}", why),
}
// Clear the buffer for later re-use.
self.command.clear();
}
#[cfg(all(unix, not(target_os = "redox")))]
pub fn then_execute(self) {
use libc::*;
use std::fs::File;
use std::os::unix::process::CommandExt;
use std::os::unix::io::FromRawFd;
// Initial a pair of pipes that will be used to
// pipe the std{out,err} of the spawned process.
let mut stdout_fds = [0; 2];
let mut stderr_fds = [0; 2];
unsafe {
pipe(stdout_fds.as_mut_ptr());
pipe(stderr_fds.as_mut_ptr());
}
// Spawn a shell with the supplied command.
let cmd = Command::new(COMMAND.0.as_str())
.arg(COMMAND.1)
.arg(&self.command)
// Configure the pipes accordingly in the child.
.before_exec(move || unsafe {
// Redirect the child's std{out,err} to the write ends of our pipe.
dup2(stdout_fds[1], STDOUT_FILENO);
dup2(stderr_fds[1], STDERR_FILENO);
// Close all the fds we created here, so EOF will be sent when the program exits.
close(stdout_fds[0]);
close(stdout_fds[1]);
close(stderr_fds[0]);
close(stderr_fds[1]);
Ok(())
})
.spawn();
// Open the read end of the pipes as `File`s.
let (mut pout, mut perr) = unsafe {
// Close the write ends of the pipes in the parent
close(stdout_fds[1]);
close(stderr_fds[1]);
(
// But create files from the read ends.
File::from_raw_fd(stdout_fds[0]),
File::from_raw_fd(stderr_fds[0]),
)
};
match cmd {
Ok(mut child) => {
let _ = child.wait();
// Create a lock to ensure that this thread has exclusive access to writing.
let _lock = self.out_perm.lock().unwrap();
// And then write the outputs of the process until EOF is sent to each file.
let stdout = io::stdout();
let stderr = io::stderr();
let _ = io::copy(&mut pout, &mut stdout.lock());
let _ = io::copy(&mut perr, &mut stderr.lock());
}
Err(why) => eprintln!("fd: exec error: {}", why),
}
// Clear the command string's buffer for later re-use.
self.command.clear();
}
}

29
src/exec/token.rs Normal file
View file

@ -0,0 +1,29 @@
use std::fmt::{self, Display, Formatter};
/// Designates what should be written to a buffer
///
/// Each `Token` contains either text, or a placeholder variant, which will be used to generate
/// commands after all tokens for a given command template have been collected.
#[derive(Clone, Debug, PartialEq)]
pub enum Token {
Placeholder,
Basename,
Parent,
NoExt,
BasenameNoExt,
Text(String),
}
impl Display for Token {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match *self {
Token::Placeholder => f.write_str("{}")?,
Token::Basename => f.write_str("{/}")?,
Token::Parent => f.write_str("{//}")?,
Token::NoExt => f.write_str("{.}")?,
Token::BasenameNoExt => f.write_str("{/.}")?,
Token::Text(ref string) => f.write_str(string)?,
}
Ok(())
}
}

View file

@ -2,6 +2,7 @@ use std::process;
use std::time;
use std::io::Write;
use exec::TokenizedCommand;
use lscolors::LsColors;
use walk::FileType;
use regex_syntax::{Expr, ExprBuilder};
@ -72,6 +73,9 @@ pub struct FdOptions {
///
/// The value (if present) will be a lowercase string without leading dots.
pub extension: Option<String>,
/// If a value is supplied, each item found will be used to generate and execute commands.
pub command: Option<TokenizedCommand>,
}
/// Print error message to stderr and exit with status `1`.

View file

@ -1,15 +1,20 @@
#[macro_use]
extern crate clap;
extern crate ansi_term;
extern crate atty;
#[macro_use]
extern crate clap;
extern crate ignore;
#[macro_use]
extern crate lazy_static;
#[cfg(all(unix, not(target_os = "redox")))]
extern crate libc;
extern crate num_cpus;
extern crate regex;
extern crate regex_syntax;
extern crate ignore;
extern crate num_cpus;
pub mod lscolors;
pub mod fshelper;
pub mod lscolors;
mod app;
mod exec;
mod internal;
mod output;
mod walk;
@ -23,6 +28,7 @@ use std::time;
use atty::Stream;
use regex::RegexBuilder;
use exec::TokenizedCommand;
use internal::{error, pattern_has_uppercase_char, FdOptions, PathDisplay, ROOT_DIR};
use lscolors::LsColors;
use walk::FileType;
@ -86,8 +92,10 @@ fn main() {
None
};
let command = matches.value_of("exec").map(|x| TokenizedCommand::new(&x));
let config = FdOptions {
case_sensitive: case_sensitive,
case_sensitive,
search_full_path: matches.is_present("full-path"),
ignore_hidden: !(matches.is_present("hidden") ||
matches.occurrences_of("rg-alias-hidden-ignore") >= 2),
@ -109,8 +117,8 @@ fn main() {
.value_of("max-buffer-time")
.and_then(|n| u64::from_str_radix(n, 10).ok())
.map(time::Duration::from_millis),
path_display: path_display,
ls_colors: ls_colors,
path_display,
ls_colors,
file_type: match matches.value_of("file-type") {
Some("f") | Some("file") => FileType::RegularFile,
Some("d") |
@ -121,6 +129,7 @@ fn main() {
extension: matches.value_of("extension").map(|e| {
e.trim_left_matches('.').to_lowercase()
}),
command,
};
// If base_dir is ROOT_DIR, then root_dir must be absolute.

View file

@ -1,15 +1,16 @@
use internal::{error, FdOptions};
use exec::{self, TokenizedCommand};
use fshelper;
use internal::{error, FdOptions, PathDisplay};
use output;
use std::path::Path;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::channel;
use std::thread;
use std::time;
use regex::Regex;
use ignore::{self, WalkBuilder};
use regex::Regex;
/// The receiver thread can either be buffering results or directly streaming to the console.
enum ReceiverMode {
@ -30,9 +31,14 @@ pub enum FileType {
SymLink,
}
/// Recursively scan the given search path and search for files / pathnames matching the pattern.
/// Recursively scan the given search path for files / pathnames matching the pattern.
///
/// If the `--exec` argument was supplied, this will create a thread pool for executing
/// jobs in parallel from a given command line and the discovered paths. Otherwise, each
/// path will simply be written to standard output.
pub fn scan(root: &Path, pattern: Arc<Regex>, base: &Path, config: Arc<FdOptions>) {
let (tx, rx) = channel();
let threads = config.threads;
let walker = WalkBuilder::new(root)
.hidden(config.ignore_hidden)
@ -43,56 +49,90 @@ pub fn scan(root: &Path, pattern: Arc<Regex>, base: &Path, config: Arc<FdOptions
.git_exclude(config.read_ignore)
.follow_links(config.follow_links)
.max_depth(config.max_depth)
.threads(config.threads)
.threads(threads)
.build_parallel();
// Spawn the thread that receives all results through the channel.
let rx_config = Arc::clone(&config);
let rx_base = base.to_owned();
let is_absolute = config.path_display == PathDisplay::Absolute;
let receiver_thread = thread::spawn(move || {
let start = time::Instant::now();
// This will be set to `Some` if the `--exec` argument was supplied.
if let Some(ref cmd) = rx_config.command {
let shared_rx = Arc::new(Mutex::new(rx));
let mut buffer = vec![];
let out_perm = Arc::new(Mutex::new(()));
// Start in buffering mode
let mut mode = ReceiverMode::Buffering;
let base = Arc::new(if is_absolute { Some(rx_base) } else { None });
// Maximum time to wait before we start streaming to the console.
let max_buffer_time = rx_config.max_buffer_time.unwrap_or_else(
|| time::Duration::from_millis(100),
);
// This is safe because `cmd` will exist beyond the end of this scope.
// It's required to tell Rust that it's safe to share across threads.
let cmd = unsafe { Arc::from_raw(cmd as *const TokenizedCommand) };
for value in rx {
match mode {
ReceiverMode::Buffering => {
buffer.push(value);
// Each spawned job will store it's thread handle in here.
let mut handles = Vec::with_capacity(threads);
for _ in 0..threads {
let rx = shared_rx.clone();
let cmd = cmd.clone();
let base = base.clone();
let out_perm = out_perm.clone();
// Have we reached the maximum time?
if time::Instant::now() - start > max_buffer_time {
// Flush the buffer
for v in &buffer {
output::print_entry(&rx_base, v, &rx_config);
// Spawn a job thread that will listen for and execute inputs.
let handle = thread::spawn(move || exec::job(rx, base, cmd, out_perm));
// Push the handle of the spawned thread into the vector for later joining.
handles.push(handle);
}
// Wait for all threads to exit before exiting the program.
for h in handles {
h.join().unwrap();
}
} else {
let start = time::Instant::now();
let mut buffer = vec![];
// Start in buffering mode
let mut mode = ReceiverMode::Buffering;
// Maximum time to wait before we start streaming to the console.
let max_buffer_time = rx_config.max_buffer_time.unwrap_or_else(
|| time::Duration::from_millis(100),
);
for value in rx {
match mode {
ReceiverMode::Buffering => {
buffer.push(value);
// Have we reached the maximum time?
if time::Instant::now() - start > max_buffer_time {
// Flush the buffer
for v in &buffer {
output::print_entry(&rx_base, v, &rx_config);
}
buffer.clear();
// Start streaming
mode = ReceiverMode::Streaming;
}
buffer.clear();
// Start streaming
mode = ReceiverMode::Streaming;
}
ReceiverMode::Streaming => {
output::print_entry(&rx_base, &value, &rx_config);
}
}
ReceiverMode::Streaming => {
}
// If we have finished fast enough (faster than max_buffer_time), we haven't streamed
// anything to the console, yet. In this case, sort the results and print them:
if !buffer.is_empty() {
buffer.sort();
for value in buffer {
output::print_entry(&rx_base, &value, &rx_config);
}
}
}
// If we have finished fast enough (faster than max_buffer_time), we haven't streamed
// anything to the console, yet. In this case, sort the results and print them:
if !buffer.is_empty() {
buffer.sort();
for value in buffer {
output::print_entry(&rx_base, &value, &rx_config);
}
}
});
// Spawn the sender threads.