walk: Switch to a semaphore for ResultPool

This commit is contained in:
Tavian Barnes 2023-11-02 16:31:22 -04:00
parent 46967b8837
commit 346afd11d8
1 changed files with 38 additions and 37 deletions

View File

@ -5,7 +5,7 @@ use std::io::{self, Write};
use std::mem; use std::mem;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Condvar, Mutex};
use std::thread; use std::thread;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -45,77 +45,78 @@ pub enum WorkerResult {
Error(ignore::Error), Error(ignore::Error),
} }
/// Storage for a WorkerResult.
type ResultBox = Box<Option<WorkerResult>>;
/// A WorkerResult that recycles itself. /// A WorkerResult that recycles itself.
pub struct WorkerMsg<'a> { pub struct WorkerMsg<'a> {
inner: Option<ResultBox>, inner: Option<Box<WorkerResult>>,
tx: &'a Sender<ResultBox>, pool: &'a ResultPool,
} }
impl<'a> WorkerMsg<'a> { impl<'a> WorkerMsg<'a> {
/// Create a new message. /// Create a new message.
fn new(inner: ResultBox, tx: &'a Sender<ResultBox>) -> Self { fn new(inner: Box<WorkerResult>, pool: &'a ResultPool) -> Self {
Self { Self {
inner: Some(inner), inner: Some(inner),
tx, pool,
} }
} }
/// Extract the result from this message. /// Extract the result from this message.
pub fn take(mut self) -> WorkerResult { pub fn take(mut self) -> WorkerResult {
self.inner.as_mut().unwrap().take().unwrap() *self.inner.take().unwrap()
} }
} }
impl Drop for WorkerMsg<'_> { impl Drop for WorkerMsg<'_> {
fn drop(&mut self) { fn drop(&mut self) {
let _ = self.tx.send(self.inner.take().unwrap()); self.pool.recycle();
} }
} }
/// A pool of WorkerResults that can be recycled. /// A pool of WorkerResults that can be recycled.
struct ResultPool { struct ResultPool {
size: AtomicUsize, cap: AtomicUsize,
tx: Sender<ResultBox>, mutex: Mutex<()>,
rx: Receiver<ResultBox>, cv: Condvar,
} }
/// Capacity was chosen empircally to perform similarly to an unbounded channel
const RESULT_POOL_CAPACITY: usize = 0x4000;
impl ResultPool { impl ResultPool {
/// Create an empty pool. /// Create an empty pool.
fn new() -> Self { fn new() -> Self {
let size = AtomicUsize::new(0); // Capacity was chosen empircally to perform similarly to an unbounded channel
let (tx, rx) = unbounded(); let cap = AtomicUsize::new(0x4000);
let mutex = Mutex::new(());
let cv = Condvar::new();
Self { size, tx, rx } Self { cap, mutex, cv }
}
/// Try to decrement the capacity.
fn try_get(&self) -> bool {
self.cap
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cap| {
cap.checked_sub(1)
})
.is_ok()
} }
/// Allocate or recycle a WorkerResult from the pool. /// Allocate or recycle a WorkerResult from the pool.
fn get(&self, result: WorkerResult) -> WorkerMsg<'_> { fn get(&self, result: WorkerResult) -> WorkerMsg<'_> {
let size = self.size.load(Ordering::Relaxed); if !self.try_get() {
let guard = self.mutex.lock().unwrap();
let guard = self.cv.wait_while(guard, |_| !self.try_get()).unwrap();
drop(guard);
}
let inner = if size < RESULT_POOL_CAPACITY { WorkerMsg::new(Box::new(result), &self)
match self.rx.try_recv() { }
Ok(mut inner) => {
*inner = Some(result);
inner
}
Err(_) => {
self.size.store(size + 1, Ordering::Relaxed);
Box::new(Some(result))
}
}
} else {
let mut inner = self.rx.recv().unwrap();
*inner = Some(result);
inner
};
WorkerMsg::new(inner, &self.tx) /// Recycle a WorkerResult.
fn recycle(&self) {
let cap = self.cap.fetch_add(1, Ordering::Relaxed);
if cap == 0 {
drop(self.mutex.lock().unwrap());
self.cv.notify_one();
}
} }
} }