From 9f7c2ebc09caec2282065974725f9cd2326b2355 Mon Sep 17 00:00:00 2001
From: Ed Page <eopage@gmail.com>
Date: Sat, 26 Oct 2019 19:02:10 -0600
Subject: [PATCH] ignore: allow parallel walker to borrow data

This makes it so the caller can more easily refactor from
single-threaded to multi-threaded walking. If they want to support both,
this makes it easier to do so with a single initialization code-path. In
particular, it side-steps the need to put everything into an `Arc`.

This is not a breaking change because it strictly increases the number
of allowed inputs to `WalkParallel::run`.

Closes #1410, Closes #1432
---
 Cargo.lock         |   1 +
 ignore/Cargo.toml  |   1 +
 ignore/src/walk.rs | 141 +++++++++++++++++++++++++--------------------
 3 files changed, 79 insertions(+), 64 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index adaf3f76..3c6dacb9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -238,6 +238,7 @@ name = "ignore"
 version = "0.4.11"
 dependencies = [
  "crossbeam-channel 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "globset 0.4.4",
  "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/ignore/Cargo.toml b/ignore/Cargo.toml
index 20b58016..5f201fa0 100644
--- a/ignore/Cargo.toml
+++ b/ignore/Cargo.toml
@@ -19,6 +19,7 @@ bench = false
 
 [dependencies]
 crossbeam-channel = "0.4.0"
+crossbeam-utils = "0.7.0"
 globset = { version = "0.4.3", path = "../globset" }
 lazy_static = "1.1"
 log = "0.4.5"
diff --git a/ignore/src/walk.rs b/ignore/src/walk.rs
index e00b29a5..658c2dbb 100644
--- a/ignore/src/walk.rs
+++ b/ignore/src/walk.rs
@@ -1068,6 +1068,10 @@ impl WalkState {
     }
 }
 
+type FnVisitor<'s> = Box<
+    dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 's
+>;
+
 /// WalkParallel is a parallel recursive directory iterator over files paths
 /// in one or more directories.
 ///
