mirror of https://github.com/sharkdp/fd.git
walk: Hold the Sender by reference in WorkerMsg
This commit is contained in:
parent
d588971245
commit
46967b8837
|
@ -340,6 +340,7 @@ dependencies = [
|
|||
"regex-syntax",
|
||||
"tempfile",
|
||||
"test-case",
|
||||
"thread_local",
|
||||
"version_check",
|
||||
]
|
||||
|
||||
|
@ -799,6 +800,16 @@ dependencies = [
|
|||
"test-case-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thread_local"
|
||||
version = "1.1.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"once_cell",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "unicode-ident"
|
||||
version = "1.0.12"
|
||||
|
|
|
@ -49,6 +49,7 @@ normpath = "1.1.1"
|
|||
crossbeam-channel = "0.5.8"
|
||||
clap_complete = {version = "4.4.4", optional = true}
|
||||
faccess = "0.2.4"
|
||||
thread_local = "1.1.7"
|
||||
|
||||
[patch.crates-io]
|
||||
ignore = { git = "https://github.com/tavianator/ripgrep", branch = "fd" }
|
||||
|
|
|
@ -13,7 +13,7 @@ use super::CommandSet;
|
|||
/// generate a command with the supplied command template. The generated command will then
|
||||
/// be executed, and this process will continue until the receiver's sender has closed.
|
||||
pub fn job(
|
||||
rx: Receiver<WorkerMsg>,
|
||||
rx: Receiver<WorkerMsg<'_>>,
|
||||
cmd: &CommandSet,
|
||||
out_perm: &Mutex<()>,
|
||||
config: &Config,
|
||||
|
@ -49,7 +49,7 @@ pub fn job(
|
|||
merge_exitcodes(results)
|
||||
}
|
||||
|
||||
pub fn batch(rx: Receiver<WorkerMsg>, cmd: &CommandSet, config: &Config) -> ExitCode {
|
||||
pub fn batch(rx: Receiver<WorkerMsg<'_>>, cmd: &CommandSet, config: &Config) -> ExitCode {
|
||||
let paths =
|
||||
rx.into_iter()
|
||||
.map(WorkerMsg::take)
|
||||
|
|
50
src/walk.rs
50
src/walk.rs
|
@ -1,9 +1,10 @@
|
|||
use std::borrow::Cow;
|
||||
use std::cell::OnceCell;
|
||||
use std::ffi::OsStr;
|
||||
use std::io::{self, Write};
|
||||
use std::mem;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
use std::time::{Duration, Instant};
|
||||
|
@ -14,6 +15,7 @@ use etcetera::BaseStrategy;
|
|||
use ignore::overrides::{Override, OverrideBuilder};
|
||||
use ignore::{self, WalkBuilder, WalkParallel, WalkState};
|
||||
use regex::bytes::Regex;
|
||||
use thread_local::ThreadLocal;
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::dir_entry::DirEntry;
|
||||
|
@ -47,14 +49,14 @@ pub enum WorkerResult {
|
|||
type ResultBox = Box<Option<WorkerResult>>;
|
||||
|
||||
/// A WorkerResult that recycles itself.
|
||||
pub struct WorkerMsg {
|
||||
pub struct WorkerMsg<'a> {
|
||||
inner: Option<ResultBox>,
|
||||
tx: Sender<ResultBox>,
|
||||
tx: &'a Sender<ResultBox>,
|
||||
}
|
||||
|
||||
impl WorkerMsg {
|
||||
impl<'a> WorkerMsg<'a> {
|
||||
/// Create a new message.
|
||||
fn new(inner: ResultBox, tx: Sender<ResultBox>) -> Self {
|
||||
fn new(inner: ResultBox, tx: &'a Sender<ResultBox>) -> Self {
|
||||
Self {
|
||||
inner: Some(inner),
|
||||
tx,
|
||||
|
@ -67,7 +69,7 @@ impl WorkerMsg {
|
|||
}
|
||||
}
|
||||
|
||||
impl Drop for WorkerMsg {
|
||||
impl Drop for WorkerMsg<'_> {
|
||||
fn drop(&mut self) {
|
||||
let _ = self.tx.send(self.inner.take().unwrap());
|
||||
}
|
||||
|
@ -75,7 +77,7 @@ impl Drop for WorkerMsg {
|
|||
|
||||
/// A pool of WorkerResults that can be recycled.
|
||||
struct ResultPool {
|
||||
size: usize,
|
||||
size: AtomicUsize,
|
||||
tx: Sender<ResultBox>,
|
||||
rx: Receiver<ResultBox>,
|
||||
}
|
||||
|
@ -86,21 +88,24 @@ const RESULT_POOL_CAPACITY: usize = 0x4000;
|
|||
impl ResultPool {
|
||||
/// Create an empty pool.
|
||||
fn new() -> Self {
|
||||
let size = AtomicUsize::new(0);
|
||||
let (tx, rx) = unbounded();
|
||||
|
||||
Self { size: 0, tx, rx }
|
||||
Self { size, tx, rx }
|
||||
}
|
||||
|
||||
/// Allocate or recycle a WorkerResult from the pool.
|
||||
fn get(&mut self, result: WorkerResult) -> WorkerMsg {
|
||||
let inner = if self.size < RESULT_POOL_CAPACITY {
|
||||
fn get(&self, result: WorkerResult) -> WorkerMsg<'_> {
|
||||
let size = self.size.load(Ordering::Relaxed);
|
||||
|
||||
let inner = if size < RESULT_POOL_CAPACITY {
|
||||
match self.rx.try_recv() {
|
||||
Ok(mut inner) => {
|
||||
*inner = Some(result);
|
||||
inner
|
||||
}
|
||||
Err(_) => {
|
||||
self.size += 1;
|
||||
self.size.store(size + 1, Ordering::Relaxed);
|
||||
Box::new(Some(result))
|
||||
}
|
||||
}
|
||||
|
@ -110,7 +115,7 @@ impl ResultPool {
|
|||
inner
|
||||
};
|
||||
|
||||
WorkerMsg::new(inner, self.tx.clone())
|
||||
WorkerMsg::new(inner, &self.tx)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -128,7 +133,7 @@ struct ReceiverBuffer<'a, W> {
|
|||
/// The ^C notifier.
|
||||
interrupt_flag: &'a AtomicBool,
|
||||
/// Receiver for worker messages.
|
||||
rx: Receiver<WorkerMsg>,
|
||||
rx: Receiver<WorkerMsg<'a>>,
|
||||
/// Standard output.
|
||||
stdout: W,
|
||||
/// The current buffer mode.
|
||||
|
@ -143,7 +148,7 @@ struct ReceiverBuffer<'a, W> {
|
|||
|
||||
impl<'a, W: Write> ReceiverBuffer<'a, W> {
|
||||
/// Create a new receiver buffer.
|
||||
fn new(state: &'a WorkerState, rx: Receiver<WorkerMsg>, stdout: W) -> Self {
|
||||
fn new(state: &'a WorkerState, rx: Receiver<WorkerMsg<'a>>, stdout: W) -> Self {
|
||||
let config = &state.config;
|
||||
let quit_flag = state.quit_flag.as_ref();
|
||||
let interrupt_flag = state.interrupt_flag.as_ref();
|
||||
|
@ -291,18 +296,22 @@ struct WorkerState {
|
|||
quit_flag: Arc<AtomicBool>,
|
||||
/// Flag specifically for quitting due to ^C
|
||||
interrupt_flag: Arc<AtomicBool>,
|
||||
/// WorkerResult pools.
|
||||
pools: ThreadLocal<ResultPool>,
|
||||
}
|
||||
|
||||
impl WorkerState {
|
||||
fn new(patterns: Vec<Regex>, config: Config) -> Self {
|
||||
let quit_flag = Arc::new(AtomicBool::new(false));
|
||||
let interrupt_flag = Arc::new(AtomicBool::new(false));
|
||||
let pools = ThreadLocal::new();
|
||||
|
||||
Self {
|
||||
patterns,
|
||||
config,
|
||||
quit_flag,
|
||||
interrupt_flag,
|
||||
pools,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -391,7 +400,7 @@ impl WorkerState {
|
|||
|
||||
/// Run the receiver work, either on this thread or a pool of background
|
||||
/// threads (for --exec).
|
||||
fn receive(&self, rx: Receiver<WorkerMsg>) -> ExitCode {
|
||||
fn receive<'a>(&'a self, rx: Receiver<WorkerMsg<'a>>) -> ExitCode {
|
||||
let config = &self.config;
|
||||
|
||||
// This will be set to `Some` if the `--exec` argument was supplied.
|
||||
|
@ -426,20 +435,27 @@ impl WorkerState {
|
|||
}
|
||||
}
|
||||
|
||||
/// Create a new ResultPool for a sender.
|
||||
fn get_pool(&self) -> &ResultPool {
|
||||
self.pools.get_or(ResultPool::new)
|
||||
}
|
||||
|
||||
/// Spawn the sender threads.
|
||||
fn spawn_senders(&self, walker: WalkParallel, tx: Sender<WorkerMsg>) {
|
||||
fn spawn_senders<'a>(&'a self, walker: WalkParallel, tx: Sender<WorkerMsg<'a>>) {
|
||||
walker.run(|| {
|
||||
let patterns = &self.patterns;
|
||||
let config = &self.config;
|
||||
let quit_flag = self.quit_flag.as_ref();
|
||||
let tx = tx.clone();
|
||||
let mut pool = ResultPool::new();
|
||||
let pool = OnceCell::new();
|
||||
|
||||
Box::new(move |entry| {
|
||||
if quit_flag.load(Ordering::Relaxed) {
|
||||
return WalkState::Quit;
|
||||
}
|
||||
|
||||
let pool = pool.get_or_init(|| self.get_pool());
|
||||
|
||||
let entry = match entry {
|
||||
Ok(ref e) if e.depth() == 0 => {
|
||||
// Skip the root directory entry.
|
||||
|
|
Loading…
Reference in New Issue