walk: Limit batch sizes in --exec mode

This commit is contained in:
Tavian Barnes 2023-11-08 10:24:00 -05:00
parent 73260c0e35
commit b8a5f95cf2
1 changed files with 16 additions and 6 deletions

View File

@ -75,21 +75,23 @@ impl IntoIterator for Batch {
struct BatchSender {
batch: Batch,
tx: Sender<Batch>,
limit: usize,
}
impl BatchSender {
fn new(tx: Sender<Batch>) -> Self {
fn new(tx: Sender<Batch>, limit: usize) -> Self {
Self {
batch: Batch::new(),
tx,
limit,
}
}
/// Check if we need to flush a batch.
fn needs_flush(batch: Option<&Vec<WorkerResult>>) -> bool {
fn needs_flush(&self, batch: Option<&Vec<WorkerResult>>) -> bool {
match batch {
// Limit the batch size to provide some backpressure
Some(vec) => vec.len() >= 0x400,
Some(vec) => vec.len() >= self.limit,
// Batch was already taken by the receiver, so make a new one
None => true,
}
@ -99,7 +101,7 @@ impl BatchSender {
fn send(&mut self, item: WorkerResult) -> Result<(), SendError<()>> {
let mut batch = self.batch.lock();
if Self::needs_flush(batch.as_ref()) {
if self.needs_flush(batch.as_ref()) {
drop(batch);
self.batch = Batch::new();
batch = self.batch.lock();
@ -443,7 +445,15 @@ impl WorkerState {
let patterns = &self.patterns;
let config = &self.config;
let quit_flag = self.quit_flag.as_ref();
let mut tx = BatchSender::new(tx.clone());
let mut limit = 0x100;
if let Some(cmd) = &config.command {
if !cmd.in_batch_mode() && config.threads > 1 {
// Evenly distribute work between multiple receivers
limit = 1;
}
}
let mut tx = BatchSender::new(tx.clone(), limit);
Box::new(move |entry| {
if quit_flag.load(Ordering::Relaxed) {
@ -628,7 +638,7 @@ impl WorkerState {
.unwrap();
}
let (tx, rx) = bounded(config.threads);
let (tx, rx) = bounded(2 * config.threads);
let exit_code = thread::scope(|scope| {
// Spawn the receiver thread(s)