From 3d59bd98aaea776092168f8feaf5daeeaf743cbf Mon Sep 17 00:00:00 2001
From: zsugabubus <zsugabubus@users.noreply.github.com>
Date: Fri, 2 Aug 2019 13:56:06 +0200
Subject: [PATCH] ignore: rework inter-thread messaging

Change the meaning of the `Quit` message: it now means "terminate".
The final "dance" is unnecessary because, by the time quitting begins,
no thread will ever spawn a new `Work`. The trick is to replace the
heuristic spin-loop with a blocking receive.

Closes #1337
---
 ignore/src/walk.rs | 167 +++++++++++++--------------------------------
 1 file changed, 49 insertions(+), 118 deletions(-)

diff --git a/ignore/src/walk.rs b/ignore/src/walk.rs
index b430981f..ed321eed 100644
--- a/ignore/src/walk.rs
+++ b/ignore/src/walk.rs
@@ -6,11 +6,9 @@ use std::io;
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::thread;
-use std::time::Duration;
 use std::vec;
 
-use channel;
+use channel::{self, TryRecvError};
 use same_file::Handle;
 use walkdir::{self, WalkDir};
 
@@ -1242,8 +1240,7 @@ impl WalkParallel {
             }
         }
         // Create the workers and then wait for them to finish.
