ignore: rework inter-thread messaging

Change the meaning of the `Quit` message: it now means terminate. The
final "dance" is unnecessary, because by the time quitting begins, no
thread will ever spawn a new `Work`. The trick was to replace the
heuristic spin loop with a blocking receive.

Closes #1337
Authored by zsugabubus, 2019-08-02 13:56:06 +02:00; committed by Andrew Gallant
parent 52d7f47420
commit 3d59bd98aa

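To make the new protocol concrete before diving into the diff, here is a
minimal, self-contained sketch of the scheme this commit introduces: a shared
count of running workers, a blocking recv(), and a single `Quit` message that
every worker forwards before exiting, so that termination dominoes through the
pool. This is an illustration only, not code from the crate; the names are
invented, the `quit_now` early-termination flag is omitted (see the sketch
after the `Worker` struct hunk below), and it assumes the crossbeam-channel
crate, which ignore imports under the alias `channel`.

// Illustrative sketch (not from the crate): the counter-plus-blocking-recv
// quit protocol introduced by this commit.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

use crossbeam_channel as channel;

enum Message {
    Work(u32),
    Quit,
}

fn worker(
    tx: channel::Sender<Message>,
    rx: channel::Receiver<Message>,
    num_running: Arc<AtomicUsize>,
) {
    let mut value = rx.try_recv();
    loop {
        match value {
            Ok(Message::Work(n)) => {
                // A real worker may enqueue more Work here; new work can
                // only appear while at least one worker is still running.
                println!("processed item {}", n);
                value = rx.try_recv();
            }
            Ok(Message::Quit) => {
                // Forward Quit so the next blocked worker wakes up: the
                // "domino effect" from the commit message.
                num_running.fetch_sub(1, Ordering::SeqCst);
                tx.send(Message::Quit).unwrap();
                return;
            }
            Err(channel::TryRecvError::Empty) => {
                // fetch_sub returns the previous count; 1 means this was
                // the last running worker, so no more Work can ever be
                // produced and it is safe to start the quit domino.
                if num_running.fetch_sub(1, Ordering::SeqCst) == 1 {
                    tx.send(Message::Quit).unwrap();
                    return;
                }
                // Otherwise block until more Work (or Quit) arrives.
                value = Ok(rx.recv().expect("a sender is always alive"));
                num_running.fetch_add(1, Ordering::SeqCst);
            }
            Err(channel::TryRecvError::Disconnected) => unreachable!(),
        }
    }
}

fn main() {
    let threads = 4;
    let (tx, rx) = channel::unbounded();
    for n in 0..10 {
        tx.send(Message::Work(n)).unwrap();
    }
    let num_running = Arc::new(AtomicUsize::new(threads));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let (tx, rx) = (tx.clone(), rx.clone());
            let num_running = Arc::clone(&num_running);
            thread::spawn(move || worker(tx, rx, num_running))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}

Running this processes all ten items and then joins all four workers without
any sleep-based polling, which is exactly what the heuristic spin loop below
is being replaced with.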

@@ -6,11 +6,9 @@
 use std::io;
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::thread;
-use std::time::Duration;
 use std::vec;
 
-use channel;
+use channel::{self, TryRecvError};
 use same_file::Handle;
 use walkdir::{self, WalkDir};
@@ -1242,8 +1240,7 @@ impl WalkParallel {
             }
         }
         // Create the workers and then wait for them to finish.
-        let num_waiting = Arc::new(AtomicUsize::new(0));
-        let num_quitting = Arc::new(AtomicUsize::new(0));
+        let num_running = Arc::new(AtomicUsize::new(threads));
         let quit_now = Arc::new(AtomicBool::new(false));
         crossbeam_utils::thread::scope(|s| {
             let mut handles = vec![];
@ -1253,11 +1250,7 @@ impl WalkParallel {
tx: tx.clone(),
rx: rx.clone(),
quit_now: quit_now.clone(),
is_waiting: false,
is_quitting: false,
num_waiting: num_waiting.clone(),
num_quitting: num_quitting.clone(),
threads: threads,
num_running: num_running.clone(),
max_depth: self.max_depth,
max_filesize: self.max_filesize,
follow_links: self.follow_links,
@@ -1270,8 +1263,7 @@ impl WalkParallel {
             for handle in handles {
                 handle.join().unwrap();
             }
-        })
-        .unwrap(); // Pass along panics from threads
+        }).unwrap(); // Pass along panics from threads
     }
 
     fn threads(&self) -> usize {
@@ -1289,7 +1281,7 @@ enum Message {
     /// Work items for entries that should be skipped or ignored should not
     /// be produced.
     Work(Work),
-    /// This instruction indicates that the worker should start quitting.
+    /// This instruction indicates that the worker should quit.
     Quit,
 }
 
@@ -1368,21 +1360,12 @@ struct Worker<'s> {
     tx: channel::Sender<Message>,
     /// The receive side of our mpmc queue.
     rx: channel::Receiver<Message>,
-    /// Whether all workers should quit at the next opportunity. Note that
-    /// this is distinct from quitting because of exhausting the contents of
-    /// a directory. Instead, this is used when the caller's callback indicates
-    /// that the iterator should quit immediately.
+    /// Whether all workers should terminate at the next opportunity. Note
+    /// that we need this because we don't want other `Work` to be done after
+    /// we quit. We wouldn't need this if we had a priority channel.
     quit_now: Arc<AtomicBool>,
-    /// Whether this worker is waiting for more work.
-    is_waiting: bool,
-    /// Whether this worker has started to quit.
-    is_quitting: bool,
-    /// The number of workers waiting for more work.
-    num_waiting: Arc<AtomicUsize>,
-    /// The number of workers waiting to quit.
-    num_quitting: Arc<AtomicUsize>,
-    /// The total number of workers.
-    threads: usize,
+    /// The number of currently running workers.
+    num_running: Arc<AtomicUsize>,
     /// The maximum depth of directories to descend. A value of `0` means no
     /// descension at all.
     max_depth: Option<usize>,
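The rewritten doc comment on `quit_now` above is worth a gloss: an ordinary
FIFO channel still delivers `Work` that was queued before quitting was
requested, so every receive first consults the flag and, if it is set, behaves
as though it had received `Quit`. A tiny self-contained illustration of that
simulated priority channel (invented names, again assuming crossbeam-channel):

// Why quit_now is needed: a FIFO channel happily delivers Work that was
// queued before the quit request, so the flag is checked first and
// outranks anything already in the queue. Illustrative code only.
use std::sync::atomic::{AtomicBool, Ordering};

use crossbeam_channel as channel;

enum Message {
    Work(&'static str),
    Quit,
}

fn main() {
    let (tx, rx) = channel::unbounded();
    tx.send(Message::Work("queued before quit")).unwrap();

    let quit_now = AtomicBool::new(false);
    quit_now.store(true, Ordering::SeqCst); // the callback asked to quit

    let mut value = rx.try_recv();
    if quit_now.load(Ordering::SeqCst) {
        value = Ok(Message::Quit); // pretend we received Quit
    }
    match value {
        Ok(Message::Work(w)) => println!("would wrongly process: {}", w),
        Ok(Message::Quit) => println!("terminated; queued work dropped"),
        Err(_) => println!("nothing queued"),
    }
}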
@@ -1403,20 +1386,19 @@ impl<'s> Worker<'s> {
     /// The worker will call the caller's callback for all entries that aren't
     /// skipped by the ignore matcher.
     fn run(mut self) {
-        while let Some(mut work) = self.get_work() {
+        'get_work: while let Some(mut work) = self.get_work() {
             // If the work is not a directory, then we can just execute the
             // caller's callback immediately and move on.
             if work.is_symlink() || !work.is_dir() {
                 if self.visitor.visit(Ok(work.dent)).is_quit() {
                     self.quit_now();
-                    return;
                 }
                 continue;
             }
             if let Some(err) = work.add_parents() {
                 if self.visitor.visit(Err(err)).is_quit() {
                     self.quit_now();
-                    return;
+                    continue;
                 }
             }
 
@@ -1427,7 +1409,7 @@ impl<'s> Worker<'s> {
                 Err(err) => {
                     if self.visitor.visit(Err(err)).is_quit() {
                         self.quit_now();
-                        return;
+                        continue;
                     }
                     false
                 }
@ -1449,7 +1431,7 @@ impl<'s> Worker<'s> {
WalkState::Skip => continue,
WalkState::Quit => {
self.quit_now();
return;
continue;
}
}
if !descend {
@@ -1461,7 +1443,6 @@ impl<'s> Worker<'s> {
                 Err(err) => {
                     if self.visitor.visit(Err(err)).is_quit() {
                         self.quit_now();
-                        return;
                     }
                     continue;
                 }
@@ -1479,7 +1460,7 @@ impl<'s> Worker<'s> {
                 );
                 if state.is_quit() {
                     self.quit_now();
-                    return;
+                    continue 'get_work;
                 }
             }
         }
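Note the pattern in the `run()` hunks above: every `return` after
`self.quit_now()` becomes `continue` (or `continue 'get_work` from the nested
loop), so control re-enters `get_work()`, which now performs all shutdown
bookkeeping: decrementing the running count and forwarding `Quit`. Returning
directly would skip that hand-off and could leave sibling workers blocked in
`recv()` forever. A toy, runnable demonstration of the labeled-continue shape
(names invented for illustration):

// Toy shape of run() after this commit: every quit path re-enters
// get_work(), which performs the shutdown bookkeeping exactly once.
fn get_work(queue: &mut Vec<u32>, quit: bool) -> Option<u32> {
    if quit {
        println!("get_work: shutdown bookkeeping happens here");
        return None;
    }
    queue.pop()
}

fn main() {
    let mut queue = vec![1, 2, 3];
    let mut quit = false;
    'get_work: while let Some(work) = get_work(&mut queue, quit) {
        for step in 0..work {
            if step == 1 {
                quit = true; // corresponds to self.quit_now()
                // Not `return`: let get_work() observe the flag and
                // run the quit protocol on our behalf.
                continue 'get_work;
            }
            println!("work {}, step {}", work, step);
        }
    }
}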
@@ -1568,64 +1549,43 @@ impl<'s> Worker<'s> {
     /// If all work has been exhausted, then this returns None. The worker
     /// should then subsequently quit.
     fn get_work(&mut self) -> Option<Work> {
+        let mut value = self.rx.try_recv();
         loop {
+            // Simulate a priority channel: if the quit_now flag is set, we
+            // can only receive quit messages.
             if self.is_quit_now() {
-                return None;
+                value = Ok(Message::Quit);
             }
-            match self.rx.try_recv() {
+            match value {
                 Ok(Message::Work(work)) => {
-                    self.waiting(false);
-                    self.quitting(false);
                     return Some(work);
                 }
                 Ok(Message::Quit) => {
-                    // We can't just quit because a Message::Quit could be
-                    // spurious. For example, it's possible to observe that
-                    // all workers are waiting even if there's more work to
-                    // be done.
-                    //
-                    // Therefore, we do a bit of a dance to wait until all
-                    // workers have signaled that they're ready to quit before
-                    // actually quitting.
-                    //
-                    // If the Quit message turns out to be spurious, then the
-                    // loop below will break and we'll go back to looking for
-                    // more work.
-                    self.waiting(true);
-                    self.quitting(true);
-                    while !self.is_quit_now() {
-                        let nwait = self.num_waiting();
-                        let nquit = self.num_quitting();
-                        // If the number of waiting workers dropped, then
-                        // abort our attempt to quit.
-                        if nwait < self.threads {
-                            break;
-                        }
-                        // If all workers are in this quit loop, then we
-                        // can stop.
-                        if nquit == self.threads {
-                            return None;
-                        }
-                        // Otherwise, spin.
-                    }
+                    // Repeat the quit message to wake up sleeping threads, if
+                    // any. The domino effect will ensure that every thread
+                    // will quit.
+                    self.waiting();
+                    self.tx.send(Message::Quit).unwrap();
+                    return None;
                 }
-                Err(_) => {
-                    self.waiting(true);
-                    self.quitting(false);
-                    if self.num_waiting() == self.threads {
-                        for _ in 0..self.threads {
-                            self.tx.send(Message::Quit).unwrap();
-                        }
-                    } else {
-                        // You're right to consider this suspicious, but it's
-                        // a useful heuristic to permit producers to catch up
-                        // to consumers without burning the CPU. It is also
-                        // useful as a means to prevent burning the CPU if only
-                        // one worker is left doing actual work. It's not
-                        // perfect and it doesn't leave the CPU completely
-                        // idle, but it's not clear what else we can do. :-/
-                        thread::sleep(Duration::from_millis(1));
-                    }
-                }
+                Err(TryRecvError::Empty) => {
+                    // If it was the last running thread, then no more work can
+                    // arrive, thus we can safely start quitting. Otherwise, a
+                    // thread may spawn new work to be done.
+                    if self.waiting() == 1 {
+                        // Every other thread is blocked at the next recv().
+                        // Send the initial quit message and quit.
+                        self.tx.send(Message::Quit).unwrap();
+                        return None;
+                    }
+                    // Wait for the next `Work` or `Quit` message.
+                    value = Ok(self.rx.recv().expect(
+                        "channel disconnected while worker is alive",
+                    ));
+                    self.resume();
+                }
+                Err(TryRecvError::Disconnected) => {
+                    unreachable!("channel disconnected while worker is alive");
+                }
             }
         }
     }
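The `waiting()`/`resume()` accounting above carries the whole termination
argument: `fetch_sub` returns the previous value, so a return value of `1`
tells a worker, in a single atomic step, that it was the last one still
running and that the queue can never be refilled. A deterministic,
single-threaded trace of that arithmetic (illustrative only; `waiting()`
wraps exactly this `fetch_sub`, and `resume()` the matching `fetch_add`):

// Deterministic trace of the num_running accounting for three workers.
// fetch_sub/fetch_add return the PREVIOUS value, which is what makes
// "was I the last running worker?" a single atomic operation.
use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    let num_running = AtomicUsize::new(3);

    // Worker A finds the queue empty: 3 -> 2, previous was 3, not last.
    assert_eq!(num_running.fetch_sub(1, Ordering::SeqCst), 3);
    // Worker B finds the queue empty: 2 -> 1, previous was 2, not last.
    assert_eq!(num_running.fetch_sub(1, Ordering::SeqCst), 2);
    // A is woken by new Work and resumes: 1 -> 2.
    assert_eq!(num_running.fetch_add(1, Ordering::SeqCst), 1);
    // A finishes and finds the queue empty again: 2 -> 1, not last.
    assert_eq!(num_running.fetch_sub(1, Ordering::SeqCst), 2);
    // Worker C finds the queue empty: 1 -> 0, previous was 1, so it was
    // the last running worker: it sends the first Quit and exits.
    assert_eq!(num_running.fetch_sub(1, Ordering::SeqCst), 1);
    println!("C initiates the quit domino");
}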
@ -1641,44 +1601,15 @@ impl<'s> Worker<'s> {
self.quit_now.load(Ordering::SeqCst)
}
/// Returns the total number of workers waiting for work.
fn num_waiting(&self) -> usize {
self.num_waiting.load(Ordering::SeqCst)
/// Sets this worker's "running" state to false. Returns the previous
/// number of running workers.
fn waiting(&self) -> usize {
self.num_running.fetch_sub(1, Ordering::SeqCst)
}
/// Returns the total number of workers ready to quit.
fn num_quitting(&self) -> usize {
self.num_quitting.load(Ordering::SeqCst)
}
/// Sets this worker's "quitting" state to the value of `yes`.
fn quitting(&mut self, yes: bool) {
if yes {
if !self.is_quitting {
self.is_quitting = true;
self.num_quitting.fetch_add(1, Ordering::SeqCst);
}
} else {
if self.is_quitting {
self.is_quitting = false;
self.num_quitting.fetch_sub(1, Ordering::SeqCst);
}
}
}
/// Sets this worker's "waiting" state to the value of `yes`.
fn waiting(&mut self, yes: bool) {
if yes {
if !self.is_waiting {
self.is_waiting = true;
self.num_waiting.fetch_add(1, Ordering::SeqCst);
}
} else {
if self.is_waiting {
self.is_waiting = false;
self.num_waiting.fetch_sub(1, Ordering::SeqCst);
}
}
/// Sets this worker's "running" state to true.
fn resume(&self) {
self.num_running.fetch_add(1, Ordering::SeqCst);
}
}