walk: Use unbounded channels

We originally switched to bounded channels for backpressure to fix #918.
However, bounded channels have a significant initialization overhead as
they pre-allocate a fixed-size buffer for the messages.

This implementation uses a different backpressure strategy: each thread
gets a limited-size pool of WorkerResults.  When the size limit is hit,
the sender thread has to wait for the receiver thread to handle a result
from that pool and recycle it.

Inspired by [snmalloc], results are recycled by sending the boxed result
over a channel back to the thread that allocated it.  By allocating and
freeing each WorkerResult from the same thread, allocator contention is
reduced dramatically.  And since we now pass results by pointer instead
of by value, message passing overhead is reduced as well.

Fixes #1408.

[snmalloc]: https://github.com/microsoft/snmalloc
This commit is contained in:
Tavian Barnes 2023-10-30 12:05:54 -04:00
parent 8bbbd7679b
commit d588971245
2 changed files with 107 additions and 31 deletions

View File

@ -3,10 +3,9 @@ use std::sync::Mutex;
use crossbeam_channel::Receiver;
use crate::config::Config;
use crate::dir_entry::DirEntry;
use crate::error::print_error;
use crate::exit_codes::{merge_exitcodes, ExitCode};
use crate::walk::WorkerResult;
use crate::walk::{WorkerMsg, WorkerResult};
use super::CommandSet;
@ -14,7 +13,7 @@ use super::CommandSet;
/// generate a command with the supplied command template. The generated command will then
/// be executed, and this process will continue until the receiver's sender has closed.
pub fn job(
rx: Receiver<WorkerResult>,
rx: Receiver<WorkerMsg>,
cmd: &CommandSet,
out_perm: &Mutex<()>,
config: &Config,
@ -26,7 +25,8 @@ pub fn job(
loop {
// Obtain the next result from the receiver, else if the channel
// has closed, exit from the loop
let dir_entry: DirEntry = match rx.recv() {
let result = rx.recv().map(WorkerMsg::take);
let dir_entry = match result {
Ok(WorkerResult::Entry(dir_entry)) => dir_entry,
Ok(WorkerResult::Error(err)) => {
if config.show_filesystem_errors {
@ -49,18 +49,19 @@ pub fn job(
merge_exitcodes(results)
}
pub fn batch(rx: Receiver<WorkerResult>, cmd: &CommandSet, config: &Config) -> ExitCode {
let paths = rx
.into_iter()
.filter_map(|worker_result| match worker_result {
WorkerResult::Entry(dir_entry) => Some(dir_entry.into_stripped_path(config)),
WorkerResult::Error(err) => {
if config.show_filesystem_errors {
print_error(err.to_string());
pub fn batch(rx: Receiver<WorkerMsg>, cmd: &CommandSet, config: &Config) -> ExitCode {
let paths =
rx.into_iter()
.map(WorkerMsg::take)
.filter_map(|worker_result| match worker_result {
WorkerResult::Entry(dir_entry) => Some(dir_entry.into_stripped_path(config)),
WorkerResult::Error(err) => {
if config.show_filesystem_errors {
print_error(err.to_string());
}
None
}
None
}
});
});
cmd.execute_batch(paths, config.batch_size, config.path_separator.as_deref())
}

View File

@ -9,7 +9,7 @@ use std::thread;
use std::time::{Duration, Instant};
use anyhow::{anyhow, Result};
use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender};
use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
use etcetera::BaseStrategy;
use ignore::overrides::{Override, OverrideBuilder};
use ignore::{self, WalkBuilder, WalkParallel, WalkState};
@ -43,6 +43,77 @@ pub enum WorkerResult {
Error(ignore::Error),
}
/// Storage for a WorkerResult.
type ResultBox = Box<Option<WorkerResult>>;
/// A WorkerResult that recycles itself.
pub struct WorkerMsg {
inner: Option<ResultBox>,
tx: Sender<ResultBox>,
}
impl WorkerMsg {
/// Create a new message.
fn new(inner: ResultBox, tx: Sender<ResultBox>) -> Self {
Self {
inner: Some(inner),
tx,
}
}
/// Extract the result from this message.
pub fn take(mut self) -> WorkerResult {
self.inner.as_mut().unwrap().take().unwrap()
}
}
impl Drop for WorkerMsg {
fn drop(&mut self) {
let _ = self.tx.send(self.inner.take().unwrap());
}
}
/// A pool of WorkerResults that can be recycled.
struct ResultPool {
size: usize,
tx: Sender<ResultBox>,
rx: Receiver<ResultBox>,
}
/// Capacity was chosen empircally to perform similarly to an unbounded channel
const RESULT_POOL_CAPACITY: usize = 0x4000;
impl ResultPool {
/// Create an empty pool.
fn new() -> Self {
let (tx, rx) = unbounded();
Self { size: 0, tx, rx }
}
/// Allocate or recycle a WorkerResult from the pool.
fn get(&mut self, result: WorkerResult) -> WorkerMsg {
let inner = if self.size < RESULT_POOL_CAPACITY {
match self.rx.try_recv() {
Ok(mut inner) => {
*inner = Some(result);
inner
}
Err(_) => {
self.size += 1;
Box::new(Some(result))
}
}
} else {
let mut inner = self.rx.recv().unwrap();
*inner = Some(result);
inner
};
WorkerMsg::new(inner, self.tx.clone())
}
}
/// Maximum size of the output buffer before flushing results to the console
const MAX_BUFFER_LENGTH: usize = 1000;
/// Default duration until output buffering switches to streaming.
@ -56,8 +127,8 @@ struct ReceiverBuffer<'a, W> {
quit_flag: &'a AtomicBool,
/// The ^C notifier.
interrupt_flag: &'a AtomicBool,
/// Receiver for worker results.
rx: Receiver<WorkerResult>,
/// Receiver for worker messages.
rx: Receiver<WorkerMsg>,
/// Standard output.
stdout: W,
/// The current buffer mode.
@ -72,7 +143,7 @@ struct ReceiverBuffer<'a, W> {
impl<'a, W: Write> ReceiverBuffer<'a, W> {
/// Create a new receiver buffer.
fn new(state: &'a WorkerState, rx: Receiver<WorkerResult>, stdout: W) -> Self {
fn new(state: &'a WorkerState, rx: Receiver<WorkerMsg>, stdout: W) -> Self {
let config = &state.config;
let quit_flag = state.quit_flag.as_ref();
let interrupt_flag = state.interrupt_flag.as_ref();
@ -104,7 +175,7 @@ impl<'a, W: Write> ReceiverBuffer<'a, W> {
/// Receive the next worker result.
fn recv(&self) -> Result<WorkerResult, RecvTimeoutError> {
match self.mode {
let result = match self.mode {
ReceiverMode::Buffering => {
// Wait at most until we should switch to streaming
self.rx.recv_deadline(self.deadline)
@ -113,7 +184,8 @@ impl<'a, W: Write> ReceiverBuffer<'a, W> {
// Wait however long it takes for a result
Ok(self.rx.recv()?)
}
}
};
result.map(WorkerMsg::take)
}
/// Wait for a result or state change.
@ -319,7 +391,7 @@ impl WorkerState {
/// Run the receiver work, either on this thread or a pool of background
/// threads (for --exec).
fn receive(&self, rx: Receiver<WorkerResult>) -> ExitCode {
fn receive(&self, rx: Receiver<WorkerMsg>) -> ExitCode {
let config = &self.config;
// This will be set to `Some` if the `--exec` argument was supplied.
@ -355,12 +427,13 @@ impl WorkerState {
}
/// Spawn the sender threads.
fn spawn_senders(&self, walker: WalkParallel, tx: Sender<WorkerResult>) {
fn spawn_senders(&self, walker: WalkParallel, tx: Sender<WorkerMsg>) {
walker.run(|| {
let patterns = &self.patterns;
let config = &self.config;
let quit_flag = self.quit_flag.as_ref();
let tx = tx.clone();
let mut pool = ResultPool::new();
Box::new(move |entry| {
if quit_flag.load(Ordering::Relaxed) {
@ -387,20 +460,22 @@ impl WorkerState {
DirEntry::broken_symlink(path)
}
_ => {
return match tx.send(WorkerResult::Error(ignore::Error::WithPath {
let result = pool.get(WorkerResult::Error(ignore::Error::WithPath {
path,
err: inner_err,
})) {
}));
return match tx.send(result) {
Ok(_) => WalkState::Continue,
Err(_) => WalkState::Quit,
}
};
}
},
Err(err) => {
return match tx.send(WorkerResult::Error(err)) {
let result = pool.get(WorkerResult::Error(err));
return match tx.send(result) {
Ok(_) => WalkState::Continue,
Err(_) => WalkState::Quit,
}
};
}
};
@ -509,7 +584,8 @@ impl WorkerState {
}
}
let send_result = tx.send(WorkerResult::Entry(entry));
let result = pool.get(WorkerResult::Entry(entry));
let send_result = tx.send(result);
if send_result.is_err() {
return WalkState::Quit;
@ -545,8 +621,7 @@ impl WorkerState {
.unwrap();
}
// Channel capacity was chosen empircally to perform similarly to an unbounded channel
let (tx, rx) = bounded(0x4000 * config.threads);
let (tx, rx) = unbounded();
let exit_code = thread::scope(|scope| {
// Spawn the receiver thread(s)