-        let num_waiting = Arc::new(AtomicUsize::new(0));
-        let num_quitting = Arc::new(AtomicUsize::new(0));
+        let num_running = Arc::new(AtomicUsize::new(threads));
         let quit_now = Arc::new(AtomicBool::new(false));
         crossbeam_utils::thread::scope(|s| {
             let mut handles = vec![];
@@ -1253,11 +1250,7 @@ impl WalkParallel {
                     tx: tx.clone(),
                     rx: rx.clone(),
                     quit_now: quit_now.clone(),
-                    is_waiting: false,
-                    is_quitting: false,
-                    num_waiting: num_waiting.clone(),
-                    num_quitting: num_quitting.clone(),
-                    threads: threads,
+                    num_running: num_running.clone(),
                     max_depth: self.max_depth,
                     max_filesize: self.max_filesize,
                     follow_links: self.follow_links,
@@ -1270,8 +1263,7 @@ impl WalkParallel {
             for handle in handles {
                 handle.join().unwrap();
             }
-        })
-        .unwrap(); // Pass along panics from threads
+        }).unwrap(); // Pass along panics from threads
     }
 
     fn threads(&self) -> usize {
@@ -1289,7 +1281,7 @@ enum Message {
     /// Work items for entries that should be skipped or ignored should not
     /// be produced.
     Work(Work),
-    /// This instruction indicates that the worker should start quitting.
+    /// This instruction indicates that the worker should quit.
     Quit,
 }
 
@@ -1368,21 +1360,12 @@ struct Worker<'s> {
     tx: channel::Sender<Message>,
     /// The receive side of our mpmc queue.
     rx: channel::Receiver<Message>,
-    /// Whether all workers should quit at the next opportunity. Note that
-    /// this is distinct from quitting because of exhausting the contents of
-    /// a directory. Instead, this is used when the caller's callback indicates
-    /// that the iterator should quit immediately.
+    /// Whether all workers should terminate at the next opportunity. Note
+    /// that we need this because we don't want other `Work` to be done after
+    /// we quit. We wouldn't need this if we had a priority channel.
     quit_now: Arc<AtomicBool>,
-    /// Whether this worker is waiting for more work.
-    is_waiting: bool,
-    /// Whether this worker has started to quit.
-    is_quitting: bool,
-    /// The number of workers waiting for more work.
+    /// The number of workers currently running (not waiting for work).
-    num_waiting: Arc<AtomicUsize>,
-    /// The number of workers waiting to quit.
-    num_quitting: Arc<AtomicUsize>,
-    /// The total number of workers.
-    threads: usize,
+    num_running: Arc<AtomicUsize>,
     /// The maximum depth of directories to descend. A value of `0` means no
     /// descension at all.
     max_depth: Option<usize>,
@@ -1403,20 +1386,19 @@ impl<'s> Worker<'s> {
     /// The worker will call the caller's callback for all entries that aren't
     /// skipped by the ignore matcher.
     fn run(mut self) {
-        while let Some(mut work) = self.get_work() {
+        'get_work: while let Some(mut work) = self.get_work() {
             // If the work is not a directory, then we can just execute the
             // caller's callback immediately and move on.
             if work.is_symlink() || !work.is_dir() {
                 if self.visitor.visit(Ok(work.dent)).is_quit() {
                     self.quit_now();
-                    return;
                 }
                 continue;
             }
             if let Some(err) = work.add_parents() {
                 if self.visitor.visit(Err(err)).is_quit() {
                     self.quit_now();
-                    return;
+                    continue;
                 }
             }
 
@@ -1427,7 +1409,7 @@ impl<'s> Worker<'s> {
                     Err(err) => {
                         if self.visitor.visit(Err(err)).is_quit() {
                             self.quit_now();
-                            return;
+                            continue;
                         }
                         false
                     }
@@ -1449,7 +1431,7 @@ impl<'s> Worker<'s> {
                 WalkState::Skip => continue,
                 WalkState::Quit => {
                     self.quit_now();
-                    return;
+                    continue;
                 }
             }
             if !descend {
@@ -1461,7 +1443,6 @@ impl<'s> Worker<'s> {
                 Err(err) => {
                     if self.visitor.visit(Err(err)).is_quit() {
                         self.quit_now();
-                        return;
                     }
                     continue;
                 }
@@ -1479,7 +1460,7 @@ impl<'s> Worker<'s> {
                 );
                 if state.is_quit() {
                     self.quit_now();
-                    return;
+                    continue 'get_work;
                 }
             }
         }
@@ -1568,64 +1549,43 @@ impl<'s> Worker<'s> {
     /// If all work has been exhausted, then this returns None. The worker
     /// should then subsequently quit.
     fn get_work(&mut self) -> Option<Work> {
+        let mut value = self.rx.try_recv();
         loop {
+            // Simulate a priority channel: if the quit_now flag is set,
+            // treat whatever was received as a quit message.
             if self.is_quit_now() {
-                return None;
+                value = Ok(Message::Quit)
             }
-            match self.rx.try_recv() {
+            match value {
                 Ok(Message::Work(work)) => {
-                    self.waiting(false);
-                    self.quitting(false);
                     return Some(work);
                 }
                 Ok(Message::Quit) => {
-                    // We can't just quit because a Message::Quit could be
-                    // spurious. For example, it's possible to observe that
-                    // all workers are waiting even if there's more work to
-                    // be done.
-                    //
-                    // Therefore, we do a bit of a dance to wait until all
-                    // workers have signaled that they're ready to quit before
-                    // actually quitting.
-                    //
-                    // If the Quit message turns out to be spurious, then the
-                    // loop below will break and we'll go back to looking for
-                    // more work.
-                    self.waiting(true);
-                    self.quitting(true);
-                    while !self.is_quit_now() {
-                        let nwait = self.num_waiting();
-                        let nquit = self.num_quitting();
-                        // If the number of waiting workers dropped, then
-                        // abort our attempt to quit.
-                        if nwait < self.threads {
-                            break;
-                        }
-                        // If all workers are in this quit loop, then we
-                        // can stop.
-                        if nquit == self.threads {
-                            return None;
-                        }
-                        // Otherwise, spin.
-                    }
+                    // Repeat the quit message to wake up sleeping threads,
+                    // if any. This domino effect ensures that every thread
+                    // eventually quits.
+                    self.waiting();
+                    self.tx.send(Message::Quit).unwrap();
+                    return None;
                 }
-                Err(_) => {
-                    self.waiting(true);
-                    self.quitting(false);
-                    if self.num_waiting() == self.threads {
-                        for _ in 0..self.threads {
-                            self.tx.send(Message::Quit).unwrap();
-                        }
-                    } else {
-                        // You're right to consider this suspicious, but it's
-                        // a useful heuristic to permit producers to catch up
-                        // to consumers without burning the CPU. It is also
-                        // useful as a means to prevent burning the CPU if only
-                        // one worker is left doing actual work. It's not
-                        // perfect and it doesn't leave the CPU completely
-                        // idle, but it's not clear what else we can do. :-/
-                        thread::sleep(Duration::from_millis(1));
+                Err(TryRecvError::Empty) => {
+                    // If it was the last running thread, then no more work can
+                    // arrive, thus we can safely start quitting. Otherwise, a
+                    // thread may spawn new work to be done.
+                    if self.waiting() == 1 {
+                        // Every other thread is blocked at the next recv().
+                        // Send the initial quit message and quit.
+                        self.tx.send(Message::Quit).unwrap();
+                        return None;
                     }
+                    // Wait for next `Work` or `Quit` message.
+                    value = Ok(self.rx.recv().expect(
+                        "channel disconnected while worker is alive",
+                    ));
+                    self.resume();
+                },
+                Err(TryRecvError::Disconnected) => {
+                    unreachable!("channel disconnected while worker is alive");
                 }
             }
         }
@@ -1641,44 +1601,15 @@ impl<'s> Worker<'s> {
         self.quit_now.load(Ordering::SeqCst)
     }
 
-    /// Returns the total number of workers waiting for work.
-    fn num_waiting(&self) -> usize {
-        self.num_waiting.load(Ordering::SeqCst)
+    /// Sets this worker's "running" state to false. Returns the previous
+    /// number of running workers.
+    fn waiting(&self) -> usize {
+        self.num_running.fetch_sub(1, Ordering::SeqCst)
     }
 
-    /// Returns the total number of workers ready to quit.
-    fn num_quitting(&self) -> usize {
-        self.num_quitting.load(Ordering::SeqCst)
-    }
-
-    /// Sets this worker's "quitting" state to the value of `yes`.
-    fn quitting(&mut self, yes: bool) {
-        if yes {
-            if !self.is_quitting {
-                self.is_quitting = true;
-                self.num_quitting.fetch_add(1, Ordering::SeqCst);
-            }
-        } else {
-            if self.is_quitting {
-                self.is_quitting = false;
-                self.num_quitting.fetch_sub(1, Ordering::SeqCst);
-            }
-        }
-    }
-
-    /// Sets this worker's "waiting" state to the value of `yes`.
-    fn waiting(&mut self, yes: bool) {
-        if yes {
-            if !self.is_waiting {
-                self.is_waiting = true;
-                self.num_waiting.fetch_add(1, Ordering::SeqCst);
-            }
-        } else {
-            if self.is_waiting {
-                self.is_waiting = false;
-                self.num_waiting.fetch_sub(1, Ordering::SeqCst);
-            }
-        }
+    /// Sets this worker's "running" state to true.
+    fn resume(&self) {
+        self.num_running.fetch_add(1, Ordering::SeqCst);
     }
 }