From b272be25fa7b616689384d2e4bec3e01715e2477 Mon Sep 17 00:00:00 2001 From: Andrew Gallant Date: Sat, 5 Nov 2016 21:44:15 -0400 Subject: [PATCH] Add parallel recursive directory iterator. This adds a new walk type in the `ignore` crate, `WalkParallel`, which provides a way for recursively iterating over a set of paths in parallel while respecting various ignore rules. The API is a bit strange, as a closure producing a closure isn't something one often sees, but it does seem to work well. This also allowed us to simplify much of the worker logic in ripgrep proper, where MultiWorker is now gone. --- Cargo.lock | 26 +- Cargo.toml | 1 - ignore/Cargo.toml | 1 + ignore/examples/walk.rs | 98 +++- ignore/src/dir.rs | 12 +- ignore/src/lib.rs | 62 ++- ignore/src/walk.rs | 1017 ++++++++++++++++++++++++++++++++++----- src/args.rs | 105 ++-- src/main.rs | 409 +++++++--------- src/printer.rs | 1 + src/worker.rs | 253 ++++++++++ 11 files changed, 1506 insertions(+), 479 deletions(-) create mode 100644 src/worker.rs diff --git a/Cargo.lock b/Cargo.lock index 6e171451..d6f512f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3,7 +3,6 @@ name = "ripgrep" version = "0.2.6" dependencies = [ "ctrlc 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", - "deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "docopt 0.6.86 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "grep 0.1.3", @@ -29,6 +28,11 @@ dependencies = [ "memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "crossbeam" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "ctrlc" version = "2.0.1" @@ -39,14 +43,6 @@ dependencies = [ "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "deque" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "docopt" version = "0.6.86" @@ -109,6 +105,7 @@ dependencies = [ name = "ignore" version = "0.1.3" dependencies = [ + "crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", "globset 0.1.1", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -169,14 +166,6 @@ dependencies = [ "libc 0.2.17 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "rand" -version = "0.3.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "libc 0.2.17 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "regex" version = "0.1.80" @@ -293,8 +282,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [metadata] "checksum aho-corasick 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ca972c2ea5f742bfce5687b9aef75506a764f61d37f8f649047846a9686ddb66" +"checksum crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "0c5ea215664ca264da8a9d9c3be80d2eaf30923c259d03e870388eb927508f97" "checksum ctrlc 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "77f98bb69e3fefadcc5ca80a1368a55251f70295168203e01165bcaecb270891" -"checksum deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1614659040e711785ed8ea24219140654da1729f3ec8a47a9719d041112fe7bf" "checksum docopt 0.6.86 (registry+https://github.com/rust-lang/crates.io-index)" = "4a7ef30445607f6fc8720f0a0a2c7442284b629cf0d049286860fae23e71c4d9" "checksum env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "15abd780e45b3ea4f76b4e9a26ff4843258dd8a3eed2775a0e7368c2e7936c2f" "checksum fnv 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "6cc484842f1e2884faf56f529f960cc12ad8c71ce96cc7abba0a067c98fee344" @@ -306,7 +295,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d8b629fb514376c675b98c1421e80b151d3817ac42d7c667717d282761418d20" "checksum memmap 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "065ce59af31c18ea2c419100bda6247dd4ec3099423202b12f0bd32e529fabd2" "checksum num_cpus 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8890e6084723d57d0df8d2720b0d60c6ee67d6c93e7169630e4371e88765dcad" -"checksum rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "2791d88c6defac799c3f20d74f094ca33b9332612d9aef9078519c82e4fe04a5" "checksum regex 0.1.80 (registry+https://github.com/rust-lang/crates.io-index)" = "4fd4ace6a8cf7860714a2c2280d6c1f7e6a413486c13298bbc86fd3da019402f" "checksum regex-syntax 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "f9ec002c35e86791825ed294b50008eea9ddfc8def4420124fbc6b08db834957" "checksum rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)" = "6159e4e6e559c81bd706afe9c8fd68f547d3e851ce12e76b1de7914bab61691b" diff --git a/Cargo.toml b/Cargo.toml index daec1490..95cc97d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,6 @@ path = "tests/tests.rs" [dependencies] ctrlc = "2.0" -deque = "0.3" docopt = "0.6" env_logger = "0.3" grep = { version = "0.1.3", path = "grep" } diff --git a/ignore/Cargo.toml b/ignore/Cargo.toml index 1d336441..635299f9 100644 --- a/ignore/Cargo.toml +++ b/ignore/Cargo.toml @@ -18,6 +18,7 @@ name = "ignore" bench = false [dependencies] +crossbeam = "0.2" globset = { version = "0.1.1", path = "../globset" } lazy_static = "0.2" log = "0.3" diff --git a/ignore/examples/walk.rs b/ignore/examples/walk.rs index 0ce0a086..0ff4ea94 100644 --- a/ignore/examples/walk.rs +++ b/ignore/examples/walk.rs @@ -1,28 +1,92 @@ -/* +#![allow(dead_code, unused_imports, unused_mut, unused_variables)] + +extern crate crossbeam; extern crate ignore; extern crate walkdir; use std::env; use std::io::{self, Write}; -use std::os::unix::ffi::OsStrExt; +use std::path::Path; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::thread; -use ignore::ignore::IgnoreBuilder; +use crossbeam::sync::MsQueue; +use ignore::WalkBuilder; use walkdir::WalkDir; fn main() { - let path = env::args().nth(1).unwrap(); - let ig = IgnoreBuilder::new().build(); - let wd = WalkDir::new(path); - let walker = ignore::walk::Iter::new(ig, wd); - - let mut stdout = io::BufWriter::new(io::stdout()); - // let mut count = 0; - for dirent in walker { - // count += 1; - stdout.write(dirent.path().as_os_str().as_bytes()).unwrap(); - stdout.write(b"\n").unwrap(); + let mut path = env::args().nth(1).unwrap(); + let mut parallel = false; + let mut simple = false; + let queue: Arc>> = Arc::new(MsQueue::new()); + if path == "parallel" { + path = env::args().nth(2).unwrap(); + parallel = true; + } else if path == "walkdir" { + path = env::args().nth(2).unwrap(); + simple = true; } - // println!("{}", count); + + let stdout_queue = queue.clone(); + let stdout_thread = thread::spawn(move || { + let mut stdout = io::BufWriter::new(io::stdout()); + while let Some(dent) = stdout_queue.pop() { + write_path(&mut stdout, dent.path()); + } + }); + + if parallel { + let walker = WalkBuilder::new(path).threads(6).build_parallel(); + walker.run(|| { + let queue = queue.clone(); + Box::new(move |result| { + use ignore::WalkState::*; + + queue.push(Some(DirEntry::Y(result.unwrap()))); + Continue + }) + }); + } else if simple { + let mut stdout = io::BufWriter::new(io::stdout()); + let walker = WalkDir::new(path); + for result in walker { + queue.push(Some(DirEntry::X(result.unwrap()))); + } + } else { + let mut stdout = io::BufWriter::new(io::stdout()); + let walker = WalkBuilder::new(path).build(); + for result in walker { + queue.push(Some(DirEntry::Y(result.unwrap()))); + } + } + queue.push(None); + stdout_thread.join().unwrap(); +} + +enum DirEntry { + X(walkdir::DirEntry), + Y(ignore::DirEntry), +} + +impl DirEntry { + fn path(&self) -> &Path { + match *self { + DirEntry::X(ref x) => x.path(), + DirEntry::Y(ref y) => y.path(), + } + } +} + +#[cfg(unix)] +fn write_path(mut wtr: W, path: &Path) { + use std::os::unix::ffi::OsStrExt; + wtr.write(path.as_os_str().as_bytes()).unwrap(); + wtr.write(b"\n").unwrap(); +} + +#[cfg(not(unix))] +fn write_path(mut wtr: W, path: &Path) { + wtr.write(path.to_string_lossy().as_bytes()).unwrap(); + wtr.write(b"\n").unwrap(); } -*/ -fn main() {} diff --git a/ignore/src/dir.rs b/ignore/src/dir.rs index 6ac00627..496664f3 100644 --- a/ignore/src/dir.rs +++ b/ignore/src/dir.rs @@ -137,6 +137,11 @@ impl Ignore { self.0.parent.is_none() } + /// Returns true if this matcher was added via the `add_parents` method. + pub fn is_absolute_parent(&self) -> bool { + self.0.is_absolute_parent + } + /// Return this matcher's parent, if one exists. pub fn parent(&self) -> Option { self.0.parent.clone() @@ -376,7 +381,7 @@ impl Ignore { } /// Returns an iterator over parent ignore matchers, including this one. - fn parents(&self) -> Parents { + pub fn parents(&self) -> Parents { Parents(Some(self)) } @@ -387,7 +392,10 @@ impl Ignore { } } -struct Parents<'a>(Option<&'a Ignore>); +/// An iterator over all parents of an ignore matcher, including itself. +/// +/// The lifetime `'a` refers to the lifetime of the initial `Ignore` matcher. +pub struct Parents<'a>(Option<&'a Ignore>); impl<'a> Iterator for Parents<'a> { type Item = &'a Ignore; diff --git a/ignore/src/lib.rs b/ignore/src/lib.rs index a3aa0c8f..d489712d 100644 --- a/ignore/src/lib.rs +++ b/ignore/src/lib.rs @@ -44,6 +44,7 @@ for result in WalkBuilder::new("./").hidden(false).build() { See the documentation for `WalkBuilder` for many other options. */ +extern crate crossbeam; extern crate globset; #[macro_use] extern crate lazy_static; @@ -61,7 +62,7 @@ use std::fmt; use std::io; use std::path::{Path, PathBuf}; -pub use walk::{DirEntry, Walk, WalkBuilder}; +pub use walk::{DirEntry, Walk, WalkBuilder, WalkParallel, WalkState}; mod dir; pub mod gitignore; @@ -80,6 +81,12 @@ pub enum Error { WithLineNumber { line: u64, err: Box }, /// An error associated with a particular file path. WithPath { path: PathBuf, err: Box }, + /// An error associated with a particular directory depth when recursively + /// walking a directory. + WithDepth { depth: usize, err: Box }, + /// An error that occurs when a file loop is detected when traversing + /// symbolic links. + Loop { ancestor: PathBuf, child: PathBuf }, /// An error that occurs when doing I/O, such as reading an ignore file. Io(io::Error), /// An error that occurs when trying to parse a glob. @@ -101,6 +108,7 @@ impl Error { Error::Partial(_) => true, Error::WithLineNumber { ref err, .. } => err.is_partial(), Error::WithPath { ref err, .. } => err.is_partial(), + Error::WithDepth { ref err, .. } => err.is_partial(), _ => false, } } @@ -111,6 +119,8 @@ impl Error { Error::Partial(ref errs) => errs.len() == 1 && errs[0].is_io(), Error::WithLineNumber { ref err, .. } => err.is_io(), Error::WithPath { ref err, .. } => err.is_io(), + Error::WithDepth { ref err, .. } => err.is_io(), + Error::Loop { .. } => false, Error::Io(_) => true, Error::Glob(_) => false, Error::UnrecognizedFileType(_) => false, @@ -118,6 +128,16 @@ impl Error { } } + /// Returns a depth associated with recursively walking a directory (if + /// this error was generated from a recursive directory iterator). + pub fn depth(&self) -> Option { + match *self { + Error::WithPath { ref err, .. } => err.depth(), + Error::WithDepth { depth, .. } => Some(depth), + _ => None, + } + } + /// Turn an error into a tagged error with the given file path. fn with_path>(self, path: P) -> Error { Error::WithPath { @@ -126,6 +146,14 @@ impl Error { } } + /// Turn an error into a tagged error with the given depth. + fn with_depth(self, depth: usize) -> Error { + Error::WithDepth { + depth: depth, + err: Box::new(self), + } + } + /// Turn an error into a tagged error with the given file path and line /// number. If path is empty, then it is omitted from the error. fn tagged>(self, path: P, lineno: u64) -> Error { @@ -146,6 +174,8 @@ impl error::Error for Error { Error::Partial(_) => "partial error", Error::WithLineNumber { ref err, .. } => err.description(), Error::WithPath { ref err, .. } => err.description(), + Error::WithDepth { ref err, .. } => err.description(), + Error::Loop { .. } => "file system loop found", Error::Io(ref err) => err.description(), Error::Glob(ref msg) => msg, Error::UnrecognizedFileType(_) => "unrecognized file type", @@ -168,6 +198,12 @@ impl fmt::Display for Error { Error::WithPath { ref path, ref err } => { write!(f, "{}: {}", path.display(), err) } + Error::WithDepth { ref err, .. } => err.fmt(f), + Error::Loop { ref ancestor, ref child } => { + write!(f, "File system loop found: \ + {} points to an ancestor {}", + child.display(), ancestor.display()) + } Error::Io(ref err) => err.fmt(f), Error::Glob(ref msg) => write!(f, "{}", msg), Error::UnrecognizedFileType(ref ty) => { @@ -187,6 +223,30 @@ impl From for Error { } } +impl From for Error { + fn from(err: walkdir::Error) -> Error { + let depth = err.depth(); + if let (Some(anc), Some(child)) = (err.loop_ancestor(), err.path()) { + return Error::WithDepth { + depth: depth, + err: Box::new(Error::Loop { + ancestor: anc.to_path_buf(), + child: child.to_path_buf(), + }), + }; + } + let path = err.path().map(|p| p.to_path_buf()); + let mut ig_err = Error::Io(io::Error::from(err)); + if let Some(path) = path { + ig_err = Error::WithPath { + path: path, + err: Box::new(ig_err), + }; + } + ig_err + } +} + #[derive(Debug, Default)] struct PartialErrorBuilder(Vec); diff --git a/ignore/src/walk.rs b/ignore/src/walk.rs index 0bcc6136..a1ac2de5 100644 --- a/ignore/src/walk.rs +++ b/ignore/src/walk.rs @@ -1,10 +1,15 @@ use std::ffi::OsStr; -use std::fs::{FileType, Metadata}; +use std::fmt; +use std::fs::{self, FileType, Metadata}; use std::io; use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::thread; use std::vec; -use walkdir::{self, WalkDir, WalkDirIterator}; +use crossbeam::sync::MsQueue; +use walkdir::{self, WalkDir, WalkDirIterator, is_same_file}; use dir::{Ignore, IgnoreBuilder}; use gitignore::GitignoreBuilder; @@ -12,6 +17,278 @@ use overrides::Override; use types::Types; use {Error, PartialErrorBuilder}; +/// A directory entry with a possible error attached. +/// +/// The error typically refers to a problem parsing ignore files in a +/// particular directory. +#[derive(Debug)] +pub struct DirEntry { + dent: DirEntryInner, + err: Option, +} + +impl DirEntry { + /// The full path that this entry represents. + pub fn path(&self) -> &Path { + self.dent.path() + } + + /// Whether this entry corresponds to a symbolic link or not. + pub fn path_is_symbolic_link(&self) -> bool { + self.dent.path_is_symbolic_link() + } + + /// Returns true if and only if this entry corresponds to stdin. + /// + /// i.e., The entry has depth 0 and its file name is `-`. + pub fn is_stdin(&self) -> bool { + self.dent.is_stdin() + } + + /// Return the metadata for the file that this entry points to. + pub fn metadata(&self) -> Result { + self.dent.metadata() + } + + /// Return the file type for the file that this entry points to. + /// + /// This entry doesn't have a file type if it corresponds to stdin. + pub fn file_type(&self) -> Option { + self.dent.file_type() + } + + /// Return the file name of this entry. + /// + /// If this entry has no file name (e.g., `/`), then the full path is + /// returned. + pub fn file_name(&self) -> &OsStr { + self.dent.file_name() + } + + /// Returns the depth at which this entry was created relative to the root. + pub fn depth(&self) -> usize { + self.dent.depth() + } + + /// Returns an error, if one exists, associated with processing this entry. + /// + /// An example of an error is one that occurred while parsing an ignore + /// file. + pub fn error(&self) -> Option<&Error> { + self.err.as_ref() + } + + fn new_stdin() -> DirEntry { + DirEntry { + dent: DirEntryInner::Stdin, + err: None, + } + } + + fn new_walkdir(dent: walkdir::DirEntry, err: Option) -> DirEntry { + DirEntry { + dent: DirEntryInner::Walkdir(dent), + err: err, + } + } + + fn new_raw(dent: DirEntryRaw, err: Option) -> DirEntry { + DirEntry { + dent: DirEntryInner::Raw(dent), + err: err, + } + } +} + +/// DirEntryInner is the implementation of DirEntry. +/// +/// It specifically represents three distinct sources of directory entries: +/// +/// 1. From the walkdir crate. +/// 2. Special entries that represent things like stdin. +/// 3. From a path. +/// +/// Specifically, (3) has to essentially re-create the DirEntry implementation +/// from WalkDir. +#[derive(Debug)] +enum DirEntryInner { + Stdin, + Walkdir(walkdir::DirEntry), + Raw(DirEntryRaw), +} + +impl DirEntryInner { + fn path(&self) -> &Path { + use self::DirEntryInner::*; + match *self { + Stdin => Path::new(""), + Walkdir(ref x) => x.path(), + Raw(ref x) => x.path(), + } + } + + fn path_is_symbolic_link(&self) -> bool { + use self::DirEntryInner::*; + match *self { + Stdin => false, + Walkdir(ref x) => x.path_is_symbolic_link(), + Raw(ref x) => x.path_is_symbolic_link(), + } + } + + fn is_stdin(&self) -> bool { + match *self { + DirEntryInner::Stdin => true, + _ => false, + } + } + + fn metadata(&self) -> Result { + use self::DirEntryInner::*; + match *self { + Stdin => { + let err = Error::Io(io::Error::new( + io::ErrorKind::Other, " has no metadata")); + Err(err.with_path("")) + } + Walkdir(ref x) => { + x.metadata().map_err(|err| { + Error::Io(io::Error::from(err)).with_path(x.path()) + }) + } + Raw(ref x) => x.metadata(), + } + } + + fn file_type(&self) -> Option { + use self::DirEntryInner::*; + match *self { + Stdin => None, + Walkdir(ref x) => Some(x.file_type()), + Raw(ref x) => Some(x.file_type()), + } + } + + fn file_name(&self) -> &OsStr { + use self::DirEntryInner::*; + match *self { + Stdin => OsStr::new(""), + Walkdir(ref x) => x.file_name(), + Raw(ref x) => x.file_name(), + } + } + + fn depth(&self) -> usize { + use self::DirEntryInner::*; + match *self { + Stdin => 0, + Walkdir(ref x) => x.depth(), + Raw(ref x) => x.depth(), + } + } +} + +/// DirEntryRaw is essentially copied from the walkdir crate so that we can +/// build `DirEntry`s from whole cloth in the parallel iterator. +struct DirEntryRaw { + /// The path as reported by the `fs::ReadDir` iterator (even if it's a + /// symbolic link). + path: PathBuf, + /// The file type. Necessary for recursive iteration, so store it. + ty: FileType, + /// Is set when this entry was created from a symbolic link and the user + /// expects the iterator to follow symbolic links. + follow_link: bool, + /// The depth at which this entry was generated relative to the root. + depth: usize, +} + +impl fmt::Debug for DirEntryRaw { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + // Leaving out FileType because it doesn't have a debug impl + // in Rust 1.9. We could add it if we really wanted to by manually + // querying each possibly file type. Meh. ---AG + f.debug_struct("DirEntryRaw") + .field("path", &self.path) + .field("follow_link", &self.follow_link) + .field("depth", &self.depth) + .finish() + } +} + +impl DirEntryRaw { + fn path(&self) -> &Path { + &self.path + } + + fn path_is_symbolic_link(&self) -> bool { + self.ty.is_symlink() || self.follow_link + } + + fn metadata(&self) -> Result { + if self.follow_link { + fs::metadata(&self.path) + } else { + fs::symlink_metadata(&self.path) + }.map_err(|err| Error::Io(io::Error::from(err)).with_path(&self.path)) + } + + fn file_type(&self) -> FileType { + self.ty + } + + fn file_name(&self) -> &OsStr { + self.path.file_name().unwrap_or_else(|| self.path.as_os_str()) + } + + fn depth(&self) -> usize { + self.depth + } + + fn from_entry( + depth: usize, + ent: &fs::DirEntry, + ) -> Result { + let ty = try!(ent.file_type().map_err(|err| { + let err = Error::Io(io::Error::from(err)).with_path(ent.path()); + Error::WithDepth { + depth: depth, + err: Box::new(err), + } + })); + Ok(DirEntryRaw { + path: ent.path(), + ty: ty, + follow_link: false, + depth: depth, + }) + } + + fn from_link(depth: usize, pb: PathBuf) -> Result { + let md = try!(fs::metadata(&pb).map_err(|err| { + Error::Io(err).with_path(&pb) + })); + Ok(DirEntryRaw { + path: pb, + ty: md.file_type(), + follow_link: true, + depth: depth, + }) + } + + fn from_path(depth: usize, pb: PathBuf) -> Result { + let md = try!(fs::symlink_metadata(&pb).map_err(|err| { + Error::Io(err).with_path(&pb) + })); + Ok(DirEntryRaw { + path: pb, + ty: md.file_type(), + follow_link: false, + depth: depth, + }) + } +} + /// WalkBuilder builds a recursive directory iterator. /// /// The builder supports a large number of configurable options. This includes @@ -58,12 +335,14 @@ use {Error, PartialErrorBuilder}; /// path is skipped. /// * Sixth, if the path has made it this far then it is yielded in the /// iterator. +#[derive(Clone, Debug)] pub struct WalkBuilder { paths: Vec, ig_builder: IgnoreBuilder, parents: bool, max_depth: Option, follow_links: bool, + threads: usize, } impl WalkBuilder { @@ -80,6 +359,7 @@ impl WalkBuilder { parents: true, max_depth: None, follow_links: false, + threads: 0, } } @@ -109,6 +389,22 @@ impl WalkBuilder { } } + /// Build a new `WalkParallel` iterator. + /// + /// Note that this *doesn't* return something that implements `Iterator`. + /// Instead, the returned value must be run with a closure. e.g., + /// `builder.build_parallel().run(|| |path| println!("{:?}", path))`. + pub fn build_parallel(&self) -> WalkParallel { + WalkParallel { + paths: self.paths.clone().into_iter(), + ig_root: self.ig_builder.build(), + max_depth: self.max_depth, + follow_links: self.follow_links, + parents: self.parents, + threads: self.threads, + } + } + /// Add a file path to the iterator. /// /// Each additional file path added is traversed recursively. This should @@ -133,6 +429,17 @@ impl WalkBuilder { self } + /// The number of threads to use for traversal. + /// + /// Note that this only has an effect when using `build_parallel`. + /// + /// The default setting is `0`, which chooses the number of threads + /// automatically using heuristics. + pub fn threads(&mut self, n: usize) -> &mut WalkBuilder { + self.threads = n; + self + } + /// Add an ignore file to the matcher. /// /// This has lower precedence than all other sources of ignore rules. @@ -239,7 +546,8 @@ impl WalkBuilder { } } -/// Walk is a recursive directory iterator over file paths in a directory. +/// Walk is a recursive directory iterator over file paths in one or more +/// directories. /// /// Only file and directory paths matching the rules are returned. By default, /// ignore files like `.gitignore` are respected. The precise matching rules @@ -264,17 +572,9 @@ impl Walk { fn skip_entry(&self, ent: &walkdir::DirEntry) -> bool { if ent.depth() == 0 { - // Never skip the root directory. return false; } - let m = self.ig.matched(ent.path(), ent.file_type().is_dir()); - if m.is_ignore() { - debug!("ignoring {}: {:?}", ent.path().display(), m); - return true; - } else if m.is_whitelist() { - debug!("whitelisting {}: {:?}", ent.path().display(), m); - } - false + skip_path(&self.ig, ent.path(), ent.file_type().is_dir()) } } @@ -290,10 +590,7 @@ impl Iterator for Walk { match self.its.next() { None => return None, Some((_, None)) => { - return Some(Ok(DirEntry { - dent: None, - err: None, - })); + return Some(Ok(DirEntry::new_stdin())); } Some((path, Some(it))) => { self.it = Some(it); @@ -313,15 +610,7 @@ impl Iterator for Walk { }; match ev { Err(err) => { - let path = err.path().map(|p| p.to_path_buf()); - let mut ig_err = Error::Io(io::Error::from(err)); - if let Some(path) = path { - ig_err = Error::WithPath { - path: path.to_path_buf(), - err: Box::new(ig_err), - }; - } - return Some(Err(ig_err)); + return Some(Err(Error::from(err))); } Ok(WalkEvent::Exit) => { self.ig = self.ig.parent().unwrap(); @@ -338,98 +627,19 @@ impl Iterator for Walk { } let (igtmp, err) = self.ig.add_child(ent.path()); self.ig = igtmp; - return Some(Ok(DirEntry { dent: Some(ent), err: err })); + return Some(Ok(DirEntry::new_walkdir(ent, err))); } Ok(WalkEvent::File(ent)) => { if self.skip_entry(&ent) { continue; } - // If this isn't actually a file (e.g., a symlink), - // then skip it. - if !ent.file_type().is_file() { - continue; - } - return Some(Ok(DirEntry { dent: Some(ent), err: None })); + return Some(Ok(DirEntry::new_walkdir(ent, None))); } } } } } -/// A directory entry with a possible error attached. -/// -/// The error typically refers to a problem parsing ignore files in a -/// particular directory. -#[derive(Debug)] -pub struct DirEntry { - dent: Option, - err: Option, -} - -impl DirEntry { - /// The full path that this entry represents. - pub fn path(&self) -> &Path { - self.dent.as_ref().map_or(Path::new(""), |x| x.path()) - } - - /// Whether this entry corresponds to a symbolic link or not. - pub fn path_is_symbolic_link(&self) -> bool { - self.dent.as_ref().map_or(false, |x| x.path_is_symbolic_link()) - } - - /// Returns true if and only if this entry corresponds to stdin. - /// - /// i.e., The entry has depth 0 and its file name is `-`. - pub fn is_stdin(&self) -> bool { - self.dent.is_none() - } - - /// Return the metadata for the file that this entry points to. - pub fn metadata(&self) -> Result { - if let Some(dent) = self.dent.as_ref() { - dent.metadata().map_err(|err| Error::WithPath { - path: self.path().to_path_buf(), - err: Box::new(Error::Io(io::Error::from(err))), - }) - } else { - let ioerr = io::Error::new( - io::ErrorKind::Other, "stdin has no metadata"); - Err(Error::WithPath { - path: Path::new("").to_path_buf(), - err: Box::new(Error::Io(ioerr)), - }) - } - } - - /// Return the file type for the file that this entry points to. - /// - /// This entry doesn't have a file type if it corresponds to stdin. - pub fn file_type(&self) -> Option { - self.dent.as_ref().map(|x| x.file_type()) - } - - /// Return the file name of this entry. - /// - /// If this entry has no file name (e.g., `/`), then the full path is - /// returned. - pub fn file_name(&self) -> &OsStr { - self.dent.as_ref().map_or(OsStr::new(""), |x| x.file_name()) - } - - /// Returns the depth at which this entry was created relative to the root. - pub fn depth(&self) -> usize { - self.dent.as_ref().map_or(0, |x| x.depth()) - } - - /// Returns an error, if one exists, associated with processing this entry. - /// - /// An example of an error is one that occurred while parsing an ignore - /// file. - pub fn error(&self) -> Option<&Error> { - self.err.as_ref() - } -} - /// WalkEventIter transforms a WalkDir iterator into an iterator that more /// accurately describes the directory tree. Namely, it emits events that are /// one of three types: directory, file or "exit." An "exit" event means that @@ -485,21 +695,497 @@ impl Iterator for WalkEventIter { } } +/// WalkState is used in the parallel recursive directory iterator to indicate +/// whether walking should continue as normal, skip descending into a +/// particular directory or quit the walk entirely. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum WalkState { + /// Continue walking as normal. + Continue, + /// If the directory entry given is a directory, don't descend into it. + /// In all other cases, this has no effect. + Skip, + /// Quit the entire iterator as soon as possible. + /// + /// Note that this is an inherently asynchronous action. It is possible + /// for more entries to be yielded even after instructing the iterator + /// to quit. + Quit, +} + +impl WalkState { + fn is_quit(&self) -> bool { + *self == WalkState::Quit + } +} + +/// WalkParallel is a parallel recursive directory iterator over files paths +/// in one or more directories. +/// +/// Only file and directory paths matching the rules are returned. By default, +/// ignore files like `.gitignore` are respected. The precise matching rules +/// and precedence is explained in the documentation for `WalkBuilder`. +/// +/// Unlike `Walk`, this uses multiple threads for traversing a directory. +pub struct WalkParallel { + paths: vec::IntoIter, + ig_root: Ignore, + parents: bool, + max_depth: Option, + follow_links: bool, + threads: usize, +} + +impl WalkParallel { + /// Execute the parallel recursive directory iterator. `mkf` is called + /// for each thread used for iteration. The function produced by `mkf` + /// is then in turn called for each visited file path. + pub fn run( + self, + mut mkf: F, + ) where F: FnMut() -> Box) -> WalkState + Send + 'static> { + let mut f = mkf(); + let threads = self.threads(); + let queue = Arc::new(MsQueue::new()); + let mut any_dirs = false; + // Send the initial set of root paths to the pool of workers. + // Note that we only send directories. For files, we send to them the + // callback directly. + for path in self.paths { + if path == Path::new("-") { + if f(Ok(DirEntry::new_stdin())).is_quit() { + return; + } + continue; + } + let dent = match DirEntryRaw::from_path(0, path) { + Ok(dent) => DirEntry::new_raw(dent, None), + Err(err) => { + if f(Err(err)).is_quit() { + return; + } + continue; + } + }; + if !dent.file_type().map_or(false, |t| t.is_dir()) { + if f(Ok(dent)).is_quit() { + return; + } + } else { + any_dirs = true; + queue.push(Message::Work(Work { + dent: dent, + ignore: self.ig_root.clone(), + })); + } + } + // ... but there's no need to start workers if we don't need them. + if !any_dirs { + return; + } + // Create the workers and then wait for them to finish. + let num_waiting = Arc::new(AtomicUsize::new(0)); + let num_quitting = Arc::new(AtomicUsize::new(0)); + let mut handles = vec![]; + for _ in 0..threads { + let worker = Worker { + f: mkf(), + queue: queue.clone(), + quit_now: Arc::new(AtomicBool::new(false)), + is_waiting: false, + is_quitting: false, + num_waiting: num_waiting.clone(), + num_quitting: num_quitting.clone(), + threads: threads, + parents: self.parents, + max_depth: self.max_depth, + follow_links: self.follow_links, + }; + handles.push(thread::spawn(|| worker.run())); + } + for handle in handles { + handle.join().unwrap(); + } + } + + fn threads(&self) -> usize { + if self.threads == 0 { + 2 + } else { + self.threads + } + } +} + +/// Message is the set of instructions that a worker knows how to process. +enum Message { + /// A work item corresponds to a directory that should be descended into. + /// Work items for entries that should be skipped or ignored should not + /// be produced. + Work(Work), + /// This instruction indicates that the worker should start quitting. + Quit, +} + +/// A unit of work for each worker to process. +/// +/// Each unit of work corresponds to a directory that should be descended +/// into. +struct Work { + /// The directory entry. + dent: DirEntry, + /// Any ignore matchers that have been built for this directory's parents. + ignore: Ignore, +} + +impl Work { + /// Adds ignore rules for parent directories. + /// + /// Note that this only applies to entries at depth 0. On all other + /// entries, this is a no-op. + fn add_parents(&mut self) -> Option { + if self.dent.depth() > 0 { + return None; + } + // At depth 0, the path of this entry is a root path, so we can + // use it directly to add parent ignore rules. + let (ig, err) = self.ignore.add_parents(self.dent.path()); + self.ignore = ig; + err + } + + /// Reads the directory contents of this work item and adds ignore + /// rules for this directory. + /// + /// If there was a problem with reading the directory contents, then + /// an error is returned. If there was a problem reading the ignore + /// rules for this directory, then the error is attached to this + /// work item's directory entry. + fn read_dir(&mut self) -> Result { + let readdir = match fs::read_dir(self.dent.path()) { + Ok(readdir) => readdir, + Err(err) => { + let err = Error::from(err) + .with_path(self.dent.path()) + .with_depth(self.dent.depth()); + return Err(err); + } + }; + let (ig, err) = self.ignore.add_child(self.dent.path()); + self.ignore = ig; + self.dent.err = err; + Ok(readdir) + } +} + +/// A worker is responsible for descending into directories, updating the +/// ignore matchers, producing new work and invoking the caller's callback. +/// +/// Note that a worker is *both* a producer and a consumer. +struct Worker { + /// The caller's callback. + f: Box) -> WalkState + Send + 'static>, + /// A queue of work items. This is multi-producer and multi-consumer. + queue: Arc>, + /// Whether all workers should quit at the next opportunity. Note that + /// this is distinct from quitting because of exhausting the contents of + /// a directory. Instead, this is used when the caller's callback indicates + /// that the iterator should quit immediately. + quit_now: Arc, + /// Whether this worker is waiting for more work. + is_waiting: bool, + /// Whether this worker has started to quit. + is_quitting: bool, + /// The number of workers waiting for more work. + num_waiting: Arc, + /// The number of workers waiting to quit. + num_quitting: Arc, + /// The total number of workers. + threads: usize, + /// Whether to create ignore matchers for parents of caller specified + /// directories. + parents: bool, + /// The maximum depth of directories to descend. A value of `0` means no + /// descension at all. + max_depth: Option, + /// Whether to follow symbolic links or not. When this is enabled, loop + /// detection is performed. + follow_links: bool, +} + +impl Worker { + /// Runs this worker until there is no more work left to do. + /// + /// The worker will call the caller's callback for all entries that aren't + /// skipped by the ignore matcher. + fn run(mut self) { + while let Some(mut work) = self.get_work() { + let depth = work.dent.depth(); + if self.parents { + if let Some(err) = work.add_parents() { + if (self.f)(Err(err)).is_quit() { + self.quit_now(); + return; + } + } + } + let readdir = match work.read_dir() { + Ok(readdir) => readdir, + Err(err) => { + if (self.f)(Err(err)).is_quit() { + self.quit_now(); + return; + } + continue; + } + }; + match (self.f)(Ok(work.dent)) { + WalkState::Continue => {} + WalkState::Skip => continue, + WalkState::Quit => { + self.quit_now(); + return; + } + } + if self.max_depth.map_or(false, |max| depth >= max) { + continue; + } + for result in readdir { + if self.run_one(&work.ignore, depth + 1, result).is_quit() { + self.quit_now(); + return; + } + } + } + } + + /// Runs the worker on a single entry from a directory iterator. + /// + /// If the entry is a path that should be ignored, then this is a no-op. + /// Otherwise, if the entry is a directory, then it is pushed on to the + /// queue. If the entry isn't a directory, the caller's callback is + /// applied. + /// + /// If an error occurs while reading the entry, then it is sent to the + /// caller's callback. + /// + /// `ig` is the `Ignore` matcher for the parent directory. `depth` should + /// be the depth of this entry. `result` should be the item yielded by + /// a directory iterator. + fn run_one( + &mut self, + ig: &Ignore, + depth: usize, + result: Result, + ) -> WalkState { + let fs_dent = match result { + Ok(fs_dent) => fs_dent, + Err(err) => { + return (self.f)(Err(Error::from(err).with_depth(depth))); + } + }; + let mut dent = match DirEntryRaw::from_entry(depth, &fs_dent) { + Ok(dent) => DirEntry::new_raw(dent, None), + Err(err) => { + return (self.f)(Err(err)); + } + }; + let is_symlink = dent.file_type().map_or(false, |ft| ft.is_symlink()); + if self.follow_links && is_symlink { + let path = dent.path().to_path_buf(); + dent = match DirEntryRaw::from_link(depth, path) { + Ok(dent) => DirEntry::new_raw(dent, None), + Err(err) => { + return (self.f)(Err(err)); + } + }; + if dent.file_type().map_or(false, |ft| ft.is_dir()) { + if let Err(err) = check_symlink_loop(ig, dent.path(), depth) { + return (self.f)(Err(err)); + } + } + } + let is_dir = dent.file_type().map_or(false, |ft| ft.is_dir()); + if skip_path(ig, dent.path(), is_dir) { + WalkState::Continue + } else if !is_dir { + (self.f)(Ok(dent)) + } else { + self.queue.push(Message::Work(Work { + dent: dent, + ignore: ig.clone(), + })); + WalkState::Continue + } + } + + /// Returns the next directory to descend into. + /// + /// If all work has been exhausted, then this returns None. The worker + /// should then subsequently quit. + fn get_work(&mut self) -> Option { + loop { + if self.is_quit_now() { + return None; + } + match self.queue.try_pop() { + Some(Message::Work(work)) => { + self.waiting(false); + self.quitting(false); + return Some(work); + } + Some(Message::Quit) => { + // We can't just quit because a Message::Quit could be + // spurious. For example, it's possible to observe that + // all workers are waiting even if there's more work to + // be done. + // + // Therefore, we do a bit of a dance to wait until all + // workers have signaled that they're ready to quit before + // actually quitting. + // + // If the Quit message turns out to be spurious, then the + // loop below will break and we'll go back to looking for + // more work. + self.waiting(true); + self.quitting(true); + while !self.is_quit_now() { + let nwait = self.num_waiting(); + let nquit = self.num_quitting(); + // If the number of waiting workers dropped, then + // abort our attempt to quit. + if nwait < self.threads { + break; + } + // If all workers are in this quit loop, then we + // can stop. + if nquit == self.threads { + return None; + } + // Otherwise, spin. + } + // If we're here, then we've aborted our quit attempt. + continue; + } + None => { + self.waiting(true); + self.quitting(false); + if self.num_waiting() == self.threads { + for _ in 0..self.threads { + self.queue.push(Message::Quit); + } + } + continue; + } + } + } + } + + /// Indicates that all workers should quit immediately. + fn quit_now(&self) { + self.quit_now.store(true, Ordering::SeqCst); + } + + /// Returns true if this worker should quit immediately. + fn is_quit_now(&self) -> bool { + self.quit_now.load(Ordering::SeqCst) + } + + /// Returns the total number of workers waiting for work. + fn num_waiting(&self) -> usize { + self.num_waiting.load(Ordering::SeqCst) + } + + /// Returns the total number of workers ready to quit. + fn num_quitting(&self) -> usize { + self.num_quitting.load(Ordering::SeqCst) + } + + /// Sets this worker's "quitting" state to the value of `yes`. + fn quitting(&mut self, yes: bool) { + if yes { + if !self.is_quitting { + self.is_quitting = true; + self.num_quitting.fetch_add(1, Ordering::SeqCst); + } + } else { + if self.is_quitting { + self.is_quitting = false; + self.num_quitting.fetch_sub(1, Ordering::SeqCst); + } + } + } + + /// Sets this worker's "waiting" state to the value of `yes`. + fn waiting(&mut self, yes: bool) { + if yes { + if !self.is_waiting { + self.is_waiting = true; + self.num_waiting.fetch_add(1, Ordering::SeqCst); + } + } else { + if self.is_waiting { + self.is_waiting = false; + self.num_waiting.fetch_sub(1, Ordering::SeqCst); + } + } + } +} + +fn check_symlink_loop( + ig_parent: &Ignore, + child_path: &Path, + child_depth: usize, +) -> Result<(), Error> { + for ig in ig_parent.parents().take_while(|ig| !ig.is_absolute_parent()) { + let same = try!(is_same_file(ig.path(), child_path).map_err(|err| { + Error::from(err).with_path(child_path).with_depth(child_depth) + })); + if same { + return Err(Error::Loop { + ancestor: ig.path().to_path_buf(), + child: child_path.to_path_buf(), + }.with_depth(child_depth)); + } + } + Ok(()) +} + +fn skip_path(ig: &Ignore, path: &Path, is_dir: bool) -> bool { + let m = ig.matched(path, is_dir); + if m.is_ignore() { + debug!("ignoring {}: {:?}", path.display(), m); + true + } else if m.is_whitelist() { + debug!("whitelisting {}: {:?}", path.display(), m); + false + } else { + false + } +} + #[cfg(test)] mod tests { use std::fs::{self, File}; use std::io::Write; use std::path::Path; + use std::sync::{Arc, Mutex}; use tempdir::TempDir; - use super::{Walk, WalkBuilder}; + use super::{WalkBuilder, WalkState}; fn wfile>(path: P, contents: &str) { let mut file = File::create(path).unwrap(); file.write_all(contents.as_bytes()).unwrap(); } + #[cfg(unix)] + fn symlink, Q: AsRef>(src: P, dst: Q) { + use std::os::unix::fs::symlink; + symlink(src, dst).unwrap(); + } + fn mkdirp>(path: P) { fs::create_dir_all(path).unwrap(); } @@ -512,10 +1198,13 @@ mod tests { } } - fn walk_collect(prefix: &Path, walk: Walk) -> Vec { + fn walk_collect(prefix: &Path, builder: &WalkBuilder) -> Vec { let mut paths = vec![]; - for dent in walk { - let dent = dent.unwrap(); + for result in builder.build() { + let dent = match result { + Err(_) => continue, + Ok(dent) => dent, + }; let path = dent.path().strip_prefix(prefix).unwrap(); if path.as_os_str().is_empty() { continue; @@ -526,12 +1215,51 @@ mod tests { paths } + fn walk_collect_parallel( + prefix: &Path, + builder: &WalkBuilder, + ) -> Vec { + let paths = Arc::new(Mutex::new(vec![])); + let prefix = Arc::new(prefix.to_path_buf()); + builder.build_parallel().run(|| { + let paths = paths.clone(); + let prefix = prefix.clone(); + Box::new(move |result| { + let dent = match result { + Err(_) => return WalkState::Continue, + Ok(dent) => dent, + }; + let path = dent.path().strip_prefix(&**prefix).unwrap(); + if path.as_os_str().is_empty() { + return WalkState::Continue; + } + let mut paths = paths.lock().unwrap(); + paths.push(normal_path(path.to_str().unwrap())); + WalkState::Continue + }) + }); + let mut paths = paths.lock().unwrap(); + paths.sort(); + paths.to_vec() + } + fn mkpaths(paths: &[&str]) -> Vec { let mut paths: Vec<_> = paths.iter().map(|s| s.to_string()).collect(); paths.sort(); paths } + fn assert_paths( + prefix: &Path, + builder: &WalkBuilder, + expected: &[&str], + ) { + let got = walk_collect(prefix, builder); + assert_eq!(got, mkpaths(expected)); + let got = walk_collect_parallel(prefix, builder); + assert_eq!(got, mkpaths(expected)); + } + #[test] fn no_ignores() { let td = TempDir::new("walk-test-").unwrap(); @@ -540,10 +1268,9 @@ mod tests { wfile(td.path().join("a/b/foo"), ""); wfile(td.path().join("x/y/foo"), ""); - let got = walk_collect(td.path(), Walk::new(td.path())); - assert_eq!(got, mkpaths(&[ + assert_paths(td.path(), &WalkBuilder::new(td.path()), &[ "x", "x/y", "x/y/foo", "a", "a/b", "a/b/foo", "a/b/c", - ])); + ]); } #[test] @@ -556,8 +1283,9 @@ mod tests { wfile(td.path().join("bar"), ""); wfile(td.path().join("a/bar"), ""); - let got = walk_collect(td.path(), Walk::new(td.path())); - assert_eq!(got, mkpaths(&["bar", "a", "a/bar"])); + assert_paths(td.path(), &WalkBuilder::new(td.path()), &[ + "bar", "a", "a/bar", + ]); } #[test] @@ -573,8 +1301,7 @@ mod tests { let mut builder = WalkBuilder::new(td.path()); assert!(builder.add_ignore(&igpath).is_none()); - let got = walk_collect(td.path(), builder.build()); - assert_eq!(got, mkpaths(&["bar", "a", "a/bar"])); + assert_paths(td.path(), &builder, &["bar", "a", "a/bar"]); } #[test] @@ -586,7 +1313,59 @@ mod tests { wfile(td.path().join("a/bar"), ""); let root = td.path().join("a"); - let got = walk_collect(&root, Walk::new(&root)); - assert_eq!(got, mkpaths(&["bar"])); + assert_paths(&root, &WalkBuilder::new(&root), &["bar"]); + } + + #[test] + fn max_depth() { + let td = TempDir::new("walk-test-").unwrap(); + mkdirp(td.path().join("a/b/c")); + wfile(td.path().join("foo"), ""); + wfile(td.path().join("a/foo"), ""); + wfile(td.path().join("a/b/foo"), ""); + wfile(td.path().join("a/b/c/foo"), ""); + + let mut builder = WalkBuilder::new(td.path()); + assert_paths(td.path(), &builder, &[ + "a", "a/b", "a/b/c", "foo", "a/foo", "a/b/foo", "a/b/c/foo", + ]); + assert_paths(td.path(), builder.max_depth(Some(0)), &[]); + assert_paths(td.path(), builder.max_depth(Some(1)), &["a", "foo"]); + assert_paths(td.path(), builder.max_depth(Some(2)), &[ + "a", "a/b", "foo", "a/foo", + ]); + } + + #[cfg(unix)] // because symlinks on windows are weird + #[test] + fn symlinks() { + let td = TempDir::new("walk-test-").unwrap(); + mkdirp(td.path().join("a/b")); + symlink(td.path().join("a/b"), td.path().join("z")); + wfile(td.path().join("a/b/foo"), ""); + + let mut builder = WalkBuilder::new(td.path()); + assert_paths(td.path(), &builder, &[ + "a", "a/b", "a/b/foo", "z", + ]); + assert_paths(td.path(), &builder.follow_links(true), &[ + "a", "a/b", "a/b/foo", "z", "z/foo", + ]); + } + + #[cfg(unix)] // because symlinks on windows are weird + #[test] + fn symlink_loop() { + let td = TempDir::new("walk-test-").unwrap(); + mkdirp(td.path().join("a/b")); + symlink(td.path().join("a"), td.path().join("a/b/c")); + + let mut builder = WalkBuilder::new(td.path()); + assert_paths(td.path(), &builder, &[ + "a", "a/b", "a/b/c", + ]); + assert_paths(td.path(), &builder.follow_links(true), &[ + "a", "a/b", + ]); } } diff --git a/src/args.rs b/src/args.rs index 9d2923b8..017ade4c 100644 --- a/src/args.rs +++ b/src/args.rs @@ -1,4 +1,3 @@ -use std::cmp; use std::env; use std::io; use std::path::{Path, PathBuf}; @@ -21,10 +20,9 @@ use ignore::types::{FileTypeDef, Types, TypesBuilder}; use ignore; use out::{Out, ColoredTerminal}; use printer::Printer; -use search_buffer::BufferSearcher; -use search_stream::{InputBuffer, Searcher}; #[cfg(windows)] use terminal_win::WindowsBuffer; +use worker::{Worker, WorkerBuilder}; use Result; @@ -364,7 +362,7 @@ impl RawArgs { }; let threads = if self.flag_threads == 0 { - cmp::min(8, num_cpus::get()) + num_cpus::get() } else { self.flag_threads }; @@ -576,18 +574,6 @@ impl Args { self.grep.clone() } - /// Creates a new input buffer that is used in searching. - pub fn input_buffer(&self) -> InputBuffer { - let mut inp = InputBuffer::new(); - inp.eol(self.eol); - inp - } - - /// Whether we should prefer memory maps for searching or not. - pub fn mmap(&self) -> bool { - self.mmap - } - /// Whether ripgrep should be quiet or not. pub fn quiet(&self) -> bool { self.quiet @@ -662,18 +648,16 @@ impl Args { &self.paths } - /// Create a new line based searcher whose configuration is taken from the - /// command line. This searcher supports a dizzying array of features: - /// inverted matching, line counting, context control and more. - pub fn searcher<'a, R: io::Read, W: Send + Terminal>( - &self, - inp: &'a mut InputBuffer, - printer: &'a mut Printer, - grep: &'a Grep, - path: &'a Path, - rdr: R, - ) -> Searcher<'a, R, W> { - Searcher::new(inp, printer, grep, path, rdr) + /// Returns true if there is exactly one file path given to search. + pub fn is_one_path(&self) -> bool { + self.paths.len() == 1 + && (self.paths[0] == Path::new("-") || self.paths[0].is_file()) + } + + /// Create a worker whose configuration is taken from the + /// command line. + pub fn worker(&self) -> Worker { + WorkerBuilder::new(self.grep()) .after_context(self.after_context) .before_context(self.before_context) .count(self.count) @@ -681,28 +665,10 @@ impl Args { .eol(self.eol) .line_number(self.line_number) .invert_match(self.invert_match) + .mmap(self.mmap) .quiet(self.quiet) .text(self.text) - } - - /// Create a new line based searcher whose configuration is taken from the - /// command line. This search operates on an entire file all once (which - /// may have been memory mapped). - pub fn searcher_buffer<'a, W: Send + Terminal>( - &self, - printer: &'a mut Printer, - grep: &'a Grep, - path: &'a Path, - buf: &'a [u8], - ) -> BufferSearcher<'a, W> { - BufferSearcher::new(printer, grep, path, buf) - .count(self.count) - .files_with_matches(self.files_with_matches) - .eol(self.eol) - .line_number(self.line_number) - .invert_match(self.invert_match) - .quiet(self.quiet) - .text(self.text) + .build() } /// Returns the number of worker search threads that should be used. @@ -722,7 +688,17 @@ impl Args { } /// Create a new recursive directory iterator over the paths in argv. - pub fn walker(&self) -> Walk { + pub fn walker(&self) -> ignore::Walk { + self.walker_builder().build() + } + + /// Create a new parallel recursive directory iterator over the paths + /// in argv. + pub fn walker_parallel(&self) -> ignore::WalkParallel { + self.walker_builder().build_parallel() + } + + fn walker_builder(&self) -> ignore::WalkBuilder { let paths = self.paths(); let mut wd = ignore::WalkBuilder::new(&paths[0]); for path in &paths[1..] { @@ -744,7 +720,8 @@ impl Args { wd.git_exclude(!self.no_ignore && !self.no_ignore_vcs); wd.ignore(!self.no_ignore); wd.parents(!self.no_ignore_parent); - Walk(wd.build()) + wd.threads(self.threads()); + wd } } @@ -761,34 +738,6 @@ fn version() -> String { } } -/// A simple wrapper around the ignore::Walk iterator. This will -/// automatically emit error messages to stderr and will skip directories. -pub struct Walk(ignore::Walk); - -impl Iterator for Walk { - type Item = ignore::DirEntry; - - fn next(&mut self) -> Option { - while let Some(result) = self.0.next() { - match result { - Ok(dent) => { - if let Some(err) = dent.error() { - eprintln!("{}", err); - } - if dent.file_type().map_or(false, |x| x.is_dir()) { - continue; - } - return Some(dent); - } - Err(err) => { - eprintln!("{}", err); - } - } - } - None - } -} - /// A single state in the state machine used by `unescape`. #[derive(Clone, Copy, Eq, PartialEq)] enum State { diff --git a/src/main.rs b/src/main.rs index 71c4da32..276ee059 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,4 @@ extern crate ctrlc; -extern crate deque; extern crate docopt; extern crate env_logger; extern crate grep; @@ -21,30 +20,20 @@ extern crate term; extern crate winapi; use std::error::Error; -use std::fs::File; use std::io; use std::io::Write; -use std::path::Path; use std::process; use std::result; use std::sync::{Arc, Mutex}; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::mpsc; use std::thread; use std::cmp; -use deque::{Stealer, Stolen}; -use grep::Grep; -use memmap::{Mmap, Protection}; use term::Terminal; -use ignore::DirEntry; use args::Args; -use out::{ColoredTerminal, Out}; -use pathutil::strip_prefix; -use printer::Printer; -use search_stream::InputBuffer; -#[cfg(windows)] -use terminal_win::WindowsBuffer; +use worker::Work; macro_rules! errored { ($($tt:tt)*) => { @@ -68,11 +57,12 @@ mod search_buffer; mod search_stream; #[cfg(windows)] mod terminal_win; +mod worker; pub type Result = result::Result>; fn main() { - match Args::parse().and_then(run) { + match Args::parse().map(Arc::new).and_then(run) { Ok(count) if count == 0 => process::exit(1), Ok(_) => process::exit(0), Err(err) => { @@ -82,95 +72,108 @@ fn main() { } } -fn run(args: Args) -> Result { - let args = Arc::new(args); +fn run(args: Arc) -> Result { + { + let args = args.clone(); + ctrlc::set_handler(move || { + let stdout = io::stdout(); + let mut stdout = stdout.lock(); - let handler_args = args.clone(); - ctrlc::set_handler(move || { - let stdout = io::stdout(); - let mut stdout = stdout.lock(); + let _ = args.stdout().reset(); + let _ = stdout.flush(); - let _ = handler_args.stdout().reset(); - let _ = stdout.flush(); - - process::exit(1); - }); - - let paths = args.paths(); + process::exit(1); + }); + } let threads = cmp::max(1, args.threads() - 1); - let isone = - paths.len() == 1 && (paths[0] == Path::new("-") || paths[0].is_file()); if args.files() { - return run_files(args.clone()); - } - if args.type_list() { - return run_types(args.clone()); - } - if threads == 1 || isone { - return run_one_thread(args.clone()); + if threads == 1 || args.is_one_path() { + run_files_one_thread(args) + } else { + run_files_parallel(args) + } + } else if args.type_list() { + run_types(args) + } else if threads == 1 || args.is_one_path() { + run_one_thread(args) + } else { + run_parallel(args) } +} + +fn run_parallel(args: Arc) -> Result { let out = Arc::new(Mutex::new(args.out())); let quiet_matched = QuietMatched::new(args.quiet()); - let mut workers = vec![]; + let paths_searched = Arc::new(AtomicUsize::new(0)); + let match_count = Arc::new(AtomicUsize::new(0)); - let workq = { - let (workq, stealer) = deque::new(); - for _ in 0..threads { - let worker = MultiWorker { - chan_work: stealer.clone(), - quiet_matched: quiet_matched.clone(), - out: out.clone(), - outbuf: Some(args.outbuf()), - worker: Worker { - args: args.clone(), - inpbuf: args.input_buffer(), - grep: args.grep(), - match_count: 0, - }, + args.walker_parallel().run(|| { + let args = args.clone(); + let quiet_matched = quiet_matched.clone(); + let paths_searched = paths_searched.clone(); + let match_count = match_count.clone(); + let out = out.clone(); + let mut outbuf = args.outbuf(); + let mut worker = args.worker(); + Box::new(move |result| { + use ignore::WalkState::*; + + if quiet_matched.has_match() { + return Quit; + } + let dent = match get_or_log_dir_entry(result) { + None => return Continue, + Some(dent) => dent, }; - workers.push(thread::spawn(move || worker.run())); - } - workq - }; - let mut paths_searched: u64 = 0; - for dent in args.walker() { - if quiet_matched.has_match() { - break; - } - paths_searched += 1; - if dent.is_stdin() { - workq.push(Work::Stdin); - } else { - workq.push(Work::File(dent)); - } + paths_searched.fetch_add(1, Ordering::SeqCst); + outbuf.clear(); + { + // This block actually executes the search and prints the + // results into outbuf. + let mut printer = args.printer(&mut outbuf); + let count = + if dent.is_stdin() { + worker.run(&mut printer, Work::Stdin) + } else { + worker.run(&mut printer, Work::DirEntry(dent)) + }; + match_count.fetch_add(count as usize, Ordering::SeqCst); + if quiet_matched.set_match(count > 0) { + return Quit; + } + } + if !outbuf.get_ref().is_empty() { + // This should be the only mutex in all of ripgrep. Since the + // common case is to report a small number of matches relative + // to the corpus, this really shouldn't matter much. + // + // Still, it'd be nice to send this on a channel, but then we'd + // need to manage a pool of outbufs, which would complicate the + // code. + let mut out = out.lock().unwrap(); + out.write(&outbuf); + } + Continue + }) + }); + if !args.paths().is_empty() && paths_searched.load(Ordering::SeqCst) == 0 { + eprint_nothing_searched(); } - if !paths.is_empty() && paths_searched == 0 { - eprintln!("No files were searched, which means ripgrep probably \ - applied a filter you didn't expect. \ - Try running again with --debug."); - } - for _ in 0..workers.len() { - workq.push(Work::Quit); - } - let mut match_count = 0; - for worker in workers { - match_count += worker.join().unwrap(); - } - Ok(match_count) + Ok(match_count.load(Ordering::SeqCst) as u64) } fn run_one_thread(args: Arc) -> Result { - let mut worker = Worker { - args: args.clone(), - inpbuf: args.input_buffer(), - grep: args.grep(), - match_count: 0, - }; + let mut worker = args.worker(); let mut term = args.stdout(); let mut paths_searched: u64 = 0; - for dent in args.walker() { + let mut match_count = 0; + for result in args.walker() { + let dent = match get_or_log_dir_entry(result) { + None => continue, + Some(dent) => dent, + }; let mut printer = args.printer(&mut term); - if worker.match_count > 0 { + if match_count > 0 { if args.quiet() { break; } @@ -179,32 +182,53 @@ fn run_one_thread(args: Arc) -> Result { } } paths_searched += 1; - if dent.is_stdin() { - worker.do_work(&mut printer, WorkReady::Stdin); - } else { - let file = match File::open(dent.path()) { - Ok(file) => file, - Err(err) => { - eprintln!("{}: {}", dent.path().display(), err); - continue; - } + match_count += + if dent.is_stdin() { + worker.run(&mut printer, Work::Stdin) + } else { + worker.run(&mut printer, Work::DirEntry(dent)) }; - worker.do_work(&mut printer, WorkReady::DirFile(dent, file)); - } } if !args.paths().is_empty() && paths_searched == 0 { - eprintln!("No files were searched, which means ripgrep probably \ - applied a filter you didn't expect. \ - Try running again with --debug."); + eprint_nothing_searched(); } - Ok(worker.match_count) + Ok(match_count) } -fn run_files(args: Arc) -> Result { +fn run_files_parallel(args: Arc) -> Result { + let print_args = args.clone(); + let (tx, rx) = mpsc::channel::(); + let print_thread = thread::spawn(move || { + let term = print_args.stdout(); + let mut printer = print_args.printer(term); + let mut file_count = 0; + for dent in rx.iter() { + printer.path(dent.path()); + file_count += 1; + } + file_count + }); + args.walker_parallel().run(move || { + let tx = tx.clone(); + Box::new(move |result| { + if let Some(dent) = get_or_log_dir_entry(result) { + tx.send(dent).unwrap(); + } + ignore::WalkState::Continue + }) + }); + Ok(print_thread.join().unwrap()) +} + +fn run_files_one_thread(args: Arc) -> Result { let term = args.stdout(); let mut printer = args.printer(term); let mut file_count = 0; - for dent in args.walker() { + for result in args.walker() { + let dent = match get_or_log_dir_entry(result) { + None => continue, + Some(dent) => dent, + }; printer.path(dent.path()); file_count += 1; } @@ -222,163 +246,64 @@ fn run_types(args: Arc) -> Result { Ok(ty_count) } -enum Work { - Stdin, - File(DirEntry), - Quit, -} - -enum WorkReady { - Stdin, - DirFile(DirEntry, File), -} - -struct MultiWorker { - chan_work: Stealer, - quiet_matched: QuietMatched, - out: Arc>, - #[cfg(not(windows))] - outbuf: Option>>>, - #[cfg(windows)] - outbuf: Option>, - worker: Worker, -} - -struct Worker { - args: Arc, - inpbuf: InputBuffer, - grep: Grep, - match_count: u64, -} - -impl MultiWorker { - fn run(mut self) -> u64 { - loop { - if self.quiet_matched.has_match() { - break; - } - let work = match self.chan_work.steal() { - Stolen::Empty | Stolen::Abort => continue, - Stolen::Data(Work::Quit) => break, - Stolen::Data(Work::Stdin) => WorkReady::Stdin, - Stolen::Data(Work::File(ent)) => { - match File::open(ent.path()) { - Ok(file) => WorkReady::DirFile(ent, file), - Err(err) => { - eprintln!("{}: {}", ent.path().display(), err); - continue; - } - } - } - }; - let mut outbuf = self.outbuf.take().unwrap(); - outbuf.clear(); - let mut printer = self.worker.args.printer(outbuf); - self.worker.do_work(&mut printer, work); - if self.quiet_matched.set_match(self.worker.match_count > 0) { - break; - } - let outbuf = printer.into_inner(); - if !outbuf.get_ref().is_empty() { - let mut out = self.out.lock().unwrap(); - out.write(&outbuf); - } - self.outbuf = Some(outbuf); +fn get_or_log_dir_entry( + result: result::Result, +) -> Option { + match result { + Err(err) => { + eprintln!("{}", err); + None } - self.worker.match_count - } -} - -impl Worker { - fn do_work( - &mut self, - printer: &mut Printer, - work: WorkReady, - ) { - let result = match work { - WorkReady::Stdin => { - let stdin = io::stdin(); - let stdin = stdin.lock(); - self.search(printer, &Path::new(""), stdin) - } - WorkReady::DirFile(ent, file) => { - let mut path = ent.path(); - if let Some(p) = strip_prefix("./", path) { - path = p; - } - if self.args.mmap() { - self.search_mmap(printer, path, &file) - } else { - self.search(printer, path, file) - } - } - }; - match result { - Ok(count) => { - self.match_count += count; - } - Err(err) => { + Ok(dent) => { + if let Some(err) = dent.error() { eprintln!("{}", err); } + if !dent.file_type().map_or(true, |x| x.is_file()) { + None + } else { + Some(dent) + } } } - - fn search( - &mut self, - printer: &mut Printer, - path: &Path, - rdr: R, - ) -> Result { - self.args.searcher( - &mut self.inpbuf, - printer, - &self.grep, - path, - rdr, - ).run().map_err(From::from) - } - - fn search_mmap( - &mut self, - printer: &mut Printer, - path: &Path, - file: &File, - ) -> Result { - if try!(file.metadata()).len() == 0 { - // Opening a memory map with an empty file results in an error. - // However, this may not actually be an empty file! For example, - // /proc/cpuinfo reports itself as an empty file, but it can - // produce data when it's read from. Therefore, we fall back to - // regular read calls. - return self.search(printer, path, file); - } - let mmap = try!(Mmap::open(file, Protection::Read)); - Ok(self.args.searcher_buffer( - printer, - &self.grep, - path, - unsafe { mmap.as_slice() }, - ).run()) - } } +fn eprint_nothing_searched() { + eprintln!("No files were searched, which means ripgrep probably \ + applied a filter you didn't expect. \ + Try running again with --debug."); +} + +/// A simple thread safe abstraction for determining whether a search should +/// stop if the user has requested quiet mode. #[derive(Clone, Debug)] -struct QuietMatched(Arc>); +pub struct QuietMatched(Arc>); impl QuietMatched { - fn new(quiet: bool) -> QuietMatched { + /// Create a new QuietMatched value. + /// + /// If quiet is true, then set_match and has_match will reflect whether + /// a search should quit or not because it found a match. + /// + /// If quiet is false, then set_match is always a no-op and has_match + /// always returns false. + pub fn new(quiet: bool) -> QuietMatched { let atomic = if quiet { Some(AtomicBool::new(false)) } else { None }; QuietMatched(Arc::new(atomic)) } - fn has_match(&self) -> bool { + /// Returns true if and only if quiet mode is enabled and a match has + /// occurred. + pub fn has_match(&self) -> bool { match *self.0 { None => false, Some(ref matched) => matched.load(Ordering::SeqCst), } } - fn set_match(&self, yes: bool) -> bool { + /// Sets whether a match has occurred or not. + /// + /// If quiet mode is disabled, then this is a no-op. + pub fn set_match(&self, yes: bool) -> bool { match *self.0 { None => false, Some(_) if !yes => false, diff --git a/src/printer.rs b/src/printer.rs index e7373bce..1b8e5965 100644 --- a/src/printer.rs +++ b/src/printer.rs @@ -158,6 +158,7 @@ impl Printer { } /// Flushes the underlying writer and returns it. + #[allow(dead_code)] pub fn into_inner(mut self) -> W { let _ = self.wtr.flush(); self.wtr diff --git a/src/worker.rs b/src/worker.rs new file mode 100644 index 00000000..797fe9d7 --- /dev/null +++ b/src/worker.rs @@ -0,0 +1,253 @@ +use std::fs::File; +use std::io; +use std::path::Path; + +use grep::Grep; +use ignore::DirEntry; +use memmap::{Mmap, Protection}; +use term::Terminal; + +use pathutil::strip_prefix; +use printer::Printer; +use search_buffer::BufferSearcher; +use search_stream::{InputBuffer, Searcher}; + +use Result; + +pub enum Work { + Stdin, + DirEntry(DirEntry), +} + +pub struct WorkerBuilder { + grep: Grep, + opts: Options, +} + +#[derive(Clone, Debug)] +struct Options { + mmap: bool, + after_context: usize, + before_context: usize, + count: bool, + files_with_matches: bool, + eol: u8, + invert_match: bool, + line_number: bool, + quiet: bool, + text: bool, +} + +impl Default for Options { + fn default() -> Options { + Options { + mmap: false, + after_context: 0, + before_context: 0, + count: false, + files_with_matches: false, + eol: b'\n', + invert_match: false, + line_number: false, + quiet: false, + text: false, + } + } +} + +impl WorkerBuilder { + /// Create a new builder for a worker. + /// + /// A reusable input buffer and a grep matcher are required, but there + /// are numerous additional options that can be configured on this builder. + pub fn new(grep: Grep) -> WorkerBuilder { + WorkerBuilder { + grep: grep, + opts: Options::default(), + } + } + + /// Create the worker from this builder. + pub fn build(self) -> Worker { + let mut inpbuf = InputBuffer::new(); + inpbuf.eol(self.opts.eol); + Worker { + grep: self.grep, + inpbuf: inpbuf, + opts: self.opts, + } + } + + /// The number of contextual lines to show after each match. The default + /// is zero. + pub fn after_context(mut self, count: usize) -> Self { + self.opts.after_context = count; + self + } + + /// The number of contextual lines to show before each match. The default + /// is zero. + pub fn before_context(mut self, count: usize) -> Self { + self.opts.before_context = count; + self + } + + /// If enabled, searching will print a count instead of each match. + /// + /// Disabled by default. + pub fn count(mut self, yes: bool) -> Self { + self.opts.count = yes; + self + } + + /// If enabled, searching will print the path instead of each match. + /// + /// Disabled by default. + pub fn files_with_matches(mut self, yes: bool) -> Self { + self.opts.files_with_matches = yes; + self + } + + /// Set the end-of-line byte used by this searcher. + pub fn eol(mut self, eol: u8) -> Self { + self.opts.eol = eol; + self + } + + /// If enabled, matching is inverted so that lines that *don't* match the + /// given pattern are treated as matches. + pub fn invert_match(mut self, yes: bool) -> Self { + self.opts.invert_match = yes; + self + } + + /// If enabled, compute line numbers and prefix each line of output with + /// them. + pub fn line_number(mut self, yes: bool) -> Self { + self.opts.line_number = yes; + self + } + + /// If enabled, try to use memory maps for searching if possible. + pub fn mmap(mut self, yes: bool) -> Self { + self.opts.mmap = yes; + self + } + + /// If enabled, don't show any output and quit searching after the first + /// match is found. + pub fn quiet(mut self, yes: bool) -> Self { + self.opts.quiet = yes; + self + } + + /// If enabled, search binary files as if they were text. + pub fn text(mut self, yes: bool) -> Self { + self.opts.text = yes; + self + } +} + +/// Worker is responsible for executing searches on file paths, while choosing +/// streaming search or memory map search as appropriate. +pub struct Worker { + inpbuf: InputBuffer, + grep: Grep, + opts: Options, +} + +impl Worker { + /// Execute the worker with the given printer and work item. + /// + /// A work item can either be stdin or a file path. + pub fn run( + &mut self, + printer: &mut Printer, + work: Work, + ) -> u64 { + let result = match work { + Work::Stdin => { + let stdin = io::stdin(); + let stdin = stdin.lock(); + self.search(printer, &Path::new(""), stdin) + } + Work::DirEntry(dent) => { + let mut path = dent.path(); + let file = match File::open(path) { + Ok(file) => file, + Err(err) => { + eprintln!("{}: {}", path.display(), err); + return 0; + } + }; + if let Some(p) = strip_prefix("./", path) { + path = p; + } + if self.opts.mmap { + self.search_mmap(printer, path, &file) + } else { + self.search(printer, path, file) + } + } + }; + match result { + Ok(count) => { + count + } + Err(err) => { + eprintln!("{}", err); + 0 + } + } + } + + fn search( + &mut self, + printer: &mut Printer, + path: &Path, + rdr: R, + ) -> Result { + let searcher = Searcher::new( + &mut self.inpbuf, printer, &self.grep, path, rdr); + searcher + .after_context(self.opts.after_context) + .before_context(self.opts.before_context) + .count(self.opts.count) + .files_with_matches(self.opts.files_with_matches) + .eol(self.opts.eol) + .line_number(self.opts.line_number) + .invert_match(self.opts.invert_match) + .quiet(self.opts.quiet) + .text(self.opts.text) + .run() + .map_err(From::from) + } + + fn search_mmap( + &mut self, + printer: &mut Printer, + path: &Path, + file: &File, + ) -> Result { + if try!(file.metadata()).len() == 0 { + // Opening a memory map with an empty file results in an error. + // However, this may not actually be an empty file! For example, + // /proc/cpuinfo reports itself as an empty file, but it can + // produce data when it's read from. Therefore, we fall back to + // regular read calls. + return self.search(printer, path, file); + } + let mmap = try!(Mmap::open(file, Protection::Read)); + let searcher = BufferSearcher::new( + printer, &self.grep, path, unsafe { mmap.as_slice() }); + Ok(searcher + .count(self.opts.count) + .files_with_matches(self.opts.files_with_matches) + .eol(self.opts.eol) + .line_number(self.opts.line_number) + .invert_match(self.opts.invert_match) + .quiet(self.opts.quiet) + .text(self.opts.text) + .run()) + } +}