From 4a5a5faf4dc133a8ff0084e4125b95372209d78d Mon Sep 17 00:00:00 2001 From: Michael Aaron Murphy Date: Sat, 14 Oct 2017 12:04:11 -0400 Subject: [PATCH] Implement --exec feature Closes #84 --- Cargo.lock | 1 + Cargo.toml | 50 +++++++------ README.md | 31 +++++++- src/app.rs | 18 ++++- src/exec/command.rs | 46 ++++++++++++ src/exec/job.rs | 32 ++++++++ src/exec/mod.rs | 178 ++++++++++++++++++++++++++++++++++++++++++++ src/exec/paths.rs | 101 +++++++++++++++++++++++++ src/internal.rs | 4 + src/main.rs | 20 +++-- src/walk.rs | 103 ++++++++++++++++--------- 11 files changed, 515 insertions(+), 69 deletions(-) create mode 100644 src/exec/command.rs create mode 100644 src/exec/job.rs create mode 100644 src/exec/mod.rs create mode 100644 src/exec/paths.rs diff --git a/Cargo.lock b/Cargo.lock index 4069054..ddb1447 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7,6 +7,7 @@ dependencies = [ "clap 2.26.2 (registry+https://github.com/rust-lang/crates.io-index)", "diff 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "ignore 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "regex 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index d79807d..12e2e45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,36 +1,44 @@ [package] -name = "fd-find" -version = "4.0.0" authors = ["David Peter "] -description = "fd is a simple, fast and user-friendly alternative to find." -homepage = "https://github.com/sharkdp/fd" -repository = "https://github.com/sharkdp/fd" -readme = "README.md" -keywords = ["search", "find", "file", "filesystem", "tool"] -categories = ["command-line-utilities"] -license = "MIT" -exclude = ["benchmarks"] build = "build.rs" - -[badges] -appveyor = { repository = "sharkdp/fd" } -travis-ci = { repository = "sharkdp/fd" } +categories = ["command-line-utilities"] +description = "fd is a simple, fast and user-friendly alternative to find." +exclude = ["benchmarks"] +homepage = "https://github.com/sharkdp/fd" +keywords = [ + "search", + "find", + "file", + "filesystem", + "tool", +] +license = "MIT" +name = "fd-find" +readme = "README.md" +repository = "https://github.com/sharkdp/fd" +version = "4.0.0" [[bin]] name = "fd" path = "src/main.rs" +[badges.appveyor] +repository = "sharkdp/fd" -[dependencies] -ansi_term = "0.9" -clap = "2.26.0" -atty = "0.2" -regex = "0.2" -ignore = "0.2" -num_cpus = "1.6.2" +[badges.travis-ci] +repository = "sharkdp/fd" [build-dependencies] clap = "2.26.0" +[dependencies] +ansi_term = "0.9" +atty = "0.2" +clap = "2.26.0" +ignore = "0.2" +lazy_static = "0.2.9" +num_cpus = "1.6.2" +regex = "0.2" + [dev-dependencies] diff = "0.1" tempdir = "0.3" diff --git a/README.md b/README.md index e3e711a..9e239be 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ While it does not seek to mirror all of *find*'s powerful functionality, it prov * Unicode-awareness. * The command name is *50%* shorter[\*](https://github.com/ggreer/the_silver_searcher) than `find` :-). +* Parallel command execution with a syntax similar to GNU Parallel. ## Demo @@ -34,7 +35,7 @@ subdirectories and about a million files. For averaging and statistical analysis cache". Results for a cold cache are similar. Let's start with `find`: -``` +```sh find ~ -iregex '.*[0-9]\.jpg$' time 6.265 s (6.127 s .. NaN s) @@ -44,7 +45,7 @@ std dev 31.73 ms (0.0 s .. 33.48 ms) ``` `find` is much faster if it does not need to perform a regular-expression search: -``` +```sh find ~ -iname '*[0-9].jpg' time 2.866 s (2.754 s .. 2.964 s) @@ -56,7 +57,7 @@ std dev 23.11 ms (0.0 s .. 25.09 ms) Now let's try the same for `fd`. Note that `fd` *always* performs a regular expression search. The options `--hidden` and `--no-ignore` are needed for a fair comparison, otherwise `fd` does not have to traverse hidden folders and ignored paths (see below): -``` +```sh fd --hidden --no-ignore '.*[0-9]\.jpg$' ~ time 892.6 ms (839.0 ms .. 915.4 ms) @@ -70,7 +71,7 @@ same 14030 files :smile:. Finally, let's run `fd` without `--hidden` and `--no-ignore` (this can lead to different search results, of course): -``` +```sh fd '[0-9]\.jpg$' ~ time 159.5 ms (155.8 ms .. 165.3 ms) @@ -96,6 +97,28 @@ complete (and more colorful) variants, see [here](https://github.com/seebi/dircolors-solarized) or [here](https://github.com/trapd00r/LS_COLORS). +## Parallel Command Execution +If the `--exec` flag is specified alongside a command template, a job pool will be created for +generating and executing commands in parallel with each discovered path as the inputs. The syntax +for generating commands is similar to that of GNU Parallel: + +- **{}**: A placeholder token that will be replaced with the discovered path. +- **{.}**: Removes the extension from the path. +- **{/}**: Uses the basename of the discovered path. +- **{//}**: Uses the parent of the discovered path. +- **{/.}**: Uses the basename, with the extension removed. + +```sh +# Demonstration of parallel job execution +fd '*.flac' --exec 'sleep 1; echo $\{SHELL}: {}' + +# This also works, because `SHELL` is not a valid token +fd '*.flac' --exec 'sleep 1; echo ${SHELL}: {}' + +# Real world example of converting flac files into opus files. +fd '*.flac' --type f --exec 'ffmpeg -i "{}" -c:a libopus "{.}.opus"' +``` + ## Install With Rust's package manager [cargo](https://github.com/rust-lang/cargo), you can install *fd* via: ``` diff --git a/src/app.rs b/src/app.rs index cec9868..fbbea62 100644 --- a/src/app.rs +++ b/src/app.rs @@ -95,6 +95,11 @@ pub fn build_app() -> App<'static, 'static> { .takes_value(true) .hidden(true), ) + .arg( + arg("exec") + .long("exec") + .takes_value(true) + ) .arg(arg("pattern")) .arg(arg("path")) } @@ -143,6 +148,15 @@ fn usage() -> HashMap<&'static str, Help> { 'f' or 'file': regular files\n \ 'd' or 'directory': directories\n \ 'l' or 'symlink': symbolic links"); + doc!(h, "exec" + , "Execute each discovered path using the argument that follows as the command expression." + , "Execute each discovered path using the argument that follows as the command expression.\n \ + The following are valid tokens that can be used within the expression for generating commands:\n \ + '{}': places the input in the location of this token\n \ + '{.}': removes the extension from the input\n \ + '{/}': places the basename of the input\n \ + '{//}': places the parent of the input\n \ + '{/.}': places the basename of the input, without the extension\n"); doc!(h, "extension" , "Filter by file extension" , "(Additionally) filter search results by their file extension."); @@ -153,8 +167,8 @@ fn usage() -> HashMap<&'static str, Help> { 'never': do not use colorized output\n \ 'always': always use colorized output"); doc!(h, "threads" - , "Set number of threads to use for searching" - , "Set number of threads to use for searching (default: number of available CPU cores)"); + , "Set number of threads to use for searching & executing" + , "Set number of threads to use for searching & executing (default: number of available CPU cores)"); doc!(h, "max-buffer-time" , "the time (in ms) to buffer, before streaming to the console" , "Amount of time in milliseconds to buffer, before streaming the search results to\ diff --git a/src/exec/command.rs b/src/exec/command.rs new file mode 100644 index 0000000..e954c8b --- /dev/null +++ b/src/exec/command.rs @@ -0,0 +1,46 @@ +use std::process::Command; +use std::env; + +lazy_static! { + /// On non-Windows systems, the `SHELL` environment variable will be used to determine the + /// preferred shell of choice for execution. Windows will simply use `cmd`. + static ref COMMAND: (String, &'static str) = if cfg!(target_os = "windows") { + ("cmd".into(), "/C") + } else { + (env::var("SHELL").unwrap_or("/bin/sh".into()), "-c") + }; +} + +/// A state that offers access to executing a generated command. +/// +/// The ticket holds a mutable reference to a string that contains the command to be executed. +/// After execution of the the command via the `then_execute()` method, the string will be +/// cleared so that a new command can be written to the string in the future. +pub struct CommandTicket<'a> { + command: &'a mut String, +} + +impl<'a> CommandTicket<'a> { + pub fn new(command: &'a mut String) -> CommandTicket<'a> { + CommandTicket { command } + } + + /// Executes the command stored within the ticket, and + /// clearing the command's buffer when finished. + pub fn then_execute(self) { + // Spawn a shell with the supplied command. + let cmd = Command::new(COMMAND.0.as_str()) + .arg(COMMAND.1) + .arg(&self.command) + .spawn(); + + // Then wait for the command to exit, if it was spawned. + match cmd { + Ok(mut child) => { let _ = child.wait(); }, + Err(why) => eprintln!("fd: exec error: {}", why), + } + + // Clear the buffer for later re-use. + self.command.clear(); + } +} diff --git a/src/exec/job.rs b/src/exec/job.rs new file mode 100644 index 0000000..4d861ea --- /dev/null +++ b/src/exec/job.rs @@ -0,0 +1,32 @@ +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; +use std::sync::mpsc::Receiver; + +use super::TokenizedCommand; + +/// An event loop that listens for inputs from the `rx` receiver. Each received input will +/// generate a command with the supplied command template. The generated command will then +/// be executed, and this process will continue until the receiver's sender has closed. +pub fn job(rx: Arc>>, cmd: Arc) { + // A string buffer that will be re-used in each iteration. + let buffer = &mut String::with_capacity(256); + + loop { + // Create a lock on the shared receiver for this thread. + let lock = rx.lock().unwrap(); + + // Obtain the next path from the receiver, else if the channel + // has closed, exit from the loop + let value = match lock.recv() { + Ok(value) => value, + Err(_) => break + }; + + // Drop the lock so that other threads can read from the the receiver. + drop(lock); + + // Generate a command to store within the buffer, and execute the command. + // Note that the `then_execute()` method will clear the buffer for us. + cmd.generate(buffer, &value).then_execute(); + } +} \ No newline at end of file diff --git a/src/exec/mod.rs b/src/exec/mod.rs new file mode 100644 index 0000000..6e44c86 --- /dev/null +++ b/src/exec/mod.rs @@ -0,0 +1,178 @@ +// TODO: Possible optimization could avoid pushing characters on a buffer. +mod command; +mod job; +mod paths; + +use std::fmt::{self, Display, Formatter}; +use std::path::Path; + +use self::paths::{basename, dirname, remove_extension}; +use self::command::CommandTicket; +pub use self::job::job; + +/// Designates what should be written to a buffer +/// +/// Each `Token` contains either text, or a placeholder variant, which will be used to generate +/// commands after all tokens for a given command template have been collected. +#[derive(Clone, Debug, PartialEq)] +pub enum Token { + Placeholder, + Basename, + Parent, + NoExt, + BasenameNoExt, + Text(String), +} + +/// Signifies that a placeholder token was found +const PLACE: u8 = 1; + +/// Signifies that the '{' character was found. +const OPEN: u8 = 2; + +/// Contains a collection of `Token`'s that are utilized to generate command strings. +/// +/// The tokens are a represntation of the supplied command template, and are meant to be coupled +/// with an input in order to generate a command. The `generate()` method will be used to +/// generate a command and obtain a ticket for executing that command. +#[derive(Debug, Clone, PartialEq)] +pub struct TokenizedCommand { + pub tokens: Vec, +} + +impl TokenizedCommand { + pub fn new(input: &str) -> TokenizedCommand { + let mut tokens = Vec::new(); + let mut start = 0; + let mut flags = 0; + let mut chars = input.char_indices(); + let mut text = String::new(); + + while let Some((id, character)) = chars.next() { + match character { + // Backslashes are useful in cases where we want to use the '{' character + // without having all occurrences of it to collect placeholder tokens. + '\\' => if let Some((_, nchar)) = chars.next() { + if nchar != '{' { + text.push(character); + } + text.push(nchar); + } + // When a raw '{' is discovered, we will note it's position, and use that for a + // later comparison against valid placeholder tokens. + '{' if flags & OPEN == 0 => { + flags |= OPEN; + start = id; + if !text.is_empty() { + append(&mut tokens, &text); + text.clear(); + } + } + // If the `OPEN` bit is set, we will compare the contents between the discovered + // '{' and '}' characters against a list of valid tokens, then pushing the + // corresponding token onto the `tokens` vector. + '}' if flags & OPEN != 0 => { + flags ^= OPEN; + match &input[start+1..id] { + "" => { + tokens.push(Token::Placeholder); + flags |= PLACE; + } + "." => tokens.push(Token::NoExt), + "/" => tokens.push(Token::Basename), + "//" => tokens.push(Token::Parent), + "/." => tokens.push(Token::BasenameNoExt), + _ => append(&mut tokens, &input[start..id+1]), + } + } + // We aren't collecting characters for a text string if the `OPEN` bit is set. + _ if flags & OPEN != 0 => (), + // Push the character onto the text buffer + _ => text.push(character) + } + } + + // Take care of any stragglers left behind. + if !text.is_empty() { append(&mut tokens, &text); } + + // If a placeholder token was not supplied, append one at the end of the command. + if flags & PLACE == 0 { + append(&mut tokens, " "); + tokens.push(Token::Placeholder) + } + + TokenizedCommand { tokens } + } + + /// Generates a ticket that is required to execute the generated command. + /// + /// Using the internal `tokens` field, and a supplied `input` variable, commands will be + /// written into the `command` buffer. Once all tokens have been processed, the mutable + /// reference of the `command` will be wrapped within a `CommandTicket`, which will be + /// responsible for executing the command and clearing the buffer. + pub fn generate<'a>(&self, command: &'a mut String, input: &Path) -> CommandTicket<'a> { + for token in &self.tokens { + match *token { + Token::Basename => *command += basename(&input.to_string_lossy()), + Token::BasenameNoExt => *command += remove_extension(basename(&input.to_string_lossy())), + Token::NoExt => *command += remove_extension(&input.to_string_lossy()), + Token::Parent => *command += dirname(&input.to_string_lossy()), + Token::Placeholder => *command += &input.to_string_lossy(), + Token::Text(ref string) => *command += string, + } + } + + CommandTicket::new(command) + } +} + +/// If the last token is a text token, append to that token. Otherwise, create a new token. +fn append(tokens: &mut Vec, elem: &str) { + // Useful to avoid a borrowing issue with the tokens vector. + let mut append_text = false; + + // If the last token is a `Text` token, simply the `elem` at the end. + match tokens.last_mut() { + Some(&mut Token::Text(ref mut string)) => *string += elem, + _ => append_text = true, + }; + + // Otherwise, we will need to add a new `Text` token that contains the `elem` + if append_text { + tokens.push(Token::Text(String::from(elem))); + } +} + +impl Display for TokenizedCommand { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + for token in &self.tokens { + match *token { + Token::Placeholder => f.write_str("{}")?, + Token::Basename => f.write_str("{/}")?, + Token::Parent => f.write_str("{//}")?, + Token::NoExt => f.write_str("{.}")?, + Token::BasenameNoExt => f.write_str("{/.}")?, + Token::Text(ref string) => f.write_str(string)?, + } + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::{TokenizedCommand, Token}; + + #[test] + fn tokens() { + let expected = TokenizedCommand { + tokens: vec![ + Token::Text("echo ${SHELL}: ".into()), + Token::Placeholder, + ], + }; + + assert_eq!(TokenizedCommand::new("echo $\\{SHELL}: {}"), expected); + assert_eq!(TokenizedCommand::new("echo ${SHELL}:"), expected); + } +} \ No newline at end of file diff --git a/src/exec/paths.rs b/src/exec/paths.rs new file mode 100644 index 0000000..18a1359 --- /dev/null +++ b/src/exec/paths.rs @@ -0,0 +1,101 @@ +pub fn basename(input: &str) -> &str { + let mut index = 0; + for (id, character) in input.bytes().enumerate() { + if character == b'/' { index = id; } + } + if index == 0 { input } else { &input[index+1..] } +} + +/// Removes the extension of a given input +pub fn remove_extension(input: &str) -> &str { + let mut dir_index = 0; + let mut ext_index = 0; + + for (id, character) in input.bytes().enumerate() { + if character == b'/' { dir_index = id; } + if character == b'.' { ext_index = id; } + } + + // Account for hidden files and directories + if ext_index == 0 || dir_index + 2 > ext_index { input } else { &input[0..ext_index] } +} + +pub fn dirname(input: &str) -> &str { + let mut index = 0; + for (id, character) in input.bytes().enumerate() { + if character == b'/' { index = id; } + } + if index == 0 { "." } else { &input[0..index] } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn path_remove_ext_simple() { + assert_eq!(remove_extension("foo.txt"), "foo"); + } + + #[test] + fn path_remove_ext_dir() { + assert_eq!(remove_extension("dir/foo.txt"), "dir/foo"); + } + + #[test] + fn path_hidden() { + assert_eq!(remove_extension(".foo"), ".foo") + } + + #[test] + fn path_remove_ext_utf8() { + assert_eq!(remove_extension("💖.txt"), "💖"); + } + + #[test] + fn path_remove_ext_empty() { + assert_eq!(remove_extension(""), ""); + } + + #[test] + fn path_basename_simple() { + assert_eq!(basename("foo.txt"), "foo.txt"); + } + + #[test] + fn path_basename_dir() { + assert_eq!(basename("dir/foo.txt"), "foo.txt"); + } + + #[test] + fn path_basename_empty() { + assert_eq!(basename(""), ""); + } + + #[test] + fn path_basename_utf8() { + assert_eq!(basename("💖/foo.txt"), "foo.txt"); + assert_eq!(basename("dir/💖.txt"), "💖.txt"); + } + + #[test] + fn path_dirname_simple() { + assert_eq!(dirname("foo.txt"), "."); + } + + #[test] + fn path_dirname_dir() { + assert_eq!(dirname("dir/foo.txt"), "dir"); + } + + #[test] + fn path_dirname_utf8() { + assert_eq!(dirname("💖/foo.txt"), "💖"); + assert_eq!(dirname("dir/💖.txt"), "dir"); + } + + #[test] + fn path_dirname_empty() { + assert_eq!(dirname(""), "."); + } +} diff --git a/src/internal.rs b/src/internal.rs index 1c1d66a..b2e8938 100644 --- a/src/internal.rs +++ b/src/internal.rs @@ -2,6 +2,7 @@ use std::process; use std::time; use std::io::Write; +use exec::TokenizedCommand; use lscolors::LsColors; use walk::FileType; @@ -71,6 +72,9 @@ pub struct FdOptions { /// /// The value (if present) will be a lowercase string without leading dots. pub extension: Option, + + /// If a value is supplied, each item found will be used to generate and execute commands. + pub command: Option } /// Print error message to stderr and exit with status `1`. diff --git a/src/main.rs b/src/main.rs index 9912afd..856826b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,14 +1,17 @@ -#[macro_use] -extern crate clap; extern crate ansi_term; extern crate atty; -extern crate regex; +#[macro_use] +extern crate clap; extern crate ignore; +#[macro_use] +extern crate lazy_static; extern crate num_cpus; +extern crate regex; -pub mod lscolors; pub mod fshelper; +pub mod lscolors; mod app; +mod exec; mod internal; mod output; mod walk; @@ -23,6 +26,7 @@ use atty::Stream; use regex::RegexBuilder; use internal::{error, FdOptions, PathDisplay, ROOT_DIR}; +use exec::TokenizedCommand; use lscolors::LsColors; use walk::FileType; @@ -88,8 +92,11 @@ fn main() { None }; + let command = matches.value_of("exec") + .map(|x| TokenizedCommand::new(&x)); + let config = FdOptions { - case_sensitive: case_sensitive, + case_sensitive, search_full_path: matches.is_present("full-path"), ignore_hidden: !(matches.is_present("hidden") || matches.occurrences_of("rg-alias-hidden-ignore") >= 2), @@ -116,7 +123,7 @@ fn main() { } else { PathDisplay::Relative }, - ls_colors: ls_colors, + ls_colors, file_type: match matches.value_of("file-type") { Some("f") | Some("file") => FileType::RegularFile, Some("d") | @@ -127,6 +134,7 @@ fn main() { extension: matches.value_of("extension").map(|e| { e.trim_left_matches('.').to_lowercase() }), + command }; let root = Path::new(ROOT_DIR); diff --git a/src/walk.rs b/src/walk.rs index a5666db..ff24fa2 100644 --- a/src/walk.rs +++ b/src/walk.rs @@ -1,15 +1,16 @@ -use internal::{error, FdOptions}; +use exec::{self, TokenizedCommand}; use fshelper; +use internal::{error, FdOptions}; use output; use std::path::{Path, PathBuf}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::sync::mpsc::channel; use std::thread; use std::time; -use regex::Regex; use ignore::{self, WalkBuilder}; +use regex::Regex; /// The receiver thread can either be buffering results or directly streaming to the console. enum ReceiverMode { @@ -30,9 +31,14 @@ pub enum FileType { SymLink, } -/// Recursively scan the given search path and search for files / pathnames matching the pattern. +/// Recursively scan the given search path for files / pathnames matching the pattern. +/// +/// If the `--exec` argument was supplied, this will create a thread pool for executing +/// jobs in parallel from a given command line and the discovered paths. Otherwise, each +/// path will simply be written to standard output. pub fn scan(root: &Path, pattern: Arc, base: &Path, config: Arc) { let (tx, rx) = channel(); + let threads = config.threads; let walker = WalkBuilder::new(root) .hidden(config.ignore_hidden) @@ -43,56 +49,81 @@ pub fn scan(root: &Path, pattern: Arc, base: &Path, config: Arc { - buffer.push(value); + // Push the handle of the spawned thread into the vector for later joining. + handles.push(handle); + } - // Have we reached the maximum time? - if time::Instant::now() - start > max_buffer_time { - // Flush the buffer - for v in &buffer { - output::print_entry(&rx_base, v, &rx_config); + // Wait for all threads to exit before exiting the program. + handles.into_iter().for_each(|h| h.join().unwrap()); + } else { + let start = time::Instant::now(); + + let mut buffer = vec![]; + + // Start in buffering mode + let mut mode = ReceiverMode::Buffering; + + // Maximum time to wait before we start streaming to the console. + let max_buffer_time = rx_config.max_buffer_time.unwrap_or_else( + || time::Duration::from_millis(100), + ); + + for value in rx { + match mode { + ReceiverMode::Buffering => { + buffer.push(value); + + // Have we reached the maximum time? + if time::Instant::now() - start > max_buffer_time { + // Flush the buffer + for v in &buffer { + output::print_entry(&rx_base, v, &rx_config); + } + buffer.clear(); + + // Start streaming + mode = ReceiverMode::Streaming; } - buffer.clear(); - - // Start streaming - mode = ReceiverMode::Streaming; + } + ReceiverMode::Streaming => { + output::print_entry(&rx_base, &value, &rx_config); } } - ReceiverMode::Streaming => { + } + + // If we have finished fast enough (faster than max_buffer_time), we haven't streamed + // anything to the console, yet. In this case, sort the results and print them: + if !buffer.is_empty() { + buffer.sort(); + for value in buffer { output::print_entry(&rx_base, &value, &rx_config); } } } - - // If we have finished fast enough (faster than max_buffer_time), we haven't streamed - // anything to the console, yet. In this case, sort the results and print them: - if !buffer.is_empty() { - buffer.sort(); - for value in buffer { - output::print_entry(&rx_base, &value, &rx_config); - } - } }); // Spawn the sender threads.