Mirror of https://github.com/BurntSushi/ripgrep.git
crates/ignore: switch to depth first traversal
This replaces the use of channels in the parallel directory traversal with a simple stack. The primary motivation for this change is to reduce peak memory usage. In particular, when using a channel (which is a queue), we wind up visiting files in a breadth first fashion. Using a stack switches us to a depth first traversal. While there are no real intrinsic differences between the two, depth first traversal generally tends to use less memory because directory trees are more commonly wide than they are deep.

In particular, the queue/stack size itself is not the only concern. In one recent case documented in #1550, a user wanted to search all Rust crates. The directory structure was shallow but extremely wide, with a single directory containing all crates. This in turn results in descending into each of those directories and building a gitignore matcher for each (since most crates have `.gitignore` files) before ever searching a single file. This means that ripgrep has all such matchers in memory simultaneously, which winds up using quite a bit of memory.

In a depth first traversal, peak memory usage is much lower because gitignore matchers are built and discarded more quickly. In the case of searching all crates, the decrease in peak memory usage is dramatic. On my system, it shrinks by an order of magnitude, from almost 1GB to 50MB. The decline in peak memory usage is consistent across other use cases as well, but is typically more modest. For example, searching the Linux repo shows a 50% decrease in peak memory usage, and searching the Chromium repo shows a 25% decrease.

Search times generally remain unchanged, although some ad hoc benchmarks that I typically run have gotten a bit slower. As far as I can tell, this appears to be a result of scheduling changes. Namely, the depth first traversal seems to result in searching some very large files towards the end of the search, which reduces the effectiveness of parallelism and makes the overall search take longer. This suggests that a plain stack isn't optimal. It would perhaps be better to prioritize searching larger files first, but it's not clear how to do this without introducing more overhead (getting the file size for each file requires a stat call).

Fixes #1550
Commit 139f186e57 (parent afb325f733)
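To make the traversal-order difference concrete, here is a minimal, self-contained sketch contrasting a FIFO queue with a LIFO stack on a wide, shallow tree. This is not the crate's actual code; the toy `children` function and directory names are purely illustrative.

use std::collections::VecDeque;

// Toy directory tree: each node lists its children.
fn children(dir: &str) -> Vec<String> {
    match dir {
        "root" => vec!["a", "b", "c"],
        "a" => vec!["a/1", "a/2"],
        "b" => vec!["b/1"],
        _ => vec![],
    }
    .into_iter()
    .map(String::from)
    .collect()
}

fn main() {
    // Queue (FIFO): breadth first. All of root's children are expanded,
    // and their per-directory state (e.g. gitignore matchers) is alive at
    // once, before any grandchild is visited.
    let mut queue = VecDeque::from(vec!["root".to_string()]);
    let mut bfs = vec![];
    while let Some(dir) = queue.pop_front() {
        bfs.push(dir.clone());
        queue.extend(children(&dir));
    }

    // Stack (LIFO): depth first. Each subtree is finished (and its state
    // dropped) before the next sibling is started, which keeps peak memory
    // usage lower on wide directory trees.
    let mut stack = vec!["root".to_string()];
    let mut dfs = vec![];
    while let Some(dir) = stack.pop() {
        dfs.push(dir.clone());
        stack.extend(children(&dir));
    }

    println!("breadth first: {:?}", bfs); // root, a, b, c, a/1, a/2, b/1
    println!("depth first:   {:?}", dfs); // root, c, b, b/1, a, a/2, a/1
}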
@@ -18,7 +18,6 @@ name = "ignore"
 bench = false
 
 [dependencies]
-crossbeam-channel = "0.4.0"
 crossbeam-utils = "0.7.0"
 globset = { version = "0.4.3", path = "../globset" }
 lazy_static = "1.1"
@@ -32,5 +31,8 @@ walkdir = "2.2.7"
 [target.'cfg(windows)'.dependencies.winapi-util]
 version = "0.1.2"
 
+[dev-dependencies]
+crossbeam-channel = "0.4.0"
+
 [features]
 simd-accel = ["globset/simd-accel"]
@@ -46,7 +46,6 @@ See the documentation for `WalkBuilder` for many other options.
 
 #![deny(missing_docs)]
 
-extern crate crossbeam_channel as channel;
 extern crate globset;
 #[macro_use]
 extern crate lazy_static;
@@ -5,10 +5,11 @@ use std::fs::{self, FileType, Metadata};
 use std::io;
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
+use std::thread;
+use std::time::Duration;
 use std::vec;
 
-use channel::{self, TryRecvError};
 use same_file::Handle;
 use walkdir::{self, WalkDir};
 
@@ -364,7 +365,8 @@ impl DirEntryRaw {
         })
     }
 
-    // Placeholder implementation to allow compiling on non-standard platforms (e.g. wasm32).
+    // Placeholder implementation to allow compiling on non-standard platforms
+    // (e.g. wasm32).
     #[cfg(not(any(windows, unix)))]
     fn from_entry_os(
         depth: usize,
@@ -413,7 +415,8 @@ impl DirEntryRaw {
         })
     }
 
-    // Placeholder implementation to allow compiling on non-standard platforms (e.g. wasm32).
+    // Placeholder implementation to allow compiling on non-standard platforms
+    // (e.g. wasm32).
     #[cfg(not(any(windows, unix)))]
     fn from_path(
         depth: usize,
@@ -1186,16 +1189,9 @@ impl WalkParallel {
     /// can be merged together into a single data structure.
     pub fn visit(mut self, builder: &mut dyn ParallelVisitorBuilder) {
         let threads = self.threads();
-        // TODO: Figure out how to use a bounded channel here. With an
-        // unbounded channel, the workers can run away and fill up memory
-        // with all of the file paths. But a bounded channel doesn't work since
-        // our producers are also are consumers, so they end up getting stuck.
-        //
-        // We probably need to rethink parallel traversal completely to fix
-        // this. The best case scenario would be finding a way to use rayon
-        // to do this.
-        let (tx, rx) = channel::unbounded();
+        let stack = Arc::new(Mutex::new(vec![]));
         {
+            let mut stack = stack.lock().unwrap();
             let mut visitor = builder.build();
             let mut paths = Vec::new().into_iter();
             std::mem::swap(&mut paths, &mut self.paths);
@@ -1232,28 +1228,27 @@ impl WalkParallel {
                         }
                     }
                 };
-                tx.send(Message::Work(Work {
+                stack.push(Message::Work(Work {
                     dent: dent,
                     ignore: self.ig_root.clone(),
                     root_device: root_device,
-                }))
-                .unwrap();
+                }));
             }
             // ... but there's no need to start workers if we don't need them.
-            if tx.is_empty() {
+            if stack.is_empty() {
                 return;
             }
         }
         // Create the workers and then wait for them to finish.
         let quit_now = Arc::new(AtomicBool::new(false));
-        let num_pending = Arc::new(AtomicUsize::new(tx.len()));
+        let num_pending =
+            Arc::new(AtomicUsize::new(stack.lock().unwrap().len()));
         crossbeam_utils::thread::scope(|s| {
             let mut handles = vec![];
             for _ in 0..threads {
                 let worker = Worker {
                     visitor: builder.build(),
-                    tx: tx.clone(),
-                    rx: rx.clone(),
+                    stack: stack.clone(),
                     quit_now: quit_now.clone(),
                     num_pending: num_pending.clone(),
                     max_depth: self.max_depth,
@@ -1263,8 +1258,6 @@ impl WalkParallel {
                 };
                 handles.push(s.spawn(|_| worker.run()));
             }
-            drop(tx);
-            drop(rx);
             for handle in handles {
                 handle.join().unwrap();
             }
@@ -1362,10 +1355,13 @@ impl Work {
 struct Worker<'s> {
     /// The caller's callback.
     visitor: Box<dyn ParallelVisitor + 's>,
-    /// The push side of our mpmc queue.
-    tx: channel::Sender<Message>,
-    /// The receive side of our mpmc queue.
-    rx: channel::Receiver<Message>,
+    /// A stack of work to do.
+    ///
+    /// We use a stack instead of a channel because a stack lets us visit
+    /// directories in depth first order. This can substantially reduce peak
+    /// memory usage by keeping both the number of file paths and gitignore
+    /// matchers in memory lower.
+    stack: Arc<Mutex<Vec<Message>>>,
     /// Whether all workers should terminate at the next opportunity. Note
     /// that we need this because we don't want other `Work` to be done after
     /// we quit. We wouldn't need this if have a priority channel.
@@ -1550,25 +1546,25 @@ impl<'s> Worker<'s> {
     /// If all work has been exhausted, then this returns None. The worker
     /// should then subsequently quit.
     fn get_work(&mut self) -> Option<Work> {
-        let mut value = self.rx.try_recv();
+        let mut value = self.recv();
         loop {
             // Simulate a priority channel: If quit_now flag is set, we can
             // receive only quit messages.
             if self.is_quit_now() {
-                value = Ok(Message::Quit)
+                value = Some(Message::Quit)
             }
             match value {
-                Ok(Message::Work(work)) => {
+                Some(Message::Work(work)) => {
                     return Some(work);
                 }
-                Ok(Message::Quit) => {
+                Some(Message::Quit) => {
                     // Repeat quit message to wake up sleeping threads, if
                     // any. The domino effect will ensure that every thread
                     // will quit.
-                    self.tx.send(Message::Quit).unwrap();
+                    self.send_quit();
                     return None;
                 }
-                Err(TryRecvError::Empty) => {
+                None => {
                     // Once num_pending reaches 0, it is impossible for it to
                     // ever increase again. Namely, it only reaches 0 once
                     // all jobs have run such that no jobs have produced more
@@ -1580,17 +1576,21 @@ impl<'s> Worker<'s> {
                     if self.num_pending() == 0 {
                         // Every other thread is blocked at the next recv().
                         // Send the initial quit message and quit.
-                        self.tx.send(Message::Quit).unwrap();
+                        self.send_quit();
                         return None;
                     }
                     // Wait for next `Work` or `Quit` message.
-                    value = Ok(self
-                        .rx
-                        .recv()
-                        .expect("channel disconnected while worker is alive"));
-                }
-                Err(TryRecvError::Disconnected) => {
-                    unreachable!("channel disconnected while worker is alive");
+                    loop {
+                        if let Some(v) = self.recv() {
+                            value = Some(v);
+                            break;
+                        }
+                        // Our stack isn't blocking. Instead of burning the
+                        // CPU waiting, we let the thread sleep for a bit. In
+                        // general, this tends to only occur once the search is
+                        // approaching termination.
+                        thread::sleep(Duration::from_millis(1));
+                    }
                 }
             }
         }
@@ -1614,7 +1614,20 @@ impl<'s> Worker<'s> {
     /// Send work.
     fn send(&self, work: Work) {
         self.num_pending.fetch_add(1, Ordering::SeqCst);
-        self.tx.send(Message::Work(work)).unwrap();
+        let mut stack = self.stack.lock().unwrap();
+        stack.push(Message::Work(work));
+    }
+
+    /// Send a quit message.
+    fn send_quit(&self) {
+        let mut stack = self.stack.lock().unwrap();
+        stack.push(Message::Quit);
+    }
+
+    /// Receive work.
+    fn recv(&self) -> Option<Message> {
+        let mut stack = self.stack.lock().unwrap();
+        stack.pop()
     }
 
     /// Signal that work has been received.
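The change is internal to the `ignore` crate's parallel walker, so callers are unaffected. For reference, here is a hedged usage sketch of the parallel traversal whose scheduling this commit changes, via the crate's `WalkBuilder::build_parallel` API; the path and thread count are illustrative.

use ignore::{WalkBuilder, WalkState};

fn main() {
    // Walk the current directory in parallel, respecting .gitignore files.
    // With this commit, each worker drains its shared work stack in roughly
    // depth first order, though output order across threads is still
    // nondeterministic.
    WalkBuilder::new("./")
        .threads(8)
        .build_parallel()
        .run(|| {
            // This closure is called once per worker thread to build that
            // thread's callback.
            Box::new(|result| {
                if let Ok(entry) = result {
                    println!("{}", entry.path().display());
                }
                WalkState::Continue
            })
        });
}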