ignore: Avoid contention on num_pending

Previously, every worker would increment the shared num_pending count for
every new work item and decrement it after finishing each one, leading to
lots of contention.  Now, we only track the number of workers actively
running, so there is no contention except when workers go to sleep or
wake up.

Closes #2642
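
To make the new scheme concrete, here is a minimal standalone sketch of the counting strategy described above. It is not the actual code in crates/ignore/src/walk.rs; `ActiveWorkers`, `deactivate`, and `activate` are hypothetical names chosen for this illustration, and the real work-stealing deques are elided.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical stand-in for the `active_workers` counter introduced by this
// commit: it counts running workers rather than outstanding work items.
struct ActiveWorkers(AtomicUsize);

impl ActiveWorkers {
    fn new(threads: usize) -> Self {
        ActiveWorkers(AtomicUsize::new(threads))
    }

    // A worker calls this when it runs out of work and is about to block.
    // Returns how many workers remain active afterwards.
    fn deactivate(&self) -> usize {
        self.0.fetch_sub(1, Ordering::Acquire) - 1
    }

    // A worker calls this once it is handed new work after blocking.
    fn activate(&self) {
        self.0.fetch_add(1, Ordering::Release);
    }
}

fn main() {
    // A single worker, to keep the demo short.
    let workers = ActiveWorkers::new(1);
    // In the steady state the counter is never touched: a worker that finds
    // work on its own deque (or steals some) just runs it. Only an idle
    // worker decrements the counter, and only a woken worker increments it.
    if workers.deactivate() == 0 {
        // The last active worker went idle, so every deque is empty, no new
        // work can be produced, and all workers can be told to quit.
        println!("all work done; broadcast quit");
    } else {
        // Otherwise the worker would block waiting for work and call
        // `activate()` once it receives some.
        workers.activate();
    }
}
```

Contrast this with the old scheme, where num_pending was incremented in send() and decremented again in work_done(): two contended atomic operations on a shared counter for every single work item.
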
Tavian Barnes 2023-10-30 15:56:08 -04:00 committed by Andrew Gallant
parent af55fc2b38
commit 6d7550d58e
2 changed files with 22 additions and 25 deletions

CHANGELOG.md

@@ -23,6 +23,8 @@ Performance improvements:
Make most searches with `\b` look-arounds (among others) much faster.
* [PERF #2591](https://github.com/BurntSushi/ripgrep/pull/2591):
Parallel directory traversal now uses work stealing for faster searches.
* [PERF #2642](https://github.com/BurntSushi/ripgrep/pull/2642):
Parallel directory traversal has some contention reduced.
Feature enhancements:

crates/ignore/src/walk.rs

@@ -1279,7 +1279,7 @@ impl WalkParallel {
}
// Create the workers and then wait for them to finish.
let quit_now = Arc::new(AtomicBool::new(false));
let num_pending = Arc::new(AtomicUsize::new(stack.len()));
let active_workers = Arc::new(AtomicUsize::new(threads));
let stacks = Stack::new_for_each_thread(threads, stack);
std::thread::scope(|s| {
let handles: Vec<_> = stacks
@@ -1288,7 +1288,7 @@ impl WalkParallel {
visitor: builder.build(),
stack,
quit_now: quit_now.clone(),
num_pending: num_pending.clone(),
active_workers: active_workers.clone(),
max_depth: self.max_depth,
max_filesize: self.max_filesize,
follow_links: self.follow_links,
@@ -1471,8 +1471,8 @@ struct Worker<'s> {
/// that we need this because we don't want other `Work` to be done after
/// we quit. We wouldn't need this if we had a priority channel.
quit_now: Arc<AtomicBool>,
/// The number of outstanding work items.
num_pending: Arc<AtomicUsize>,
/// The number of currently active workers.
active_workers: Arc<AtomicUsize>,
/// The maximum depth of directories to descend. A value of `0` means no
/// descension at all.
max_depth: Option<usize>,
@@ -1500,7 +1500,6 @@ impl<'s> Worker<'s> {
if let WalkState::Quit = self.run_one(work) {
self.quit_now();
}
self.work_done();
}
}
@@ -1682,23 +1681,20 @@ impl<'s> Worker<'s> {
return None;
}
None => {
// Once num_pending reaches 0, it is impossible for it to
// ever increase again. Namely, it only reaches 0 once
// all jobs have run such that no jobs have produced more
// work. We have this guarantee because num_pending is
// always incremented before each job is submitted and only
// decremented once each job is completely finished.
// Therefore, if this reaches zero, then there can be no
// other job running.
if self.num_pending() == 0 {
// Every other thread is blocked at the next recv().
// Send the initial quit message and quit.
if self.deactivate_worker() == 0 {
// If deactivate_worker() returns 0, every worker thread
// is currently within the critical section between the
// acquire in deactivate_worker() and the release in
// activate_worker() below. For this to happen, every
// worker's local deque must be simultaneously empty,
// meaning there is no more work left at all.
self.send_quit();
return None;
}
// Wait for next `Work` or `Quit` message.
loop {
if let Some(v) = self.recv() {
self.activate_worker();
value = Some(v);
break;
}
@@ -1724,14 +1720,8 @@ impl<'s> Worker<'s> {
self.quit_now.load(AtomicOrdering::SeqCst)
}
/// Returns the number of pending jobs.
fn num_pending(&self) -> usize {
self.num_pending.load(AtomicOrdering::SeqCst)
}
/// Send work.
fn send(&self, work: Work) {
self.num_pending.fetch_add(1, AtomicOrdering::SeqCst);
self.stack.push(Message::Work(work));
}
@@ -1745,9 +1735,14 @@ impl<'s> Worker<'s> {
self.stack.pop()
}
/// Signal that work has been finished.
fn work_done(&self) {
self.num_pending.fetch_sub(1, AtomicOrdering::SeqCst);
/// Deactivates a worker and returns the number of currently active workers.
fn deactivate_worker(&self) -> usize {
self.active_workers.fetch_sub(1, AtomicOrdering::Acquire) - 1
}
/// Reactivates a worker.
fn activate_worker(&self) {
self.active_workers.fetch_add(1, AtomicOrdering::Release);
}
}
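
For reference, the worker's idle path after this change can be sketched roughly as follows. This is a simplification, not the real walk.rs: `Message`, `wait_for_work`, and `send_quit` are illustrative stand-ins, and an mpsc channel takes the place of the actual work-stealing deques and parking logic.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::Arc;

enum Message {
    Work(String), // stand-in for the real `Work` payload (a directory entry)
    Quit,
}

struct Worker {
    active_workers: Arc<AtomicUsize>,
    rx: Receiver<Message>,
}

impl Worker {
    // Simplified idle path: called only after the local deque and all steal
    // attempts have come up empty.
    fn wait_for_work(&self) -> Option<String> {
        // The counter is touched here, when the worker goes idle...
        if self.active_workers.fetch_sub(1, Ordering::Acquire) == 1 {
            // We were the last active worker: every deque is empty and no
            // more work can be produced, so tell everyone to quit.
            self.send_quit();
            return None;
        }
        loop {
            match self.rx.recv().ok()? {
                Message::Work(w) => {
                    // ...and here, when it wakes back up with new work.
                    self.active_workers.fetch_add(1, Ordering::Release);
                    return Some(w);
                }
                Message::Quit => return None,
            }
        }
    }

    fn send_quit(&self) {
        // In the real code this pushes a quit message onto every worker's
        // deque; elided in this sketch.
    }
}

fn main() {
    let (tx, rx) = channel();
    let worker = Worker {
        active_workers: Arc::new(AtomicUsize::new(2)),
        rx,
    };
    // Pretend another worker produced an item while this one was idle.
    tx.send(Message::Work("some/dir".to_string())).unwrap();
    assert_eq!(worker.wait_for_work().as_deref(), Some("some/dir"));
}
```

The Acquire on the decrement and the Release on the increment mirror the orderings used by deactivate_worker() and activate_worker() in the diff above.
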