mirror of https://github.com/sharkdp/fd.git
walk: Switch back to crossbeam-channel
Fixes #933. Fixes #1060. Fixes #1113.
This commit is contained in:
parent
93e5488420
commit
3742e44948
|
@ -162,6 +162,16 @@ version = "0.8.3"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-channel"
|
||||
version = "0.5.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"crossbeam-utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-utils"
|
||||
version = "0.8.12"
|
||||
|
@ -295,6 +305,7 @@ dependencies = [
|
|||
"chrono",
|
||||
"clap",
|
||||
"clap_complete",
|
||||
"crossbeam-channel",
|
||||
"ctrlc",
|
||||
"diff",
|
||||
"dirs-next",
|
||||
|
|
|
@ -51,6 +51,7 @@ dirs-next = "2.0"
|
|||
normpath = "0.3.2"
|
||||
chrono = "0.4"
|
||||
once_cell = "1.15.0"
|
||||
crossbeam-channel = "0.5.6"
|
||||
|
||||
[dependencies.clap]
|
||||
version = "3.1"
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
use std::sync::mpsc::Receiver;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use crossbeam_channel::Receiver;
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::dir_entry::DirEntry;
|
||||
use crate::error::print_error;
|
||||
|
@ -13,7 +14,7 @@ use super::CommandSet;
|
|||
/// generate a command with the supplied command template. The generated command will then
|
||||
/// be executed, and this process will continue until the receiver's sender has closed.
|
||||
pub fn job(
|
||||
rx: Arc<Mutex<Receiver<WorkerResult>>>,
|
||||
rx: Receiver<WorkerResult>,
|
||||
cmd: Arc<CommandSet>,
|
||||
out_perm: Arc<Mutex<()>>,
|
||||
config: &Config,
|
||||
|
@ -23,12 +24,9 @@ pub fn job(
|
|||
|
||||
let mut results: Vec<ExitCode> = Vec::new();
|
||||
loop {
|
||||
// Create a lock on the shared receiver for this thread.
|
||||
let lock = rx.lock().unwrap();
|
||||
|
||||
// Obtain the next result from the receiver, else if the channel
|
||||
// has closed, exit from the loop
|
||||
let dir_entry: DirEntry = match lock.recv() {
|
||||
let dir_entry: DirEntry = match rx.recv() {
|
||||
Ok(WorkerResult::Entry(dir_entry)) => dir_entry,
|
||||
Ok(WorkerResult::Error(err)) => {
|
||||
if config.show_filesystem_errors {
|
||||
|
@ -39,8 +37,6 @@ pub fn job(
|
|||
Err(_) => break,
|
||||
};
|
||||
|
||||
// Drop the lock so that other threads can read from the receiver.
|
||||
drop(lock);
|
||||
// Generate a command, execute it and store its exit code.
|
||||
results.push(cmd.execute(
|
||||
dir_entry.stripped_path(config),
|
||||
|
|
14
src/walk.rs
14
src/walk.rs
|
@ -3,13 +3,13 @@ use std::io;
|
|||
use std::mem;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{borrow::Cow, io::Write};
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
|
||||
use ignore::overrides::OverrideBuilder;
|
||||
use ignore::{self, WalkBuilder};
|
||||
use regex::bytes::Regex;
|
||||
|
@ -51,7 +51,7 @@ pub const DEFAULT_MAX_BUFFER_TIME: Duration = Duration::from_millis(100);
|
|||
/// path will simply be written to standard output.
|
||||
pub fn scan(paths: &[PathBuf], pattern: Arc<Regex>, config: Arc<Config>) -> Result<ExitCode> {
|
||||
let first_path = &paths[0];
|
||||
let (tx, rx) = channel();
|
||||
let (tx, rx) = unbounded();
|
||||
|
||||
let mut override_builder = OverrideBuilder::new(first_path);
|
||||
|
||||
|
@ -222,11 +222,7 @@ impl<W: Write> ReceiverBuffer<W> {
|
|||
match self.mode {
|
||||
ReceiverMode::Buffering => {
|
||||
// Wait at most until we should switch to streaming
|
||||
let now = Instant::now();
|
||||
self.deadline
|
||||
.checked_duration_since(now)
|
||||
.ok_or(RecvTimeoutError::Timeout)
|
||||
.and_then(|t| self.rx.recv_timeout(t))
|
||||
self.rx.recv_deadline(self.deadline)
|
||||
}
|
||||
ReceiverMode::Streaming => {
|
||||
// Wait however long it takes for a result
|
||||
|
@ -345,15 +341,13 @@ fn spawn_receiver(
|
|||
if cmd.in_batch_mode() {
|
||||
exec::batch(rx, cmd, &config)
|
||||
} else {
|
||||
let shared_rx = Arc::new(Mutex::new(rx));
|
||||
|
||||
let out_perm = Arc::new(Mutex::new(()));
|
||||
|
||||
// Each spawned job will store it's thread handle in here.
|
||||
let mut handles = Vec::with_capacity(threads);
|
||||
for _ in 0..threads {
|
||||
let config = Arc::clone(&config);
|
||||
let rx = Arc::clone(&shared_rx);
|
||||
let rx = rx.clone();
|
||||
let cmd = Arc::clone(cmd);
|
||||
let out_perm = Arc::clone(&out_perm);
|
||||
|
||||
|
|
Loading…
Reference in New Issue