@@ -1091,11 +1095,10 @@ impl WalkParallel {
     /// Execute the parallel recursive directory iterator. `mkf` is called
     /// for each thread used for iteration. The function produced by `mkf`
     /// is then in turn called for each visited file path.
-    pub fn run<F>(self, mut mkf: F)
+    pub fn run<'s, F>(mut self, mut mkf: F)
     where
-        F: FnMut() -> Box<dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 'static>,
+        F: FnMut() -> FnVisitor<'s>,
     {
-        let mut f = mkf();
         let threads = self.threads();
         // TODO: Figure out how to use a bounded channel here. With an
         // unbounded channel, the workers can run away and fill up memory
@@ -1106,21 +1109,37 @@ impl WalkParallel {
         // this. The best case scenario would be finding a way to use rayon
         // to do this.
         let (tx, rx) = channel::unbounded();
-        let mut any_work = false;
-        // Send the initial set of root paths to the pool of workers.
-        // Note that we only send directories. For files, we send to them the
-        // callback directly.
-        for path in self.paths {
-            let (dent, root_device) = if path == Path::new("-") {
-                (DirEntry::new_stdin(), None)
-            } else {
-                let root_device = if !self.same_file_system {
-                    None
+        {
+            let mut f = mkf();
+            let mut any_work = false;
+            let mut paths = Vec::new().into_iter();
+            std::mem::swap(&mut paths, &mut self.paths);
+            // Send the initial set of root paths to the pool of workers. Note
+            // that we only send directories. For files, we send them to the
+            // callback directly.
+            for path in paths {
+                let (dent, root_device) = if path == Path::new("-") {
+                    (DirEntry::new_stdin(), None)
                 } else {
-                    match device_num(&path) {
-                        Ok(root_device) => Some(root_device),
+                    let root_device = if !self.same_file_system {
+                        None
+                    } else {
+                        match device_num(&path) {
+                            Ok(root_device) => Some(root_device),
+                            Err(err) => {
+                                let err = Error::Io(err).with_path(path);
+                                if f(Err(err)).is_quit() {
+                                    return;
+                                }
+                                continue;
+                            }
+                        }
+                    };
+                    match DirEntryRaw::from_path(0, path, false) {
+                        Ok(dent) => {
+                            (DirEntry::new_raw(dent, None), root_device)
+                        }
                         Err(err) => {
-                            let err = Error::Io(err).with_path(path);
                             if f(Err(err)).is_quit() {
                                 return;
                             }
@@ -1128,56 +1147,50 @@ impl WalkParallel {
                         }
                     }
                 };
-                match DirEntryRaw::from_path(0, path, false) {
-                    Ok(dent) => (DirEntry::new_raw(dent, None), root_device),
-                    Err(err) => {
-                        if f(Err(err)).is_quit() {
-                            return;
-                        }
-                        continue;
-                    }
-                }
-            };
-            tx.send(Message::Work(Work {
-                dent: dent,
-                ignore: self.ig_root.clone(),
-                root_device: root_device,
-            }))
-            .unwrap();
-            any_work = true;
-        }
-        // ... but there's no need to start workers if we don't need them.
-        if !any_work {
-            return;
+                tx.send(Message::Work(Work {
+                    dent: dent,
+                    ignore: self.ig_root.clone(),
+                    root_device: root_device,
+                }))
+                .unwrap();
+                any_work = true;
+            }
+            // ... but there's no need to start workers if we don't need them.
+            if !any_work {
+                return;
+            }
         }
         // Create the workers and then wait for them to finish.
         let num_waiting = Arc::new(AtomicUsize::new(0));
         let num_quitting = Arc::new(AtomicUsize::new(0));
         let quit_now = Arc::new(AtomicBool::new(false));
-        let mut handles = vec![];
-        for _ in 0..threads {
-            let worker = Worker {
-                f: mkf(),
-                tx: tx.clone(),
-                rx: rx.clone(),
-                quit_now: quit_now.clone(),
-                is_waiting: false,
-                is_quitting: false,
-                num_waiting: num_waiting.clone(),
-                num_quitting: num_quitting.clone(),
-                threads: threads,
-                max_depth: self.max_depth,
-                max_filesize: self.max_filesize,
-                follow_links: self.follow_links,
-                skip: self.skip.clone(),
-            };
-            handles.push(thread::spawn(|| worker.run()));
-        }
-        drop(tx);
-        drop(rx);
-        for handle in handles {
-            handle.join().unwrap();
-        }
+        crossbeam_utils::thread::scope(|s| {
+            let mut handles = vec![];
+            for _ in 0..threads {
+                let worker = Worker {
+                    f: mkf(),
+                    tx: tx.clone(),
+                    rx: rx.clone(),
+                    quit_now: quit_now.clone(),
+                    is_waiting: false,
+                    is_quitting: false,
+                    num_waiting: num_waiting.clone(),
+                    num_quitting: num_quitting.clone(),
+                    threads: threads,
+                    max_depth: self.max_depth,
+                    max_filesize: self.max_filesize,
+                    follow_links: self.follow_links,
+                    skip: self.skip.clone(),
+                };
+                handles.push(s.spawn(|_| worker.run()));
+            }
+            drop(tx);
+            drop(rx);
+            for handle in handles {
+                handle.join().unwrap();
+            }
+        })
+        .unwrap(); // Pass along panics from threads
     }
 
     fn threads(&self) -> usize {
@@ -1267,9 +1280,9 @@ impl Work {
 /// ignore matchers, producing new work and invoking the caller's callback.
 ///
 /// Note that a worker is *both* a producer and a consumer.
-struct Worker {
+struct Worker<'s> {
     /// The caller's callback.
-    f: Box<dyn FnMut(Result<DirEntry, Error>) -> WalkState + Send + 'static>,
+    f: FnVisitor<'s>,
     /// The push side of our mpmc queue.
     tx: channel::Sender<Message>,
     /// The receive side of our mpmc queue.
@@ -1303,7 +1316,7 @@ struct Worker {
     skip: Option<Arc<Handle>>,
 }
 
-impl Worker {
+impl<'s> Worker<'s> {
     /// Runs this worker until there is no more work left to do.
     ///
     /// The worker will call the caller's callback for all entries that aren't