Add parallel recursive directory iterator.

This adds a new walk type in the `ignore` crate, `WalkParallel`, which
provides a way for recursively iterating over a set of paths in parallel
while respecting various ignore rules.

The API is a bit strange, as a closure producing a closure isn't
something one often sees, but it does seem to work well.

This also allowed us to simplify much of the worker logic in ripgrep
proper, where MultiWorker is now gone.
This commit is contained in:
Andrew Gallant
2016-11-05 21:44:15 -04:00
parent 1aeae3e22d
commit b272be25fa
11 changed files with 1506 additions and 479 deletions

26
Cargo.lock generated
View File

@@ -3,7 +3,6 @@ name = "ripgrep"
version = "0.2.6" version = "0.2.6"
dependencies = [ dependencies = [
"ctrlc 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "ctrlc 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"docopt 0.6.86 (registry+https://github.com/rust-lang/crates.io-index)", "docopt 0.6.86 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"grep 0.1.3", "grep 0.1.3",
@@ -29,6 +28,11 @@ dependencies = [
"memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "crossbeam"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "ctrlc" name = "ctrlc"
version = "2.0.1" version = "2.0.1"
@@ -39,14 +43,6 @@ dependencies = [
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "deque"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "docopt" name = "docopt"
version = "0.6.86" version = "0.6.86"
@@ -109,6 +105,7 @@ dependencies = [
name = "ignore" name = "ignore"
version = "0.1.3" version = "0.1.3"
dependencies = [ dependencies = [
"crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)",
"globset 0.1.1", "globset 0.1.1",
"lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -169,14 +166,6 @@ dependencies = [
"libc 0.2.17 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.17 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "rand"
version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"libc 0.2.17 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "regex" name = "regex"
version = "0.1.80" version = "0.1.80"
@@ -293,8 +282,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[metadata] [metadata]
"checksum aho-corasick 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ca972c2ea5f742bfce5687b9aef75506a764f61d37f8f649047846a9686ddb66" "checksum aho-corasick 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ca972c2ea5f742bfce5687b9aef75506a764f61d37f8f649047846a9686ddb66"
"checksum crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "0c5ea215664ca264da8a9d9c3be80d2eaf30923c259d03e870388eb927508f97"
"checksum ctrlc 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "77f98bb69e3fefadcc5ca80a1368a55251f70295168203e01165bcaecb270891" "checksum ctrlc 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "77f98bb69e3fefadcc5ca80a1368a55251f70295168203e01165bcaecb270891"
"checksum deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1614659040e711785ed8ea24219140654da1729f3ec8a47a9719d041112fe7bf"
"checksum docopt 0.6.86 (registry+https://github.com/rust-lang/crates.io-index)" = "4a7ef30445607f6fc8720f0a0a2c7442284b629cf0d049286860fae23e71c4d9" "checksum docopt 0.6.86 (registry+https://github.com/rust-lang/crates.io-index)" = "4a7ef30445607f6fc8720f0a0a2c7442284b629cf0d049286860fae23e71c4d9"
"checksum env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "15abd780e45b3ea4f76b4e9a26ff4843258dd8a3eed2775a0e7368c2e7936c2f" "checksum env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "15abd780e45b3ea4f76b4e9a26ff4843258dd8a3eed2775a0e7368c2e7936c2f"
"checksum fnv 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "6cc484842f1e2884faf56f529f960cc12ad8c71ce96cc7abba0a067c98fee344" "checksum fnv 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "6cc484842f1e2884faf56f529f960cc12ad8c71ce96cc7abba0a067c98fee344"
@@ -306,7 +295,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d8b629fb514376c675b98c1421e80b151d3817ac42d7c667717d282761418d20" "checksum memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d8b629fb514376c675b98c1421e80b151d3817ac42d7c667717d282761418d20"
"checksum memmap 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "065ce59af31c18ea2c419100bda6247dd4ec3099423202b12f0bd32e529fabd2" "checksum memmap 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "065ce59af31c18ea2c419100bda6247dd4ec3099423202b12f0bd32e529fabd2"
"checksum num_cpus 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8890e6084723d57d0df8d2720b0d60c6ee67d6c93e7169630e4371e88765dcad" "checksum num_cpus 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8890e6084723d57d0df8d2720b0d60c6ee67d6c93e7169630e4371e88765dcad"
"checksum rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "2791d88c6defac799c3f20d74f094ca33b9332612d9aef9078519c82e4fe04a5"
"checksum regex 0.1.80 (registry+https://github.com/rust-lang/crates.io-index)" = "4fd4ace6a8cf7860714a2c2280d6c1f7e6a413486c13298bbc86fd3da019402f" "checksum regex 0.1.80 (registry+https://github.com/rust-lang/crates.io-index)" = "4fd4ace6a8cf7860714a2c2280d6c1f7e6a413486c13298bbc86fd3da019402f"
"checksum regex-syntax 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "f9ec002c35e86791825ed294b50008eea9ddfc8def4420124fbc6b08db834957" "checksum regex-syntax 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "f9ec002c35e86791825ed294b50008eea9ddfc8def4420124fbc6b08db834957"
"checksum rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)" = "6159e4e6e559c81bd706afe9c8fd68f547d3e851ce12e76b1de7914bab61691b" "checksum rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)" = "6159e4e6e559c81bd706afe9c8fd68f547d3e851ce12e76b1de7914bab61691b"

View File

@@ -25,7 +25,6 @@ path = "tests/tests.rs"
[dependencies] [dependencies]
ctrlc = "2.0" ctrlc = "2.0"
deque = "0.3"
docopt = "0.6" docopt = "0.6"
env_logger = "0.3" env_logger = "0.3"
grep = { version = "0.1.3", path = "grep" } grep = { version = "0.1.3", path = "grep" }

View File

@@ -18,6 +18,7 @@ name = "ignore"
bench = false bench = false
[dependencies] [dependencies]
crossbeam = "0.2"
globset = { version = "0.1.1", path = "../globset" } globset = { version = "0.1.1", path = "../globset" }
lazy_static = "0.2" lazy_static = "0.2"
log = "0.3" log = "0.3"

View File

@@ -1,28 +1,92 @@
/* #![allow(dead_code, unused_imports, unused_mut, unused_variables)]
extern crate crossbeam;
extern crate ignore; extern crate ignore;
extern crate walkdir; extern crate walkdir;
use std::env; use std::env;
use std::io::{self, Write}; use std::io::{self, Write};
use std::os::unix::ffi::OsStrExt; use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use ignore::ignore::IgnoreBuilder; use crossbeam::sync::MsQueue;
use ignore::WalkBuilder;
use walkdir::WalkDir; use walkdir::WalkDir;
fn main() { fn main() {
let path = env::args().nth(1).unwrap(); let mut path = env::args().nth(1).unwrap();
let ig = IgnoreBuilder::new().build(); let mut parallel = false;
let wd = WalkDir::new(path); let mut simple = false;
let walker = ignore::walk::Iter::new(ig, wd); let queue: Arc<MsQueue<Option<DirEntry>>> = Arc::new(MsQueue::new());
if path == "parallel" {
let mut stdout = io::BufWriter::new(io::stdout()); path = env::args().nth(2).unwrap();
// let mut count = 0; parallel = true;
for dirent in walker { } else if path == "walkdir" {
// count += 1; path = env::args().nth(2).unwrap();
stdout.write(dirent.path().as_os_str().as_bytes()).unwrap(); simple = true;
stdout.write(b"\n").unwrap();
} }
// println!("{}", count);
let stdout_queue = queue.clone();
let stdout_thread = thread::spawn(move || {
let mut stdout = io::BufWriter::new(io::stdout());
while let Some(dent) = stdout_queue.pop() {
write_path(&mut stdout, dent.path());
}
});
if parallel {
let walker = WalkBuilder::new(path).threads(6).build_parallel();
walker.run(|| {
let queue = queue.clone();
Box::new(move |result| {
use ignore::WalkState::*;
queue.push(Some(DirEntry::Y(result.unwrap())));
Continue
})
});
} else if simple {
let mut stdout = io::BufWriter::new(io::stdout());
let walker = WalkDir::new(path);
for result in walker {
queue.push(Some(DirEntry::X(result.unwrap())));
}
} else {
let mut stdout = io::BufWriter::new(io::stdout());
let walker = WalkBuilder::new(path).build();
for result in walker {
queue.push(Some(DirEntry::Y(result.unwrap())));
}
}
queue.push(None);
stdout_thread.join().unwrap();
}
enum DirEntry {
X(walkdir::DirEntry),
Y(ignore::DirEntry),
}
impl DirEntry {
fn path(&self) -> &Path {
match *self {
DirEntry::X(ref x) => x.path(),
DirEntry::Y(ref y) => y.path(),
}
}
}
#[cfg(unix)]
fn write_path<W: Write>(mut wtr: W, path: &Path) {
use std::os::unix::ffi::OsStrExt;
wtr.write(path.as_os_str().as_bytes()).unwrap();
wtr.write(b"\n").unwrap();
}
#[cfg(not(unix))]
fn write_path<W: Write>(mut wtr: W, path: &Path) {
wtr.write(path.to_string_lossy().as_bytes()).unwrap();
wtr.write(b"\n").unwrap();
} }
*/
fn main() {}

View File

@@ -137,6 +137,11 @@ impl Ignore {
self.0.parent.is_none() self.0.parent.is_none()
} }
/// Returns true if this matcher was added via the `add_parents` method.
pub fn is_absolute_parent(&self) -> bool {
self.0.is_absolute_parent
}
/// Return this matcher's parent, if one exists. /// Return this matcher's parent, if one exists.
pub fn parent(&self) -> Option<Ignore> { pub fn parent(&self) -> Option<Ignore> {
self.0.parent.clone() self.0.parent.clone()
@@ -376,7 +381,7 @@ impl Ignore {
} }
/// Returns an iterator over parent ignore matchers, including this one. /// Returns an iterator over parent ignore matchers, including this one.
fn parents(&self) -> Parents { pub fn parents(&self) -> Parents {
Parents(Some(self)) Parents(Some(self))
} }
@@ -387,7 +392,10 @@ impl Ignore {
} }
} }
struct Parents<'a>(Option<&'a Ignore>); /// An iterator over all parents of an ignore matcher, including itself.
///
/// The lifetime `'a` refers to the lifetime of the initial `Ignore` matcher.
pub struct Parents<'a>(Option<&'a Ignore>);
impl<'a> Iterator for Parents<'a> { impl<'a> Iterator for Parents<'a> {
type Item = &'a Ignore; type Item = &'a Ignore;

View File

@@ -44,6 +44,7 @@ for result in WalkBuilder::new("./").hidden(false).build() {
See the documentation for `WalkBuilder` for many other options. See the documentation for `WalkBuilder` for many other options.
*/ */
extern crate crossbeam;
extern crate globset; extern crate globset;
#[macro_use] #[macro_use]
extern crate lazy_static; extern crate lazy_static;
@@ -61,7 +62,7 @@ use std::fmt;
use std::io; use std::io;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
pub use walk::{DirEntry, Walk, WalkBuilder}; pub use walk::{DirEntry, Walk, WalkBuilder, WalkParallel, WalkState};
mod dir; mod dir;
pub mod gitignore; pub mod gitignore;
@@ -80,6 +81,12 @@ pub enum Error {
WithLineNumber { line: u64, err: Box<Error> }, WithLineNumber { line: u64, err: Box<Error> },
/// An error associated with a particular file path. /// An error associated with a particular file path.
WithPath { path: PathBuf, err: Box<Error> }, WithPath { path: PathBuf, err: Box<Error> },
/// An error associated with a particular directory depth when recursively
/// walking a directory.
WithDepth { depth: usize, err: Box<Error> },
/// An error that occurs when a file loop is detected when traversing
/// symbolic links.
Loop { ancestor: PathBuf, child: PathBuf },
/// An error that occurs when doing I/O, such as reading an ignore file. /// An error that occurs when doing I/O, such as reading an ignore file.
Io(io::Error), Io(io::Error),
/// An error that occurs when trying to parse a glob. /// An error that occurs when trying to parse a glob.
@@ -101,6 +108,7 @@ impl Error {
Error::Partial(_) => true, Error::Partial(_) => true,
Error::WithLineNumber { ref err, .. } => err.is_partial(), Error::WithLineNumber { ref err, .. } => err.is_partial(),
Error::WithPath { ref err, .. } => err.is_partial(), Error::WithPath { ref err, .. } => err.is_partial(),
Error::WithDepth { ref err, .. } => err.is_partial(),
_ => false, _ => false,
} }
} }
@@ -111,6 +119,8 @@ impl Error {
Error::Partial(ref errs) => errs.len() == 1 && errs[0].is_io(), Error::Partial(ref errs) => errs.len() == 1 && errs[0].is_io(),
Error::WithLineNumber { ref err, .. } => err.is_io(), Error::WithLineNumber { ref err, .. } => err.is_io(),
Error::WithPath { ref err, .. } => err.is_io(), Error::WithPath { ref err, .. } => err.is_io(),
Error::WithDepth { ref err, .. } => err.is_io(),
Error::Loop { .. } => false,
Error::Io(_) => true, Error::Io(_) => true,
Error::Glob(_) => false, Error::Glob(_) => false,
Error::UnrecognizedFileType(_) => false, Error::UnrecognizedFileType(_) => false,
@@ -118,6 +128,16 @@ impl Error {
} }
} }
/// Returns a depth associated with recursively walking a directory (if
/// this error was generated from a recursive directory iterator).
pub fn depth(&self) -> Option<usize> {
match *self {
Error::WithPath { ref err, .. } => err.depth(),
Error::WithDepth { depth, .. } => Some(depth),
_ => None,
}
}
/// Turn an error into a tagged error with the given file path. /// Turn an error into a tagged error with the given file path.
fn with_path<P: AsRef<Path>>(self, path: P) -> Error { fn with_path<P: AsRef<Path>>(self, path: P) -> Error {
Error::WithPath { Error::WithPath {
@@ -126,6 +146,14 @@ impl Error {
} }
} }
/// Turn an error into a tagged error with the given depth.
fn with_depth(self, depth: usize) -> Error {
Error::WithDepth {
depth: depth,
err: Box::new(self),
}
}
/// Turn an error into a tagged error with the given file path and line /// Turn an error into a tagged error with the given file path and line
/// number. If path is empty, then it is omitted from the error. /// number. If path is empty, then it is omitted from the error.
fn tagged<P: AsRef<Path>>(self, path: P, lineno: u64) -> Error { fn tagged<P: AsRef<Path>>(self, path: P, lineno: u64) -> Error {
@@ -146,6 +174,8 @@ impl error::Error for Error {
Error::Partial(_) => "partial error", Error::Partial(_) => "partial error",
Error::WithLineNumber { ref err, .. } => err.description(), Error::WithLineNumber { ref err, .. } => err.description(),
Error::WithPath { ref err, .. } => err.description(), Error::WithPath { ref err, .. } => err.description(),
Error::WithDepth { ref err, .. } => err.description(),
Error::Loop { .. } => "file system loop found",
Error::Io(ref err) => err.description(), Error::Io(ref err) => err.description(),
Error::Glob(ref msg) => msg, Error::Glob(ref msg) => msg,
Error::UnrecognizedFileType(_) => "unrecognized file type", Error::UnrecognizedFileType(_) => "unrecognized file type",
@@ -168,6 +198,12 @@ impl fmt::Display for Error {
Error::WithPath { ref path, ref err } => { Error::WithPath { ref path, ref err } => {
write!(f, "{}: {}", path.display(), err) write!(f, "{}: {}", path.display(), err)
} }
Error::WithDepth { ref err, .. } => err.fmt(f),
Error::Loop { ref ancestor, ref child } => {
write!(f, "File system loop found: \
{} points to an ancestor {}",
child.display(), ancestor.display())
}
Error::Io(ref err) => err.fmt(f), Error::Io(ref err) => err.fmt(f),
Error::Glob(ref msg) => write!(f, "{}", msg), Error::Glob(ref msg) => write!(f, "{}", msg),
Error::UnrecognizedFileType(ref ty) => { Error::UnrecognizedFileType(ref ty) => {
@@ -187,6 +223,30 @@ impl From<io::Error> for Error {
} }
} }
impl From<walkdir::Error> for Error {
fn from(err: walkdir::Error) -> Error {
let depth = err.depth();
if let (Some(anc), Some(child)) = (err.loop_ancestor(), err.path()) {
return Error::WithDepth {
depth: depth,
err: Box::new(Error::Loop {
ancestor: anc.to_path_buf(),
child: child.to_path_buf(),
}),
};
}
let path = err.path().map(|p| p.to_path_buf());
let mut ig_err = Error::Io(io::Error::from(err));
if let Some(path) = path {
ig_err = Error::WithPath {
path: path,
err: Box::new(ig_err),
};
}
ig_err
}
}
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct PartialErrorBuilder(Vec<Error>); struct PartialErrorBuilder(Vec<Error>);

File diff suppressed because it is too large Load Diff

View File

@@ -1,4 +1,3 @@
use std::cmp;
use std::env; use std::env;
use std::io; use std::io;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
@@ -21,10 +20,9 @@ use ignore::types::{FileTypeDef, Types, TypesBuilder};
use ignore; use ignore;
use out::{Out, ColoredTerminal}; use out::{Out, ColoredTerminal};
use printer::Printer; use printer::Printer;
use search_buffer::BufferSearcher;
use search_stream::{InputBuffer, Searcher};
#[cfg(windows)] #[cfg(windows)]
use terminal_win::WindowsBuffer; use terminal_win::WindowsBuffer;
use worker::{Worker, WorkerBuilder};
use Result; use Result;
@@ -364,7 +362,7 @@ impl RawArgs {
}; };
let threads = let threads =
if self.flag_threads == 0 { if self.flag_threads == 0 {
cmp::min(8, num_cpus::get()) num_cpus::get()
} else { } else {
self.flag_threads self.flag_threads
}; };
@@ -576,18 +574,6 @@ impl Args {
self.grep.clone() self.grep.clone()
} }
/// Creates a new input buffer that is used in searching.
pub fn input_buffer(&self) -> InputBuffer {
let mut inp = InputBuffer::new();
inp.eol(self.eol);
inp
}
/// Whether we should prefer memory maps for searching or not.
pub fn mmap(&self) -> bool {
self.mmap
}
/// Whether ripgrep should be quiet or not. /// Whether ripgrep should be quiet or not.
pub fn quiet(&self) -> bool { pub fn quiet(&self) -> bool {
self.quiet self.quiet
@@ -662,18 +648,16 @@ impl Args {
&self.paths &self.paths
} }
/// Create a new line based searcher whose configuration is taken from the /// Returns true if there is exactly one file path given to search.
/// command line. This searcher supports a dizzying array of features: pub fn is_one_path(&self) -> bool {
/// inverted matching, line counting, context control and more. self.paths.len() == 1
pub fn searcher<'a, R: io::Read, W: Send + Terminal>( && (self.paths[0] == Path::new("-") || self.paths[0].is_file())
&self, }
inp: &'a mut InputBuffer,
printer: &'a mut Printer<W>, /// Create a worker whose configuration is taken from the
grep: &'a Grep, /// command line.
path: &'a Path, pub fn worker(&self) -> Worker {
rdr: R, WorkerBuilder::new(self.grep())
) -> Searcher<'a, R, W> {
Searcher::new(inp, printer, grep, path, rdr)
.after_context(self.after_context) .after_context(self.after_context)
.before_context(self.before_context) .before_context(self.before_context)
.count(self.count) .count(self.count)
@@ -681,28 +665,10 @@ impl Args {
.eol(self.eol) .eol(self.eol)
.line_number(self.line_number) .line_number(self.line_number)
.invert_match(self.invert_match) .invert_match(self.invert_match)
.mmap(self.mmap)
.quiet(self.quiet) .quiet(self.quiet)
.text(self.text) .text(self.text)
} .build()
/// Create a new line based searcher whose configuration is taken from the
/// command line. This search operates on an entire file all once (which
/// may have been memory mapped).
pub fn searcher_buffer<'a, W: Send + Terminal>(
&self,
printer: &'a mut Printer<W>,
grep: &'a Grep,
path: &'a Path,
buf: &'a [u8],
) -> BufferSearcher<'a, W> {
BufferSearcher::new(printer, grep, path, buf)
.count(self.count)
.files_with_matches(self.files_with_matches)
.eol(self.eol)
.line_number(self.line_number)
.invert_match(self.invert_match)
.quiet(self.quiet)
.text(self.text)
} }
/// Returns the number of worker search threads that should be used. /// Returns the number of worker search threads that should be used.
@@ -722,7 +688,17 @@ impl Args {
} }
/// Create a new recursive directory iterator over the paths in argv. /// Create a new recursive directory iterator over the paths in argv.
pub fn walker(&self) -> Walk { pub fn walker(&self) -> ignore::Walk {
self.walker_builder().build()
}
/// Create a new parallel recursive directory iterator over the paths
/// in argv.
pub fn walker_parallel(&self) -> ignore::WalkParallel {
self.walker_builder().build_parallel()
}
fn walker_builder(&self) -> ignore::WalkBuilder {
let paths = self.paths(); let paths = self.paths();
let mut wd = ignore::WalkBuilder::new(&paths[0]); let mut wd = ignore::WalkBuilder::new(&paths[0]);
for path in &paths[1..] { for path in &paths[1..] {
@@ -744,7 +720,8 @@ impl Args {
wd.git_exclude(!self.no_ignore && !self.no_ignore_vcs); wd.git_exclude(!self.no_ignore && !self.no_ignore_vcs);
wd.ignore(!self.no_ignore); wd.ignore(!self.no_ignore);
wd.parents(!self.no_ignore_parent); wd.parents(!self.no_ignore_parent);
Walk(wd.build()) wd.threads(self.threads());
wd
} }
} }
@@ -761,34 +738,6 @@ fn version() -> String {
} }
} }
/// A simple wrapper around the ignore::Walk iterator. This will
/// automatically emit error messages to stderr and will skip directories.
pub struct Walk(ignore::Walk);
impl Iterator for Walk {
type Item = ignore::DirEntry;
fn next(&mut self) -> Option<ignore::DirEntry> {
while let Some(result) = self.0.next() {
match result {
Ok(dent) => {
if let Some(err) = dent.error() {
eprintln!("{}", err);
}
if dent.file_type().map_or(false, |x| x.is_dir()) {
continue;
}
return Some(dent);
}
Err(err) => {
eprintln!("{}", err);
}
}
}
None
}
}
/// A single state in the state machine used by `unescape`. /// A single state in the state machine used by `unescape`.
#[derive(Clone, Copy, Eq, PartialEq)] #[derive(Clone, Copy, Eq, PartialEq)]
enum State { enum State {

View File

@@ -1,5 +1,4 @@
extern crate ctrlc; extern crate ctrlc;
extern crate deque;
extern crate docopt; extern crate docopt;
extern crate env_logger; extern crate env_logger;
extern crate grep; extern crate grep;
@@ -21,30 +20,20 @@ extern crate term;
extern crate winapi; extern crate winapi;
use std::error::Error; use std::error::Error;
use std::fs::File;
use std::io; use std::io;
use std::io::Write; use std::io::Write;
use std::path::Path;
use std::process; use std::process;
use std::result; use std::result;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc;
use std::thread; use std::thread;
use std::cmp; use std::cmp;
use deque::{Stealer, Stolen};
use grep::Grep;
use memmap::{Mmap, Protection};
use term::Terminal; use term::Terminal;
use ignore::DirEntry;
use args::Args; use args::Args;
use out::{ColoredTerminal, Out}; use worker::Work;
use pathutil::strip_prefix;
use printer::Printer;
use search_stream::InputBuffer;
#[cfg(windows)]
use terminal_win::WindowsBuffer;
macro_rules! errored { macro_rules! errored {
($($tt:tt)*) => { ($($tt:tt)*) => {
@@ -68,11 +57,12 @@ mod search_buffer;
mod search_stream; mod search_stream;
#[cfg(windows)] #[cfg(windows)]
mod terminal_win; mod terminal_win;
mod worker;
pub type Result<T> = result::Result<T, Box<Error + Send + Sync>>; pub type Result<T> = result::Result<T, Box<Error + Send + Sync>>;
fn main() { fn main() {
match Args::parse().and_then(run) { match Args::parse().map(Arc::new).and_then(run) {
Ok(count) if count == 0 => process::exit(1), Ok(count) if count == 0 => process::exit(1),
Ok(_) => process::exit(0), Ok(_) => process::exit(0),
Err(err) => { Err(err) => {
@@ -82,95 +72,108 @@ fn main() {
} }
} }
fn run(args: Args) -> Result<u64> { fn run(args: Arc<Args>) -> Result<u64> {
let args = Arc::new(args); {
let args = args.clone();
ctrlc::set_handler(move || {
let stdout = io::stdout();
let mut stdout = stdout.lock();
let handler_args = args.clone(); let _ = args.stdout().reset();
ctrlc::set_handler(move || { let _ = stdout.flush();
let stdout = io::stdout();
let mut stdout = stdout.lock();
let _ = handler_args.stdout().reset(); process::exit(1);
let _ = stdout.flush(); });
}
process::exit(1);
});
let paths = args.paths();
let threads = cmp::max(1, args.threads() - 1); let threads = cmp::max(1, args.threads() - 1);
let isone =
paths.len() == 1 && (paths[0] == Path::new("-") || paths[0].is_file());
if args.files() { if args.files() {
return run_files(args.clone()); if threads == 1 || args.is_one_path() {
} run_files_one_thread(args)
if args.type_list() { } else {
return run_types(args.clone()); run_files_parallel(args)
} }
if threads == 1 || isone { } else if args.type_list() {
return run_one_thread(args.clone()); run_types(args)
} else if threads == 1 || args.is_one_path() {
run_one_thread(args)
} else {
run_parallel(args)
} }
}
fn run_parallel(args: Arc<Args>) -> Result<u64> {
let out = Arc::new(Mutex::new(args.out())); let out = Arc::new(Mutex::new(args.out()));
let quiet_matched = QuietMatched::new(args.quiet()); let quiet_matched = QuietMatched::new(args.quiet());
let mut workers = vec![]; let paths_searched = Arc::new(AtomicUsize::new(0));
let match_count = Arc::new(AtomicUsize::new(0));
let workq = { args.walker_parallel().run(|| {
let (workq, stealer) = deque::new(); let args = args.clone();
for _ in 0..threads { let quiet_matched = quiet_matched.clone();
let worker = MultiWorker { let paths_searched = paths_searched.clone();
chan_work: stealer.clone(), let match_count = match_count.clone();
quiet_matched: quiet_matched.clone(), let out = out.clone();
out: out.clone(), let mut outbuf = args.outbuf();
outbuf: Some(args.outbuf()), let mut worker = args.worker();
worker: Worker { Box::new(move |result| {
args: args.clone(), use ignore::WalkState::*;
inpbuf: args.input_buffer(),
grep: args.grep(), if quiet_matched.has_match() {
match_count: 0, return Quit;
}, }
let dent = match get_or_log_dir_entry(result) {
None => return Continue,
Some(dent) => dent,
}; };
workers.push(thread::spawn(move || worker.run())); paths_searched.fetch_add(1, Ordering::SeqCst);
} outbuf.clear();
workq {
}; // This block actually executes the search and prints the
let mut paths_searched: u64 = 0; // results into outbuf.
for dent in args.walker() { let mut printer = args.printer(&mut outbuf);
if quiet_matched.has_match() { let count =
break; if dent.is_stdin() {
} worker.run(&mut printer, Work::Stdin)
paths_searched += 1; } else {
if dent.is_stdin() { worker.run(&mut printer, Work::DirEntry(dent))
workq.push(Work::Stdin); };
} else { match_count.fetch_add(count as usize, Ordering::SeqCst);
workq.push(Work::File(dent)); if quiet_matched.set_match(count > 0) {
} return Quit;
}
}
if !outbuf.get_ref().is_empty() {
// This should be the only mutex in all of ripgrep. Since the
// common case is to report a small number of matches relative
// to the corpus, this really shouldn't matter much.
//
// Still, it'd be nice to send this on a channel, but then we'd
// need to manage a pool of outbufs, which would complicate the
// code.
let mut out = out.lock().unwrap();
out.write(&outbuf);
}
Continue
})
});
if !args.paths().is_empty() && paths_searched.load(Ordering::SeqCst) == 0 {
eprint_nothing_searched();
} }
if !paths.is_empty() && paths_searched == 0 { Ok(match_count.load(Ordering::SeqCst) as u64)
eprintln!("No files were searched, which means ripgrep probably \
applied a filter you didn't expect. \
Try running again with --debug.");
}
for _ in 0..workers.len() {
workq.push(Work::Quit);
}
let mut match_count = 0;
for worker in workers {
match_count += worker.join().unwrap();
}
Ok(match_count)
} }
fn run_one_thread(args: Arc<Args>) -> Result<u64> { fn run_one_thread(args: Arc<Args>) -> Result<u64> {
let mut worker = Worker { let mut worker = args.worker();
args: args.clone(),
inpbuf: args.input_buffer(),
grep: args.grep(),
match_count: 0,
};
let mut term = args.stdout(); let mut term = args.stdout();
let mut paths_searched: u64 = 0; let mut paths_searched: u64 = 0;
for dent in args.walker() { let mut match_count = 0;
for result in args.walker() {
let dent = match get_or_log_dir_entry(result) {
None => continue,
Some(dent) => dent,
};
let mut printer = args.printer(&mut term); let mut printer = args.printer(&mut term);
if worker.match_count > 0 { if match_count > 0 {
if args.quiet() { if args.quiet() {
break; break;
} }
@@ -179,32 +182,53 @@ fn run_one_thread(args: Arc<Args>) -> Result<u64> {
} }
} }
paths_searched += 1; paths_searched += 1;
if dent.is_stdin() { match_count +=
worker.do_work(&mut printer, WorkReady::Stdin); if dent.is_stdin() {
} else { worker.run(&mut printer, Work::Stdin)
let file = match File::open(dent.path()) { } else {
Ok(file) => file, worker.run(&mut printer, Work::DirEntry(dent))
Err(err) => {
eprintln!("{}: {}", dent.path().display(), err);
continue;
}
}; };
worker.do_work(&mut printer, WorkReady::DirFile(dent, file));
}
} }
if !args.paths().is_empty() && paths_searched == 0 { if !args.paths().is_empty() && paths_searched == 0 {
eprintln!("No files were searched, which means ripgrep probably \ eprint_nothing_searched();
applied a filter you didn't expect. \
Try running again with --debug.");
} }
Ok(worker.match_count) Ok(match_count)
} }
fn run_files(args: Arc<Args>) -> Result<u64> { fn run_files_parallel(args: Arc<Args>) -> Result<u64> {
let print_args = args.clone();
let (tx, rx) = mpsc::channel::<ignore::DirEntry>();
let print_thread = thread::spawn(move || {
let term = print_args.stdout();
let mut printer = print_args.printer(term);
let mut file_count = 0;
for dent in rx.iter() {
printer.path(dent.path());
file_count += 1;
}
file_count
});
args.walker_parallel().run(move || {
let tx = tx.clone();
Box::new(move |result| {
if let Some(dent) = get_or_log_dir_entry(result) {
tx.send(dent).unwrap();
}
ignore::WalkState::Continue
})
});
Ok(print_thread.join().unwrap())
}
fn run_files_one_thread(args: Arc<Args>) -> Result<u64> {
let term = args.stdout(); let term = args.stdout();
let mut printer = args.printer(term); let mut printer = args.printer(term);
let mut file_count = 0; let mut file_count = 0;
for dent in args.walker() { for result in args.walker() {
let dent = match get_or_log_dir_entry(result) {
None => continue,
Some(dent) => dent,
};
printer.path(dent.path()); printer.path(dent.path());
file_count += 1; file_count += 1;
} }
@@ -222,163 +246,64 @@ fn run_types(args: Arc<Args>) -> Result<u64> {
Ok(ty_count) Ok(ty_count)
} }
enum Work { fn get_or_log_dir_entry(
Stdin, result: result::Result<ignore::DirEntry, ignore::Error>,
File(DirEntry), ) -> Option<ignore::DirEntry> {
Quit, match result {
} Err(err) => {
eprintln!("{}", err);
enum WorkReady { None
Stdin,
DirFile(DirEntry, File),
}
struct MultiWorker {
chan_work: Stealer<Work>,
quiet_matched: QuietMatched,
out: Arc<Mutex<Out>>,
#[cfg(not(windows))]
outbuf: Option<ColoredTerminal<term::TerminfoTerminal<Vec<u8>>>>,
#[cfg(windows)]
outbuf: Option<ColoredTerminal<WindowsBuffer>>,
worker: Worker,
}
struct Worker {
args: Arc<Args>,
inpbuf: InputBuffer,
grep: Grep,
match_count: u64,
}
impl MultiWorker {
fn run(mut self) -> u64 {
loop {
if self.quiet_matched.has_match() {
break;
}
let work = match self.chan_work.steal() {
Stolen::Empty | Stolen::Abort => continue,
Stolen::Data(Work::Quit) => break,
Stolen::Data(Work::Stdin) => WorkReady::Stdin,
Stolen::Data(Work::File(ent)) => {
match File::open(ent.path()) {
Ok(file) => WorkReady::DirFile(ent, file),
Err(err) => {
eprintln!("{}: {}", ent.path().display(), err);
continue;
}
}
}
};
let mut outbuf = self.outbuf.take().unwrap();
outbuf.clear();
let mut printer = self.worker.args.printer(outbuf);
self.worker.do_work(&mut printer, work);
if self.quiet_matched.set_match(self.worker.match_count > 0) {
break;
}
let outbuf = printer.into_inner();
if !outbuf.get_ref().is_empty() {
let mut out = self.out.lock().unwrap();
out.write(&outbuf);
}
self.outbuf = Some(outbuf);
} }
self.worker.match_count Ok(dent) => {
} if let Some(err) = dent.error() {
}
impl Worker {
fn do_work<W: Terminal + Send>(
&mut self,
printer: &mut Printer<W>,
work: WorkReady,
) {
let result = match work {
WorkReady::Stdin => {
let stdin = io::stdin();
let stdin = stdin.lock();
self.search(printer, &Path::new("<stdin>"), stdin)
}
WorkReady::DirFile(ent, file) => {
let mut path = ent.path();
if let Some(p) = strip_prefix("./", path) {
path = p;
}
if self.args.mmap() {
self.search_mmap(printer, path, &file)
} else {
self.search(printer, path, file)
}
}
};
match result {
Ok(count) => {
self.match_count += count;
}
Err(err) => {
eprintln!("{}", err); eprintln!("{}", err);
} }
if !dent.file_type().map_or(true, |x| x.is_file()) {
None
} else {
Some(dent)
}
} }
} }
fn search<R: io::Read, W: Terminal + Send>(
&mut self,
printer: &mut Printer<W>,
path: &Path,
rdr: R,
) -> Result<u64> {
self.args.searcher(
&mut self.inpbuf,
printer,
&self.grep,
path,
rdr,
).run().map_err(From::from)
}
fn search_mmap<W: Terminal + Send>(
&mut self,
printer: &mut Printer<W>,
path: &Path,
file: &File,
) -> Result<u64> {
if try!(file.metadata()).len() == 0 {
// Opening a memory map with an empty file results in an error.
// However, this may not actually be an empty file! For example,
// /proc/cpuinfo reports itself as an empty file, but it can
// produce data when it's read from. Therefore, we fall back to
// regular read calls.
return self.search(printer, path, file);
}
let mmap = try!(Mmap::open(file, Protection::Read));
Ok(self.args.searcher_buffer(
printer,
&self.grep,
path,
unsafe { mmap.as_slice() },
).run())
}
} }
fn eprint_nothing_searched() {
eprintln!("No files were searched, which means ripgrep probably \
applied a filter you didn't expect. \
Try running again with --debug.");
}
/// A simple thread safe abstraction for determining whether a search should
/// stop if the user has requested quiet mode.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct QuietMatched(Arc<Option<AtomicBool>>); pub struct QuietMatched(Arc<Option<AtomicBool>>);
impl QuietMatched { impl QuietMatched {
fn new(quiet: bool) -> QuietMatched { /// Create a new QuietMatched value.
///
/// If quiet is true, then set_match and has_match will reflect whether
/// a search should quit or not because it found a match.
///
/// If quiet is false, then set_match is always a no-op and has_match
/// always returns false.
pub fn new(quiet: bool) -> QuietMatched {
let atomic = if quiet { Some(AtomicBool::new(false)) } else { None }; let atomic = if quiet { Some(AtomicBool::new(false)) } else { None };
QuietMatched(Arc::new(atomic)) QuietMatched(Arc::new(atomic))
} }
fn has_match(&self) -> bool { /// Returns true if and only if quiet mode is enabled and a match has
/// occurred.
pub fn has_match(&self) -> bool {
match *self.0 { match *self.0 {
None => false, None => false,
Some(ref matched) => matched.load(Ordering::SeqCst), Some(ref matched) => matched.load(Ordering::SeqCst),
} }
} }
fn set_match(&self, yes: bool) -> bool { /// Sets whether a match has occurred or not.
///
/// If quiet mode is disabled, then this is a no-op.
pub fn set_match(&self, yes: bool) -> bool {
match *self.0 { match *self.0 {
None => false, None => false,
Some(_) if !yes => false, Some(_) if !yes => false,

View File

@@ -158,6 +158,7 @@ impl<W: Terminal + Send> Printer<W> {
} }
/// Flushes the underlying writer and returns it. /// Flushes the underlying writer and returns it.
#[allow(dead_code)]
pub fn into_inner(mut self) -> W { pub fn into_inner(mut self) -> W {
let _ = self.wtr.flush(); let _ = self.wtr.flush();
self.wtr self.wtr

253
src/worker.rs Normal file
View File

@@ -0,0 +1,253 @@
use std::fs::File;
use std::io;
use std::path::Path;
use grep::Grep;
use ignore::DirEntry;
use memmap::{Mmap, Protection};
use term::Terminal;
use pathutil::strip_prefix;
use printer::Printer;
use search_buffer::BufferSearcher;
use search_stream::{InputBuffer, Searcher};
use Result;
pub enum Work {
Stdin,
DirEntry(DirEntry),
}
pub struct WorkerBuilder {
grep: Grep,
opts: Options,
}
#[derive(Clone, Debug)]
struct Options {
mmap: bool,
after_context: usize,
before_context: usize,
count: bool,
files_with_matches: bool,
eol: u8,
invert_match: bool,
line_number: bool,
quiet: bool,
text: bool,
}
impl Default for Options {
fn default() -> Options {
Options {
mmap: false,
after_context: 0,
before_context: 0,
count: false,
files_with_matches: false,
eol: b'\n',
invert_match: false,
line_number: false,
quiet: false,
text: false,
}
}
}
impl WorkerBuilder {
/// Create a new builder for a worker.
///
/// A reusable input buffer and a grep matcher are required, but there
/// are numerous additional options that can be configured on this builder.
pub fn new(grep: Grep) -> WorkerBuilder {
WorkerBuilder {
grep: grep,
opts: Options::default(),
}
}
/// Create the worker from this builder.
pub fn build(self) -> Worker {
let mut inpbuf = InputBuffer::new();
inpbuf.eol(self.opts.eol);
Worker {
grep: self.grep,
inpbuf: inpbuf,
opts: self.opts,
}
}
/// The number of contextual lines to show after each match. The default
/// is zero.
pub fn after_context(mut self, count: usize) -> Self {
self.opts.after_context = count;
self
}
/// The number of contextual lines to show before each match. The default
/// is zero.
pub fn before_context(mut self, count: usize) -> Self {
self.opts.before_context = count;
self
}
/// If enabled, searching will print a count instead of each match.
///
/// Disabled by default.
pub fn count(mut self, yes: bool) -> Self {
self.opts.count = yes;
self
}
/// If enabled, searching will print the path instead of each match.
///
/// Disabled by default.
pub fn files_with_matches(mut self, yes: bool) -> Self {
self.opts.files_with_matches = yes;
self
}
/// Set the end-of-line byte used by this searcher.
pub fn eol(mut self, eol: u8) -> Self {
self.opts.eol = eol;
self
}
/// If enabled, matching is inverted so that lines that *don't* match the
/// given pattern are treated as matches.
pub fn invert_match(mut self, yes: bool) -> Self {
self.opts.invert_match = yes;
self
}
/// If enabled, compute line numbers and prefix each line of output with
/// them.
pub fn line_number(mut self, yes: bool) -> Self {
self.opts.line_number = yes;
self
}
/// If enabled, try to use memory maps for searching if possible.
pub fn mmap(mut self, yes: bool) -> Self {
self.opts.mmap = yes;
self
}
/// If enabled, don't show any output and quit searching after the first
/// match is found.
pub fn quiet(mut self, yes: bool) -> Self {
self.opts.quiet = yes;
self
}
/// If enabled, search binary files as if they were text.
pub fn text(mut self, yes: bool) -> Self {
self.opts.text = yes;
self
}
}
/// Worker is responsible for executing searches on file paths, while choosing
/// streaming search or memory map search as appropriate.
pub struct Worker {
inpbuf: InputBuffer,
grep: Grep,
opts: Options,
}
impl Worker {
/// Execute the worker with the given printer and work item.
///
/// A work item can either be stdin or a file path.
pub fn run<W: Terminal + Send>(
&mut self,
printer: &mut Printer<W>,
work: Work,
) -> u64 {
let result = match work {
Work::Stdin => {
let stdin = io::stdin();
let stdin = stdin.lock();
self.search(printer, &Path::new("<stdin>"), stdin)
}
Work::DirEntry(dent) => {
let mut path = dent.path();
let file = match File::open(path) {
Ok(file) => file,
Err(err) => {
eprintln!("{}: {}", path.display(), err);
return 0;
}
};
if let Some(p) = strip_prefix("./", path) {
path = p;
}
if self.opts.mmap {
self.search_mmap(printer, path, &file)
} else {
self.search(printer, path, file)
}
}
};
match result {
Ok(count) => {
count
}
Err(err) => {
eprintln!("{}", err);
0
}
}
}
fn search<R: io::Read, W: Terminal + Send>(
&mut self,
printer: &mut Printer<W>,
path: &Path,
rdr: R,
) -> Result<u64> {
let searcher = Searcher::new(
&mut self.inpbuf, printer, &self.grep, path, rdr);
searcher
.after_context(self.opts.after_context)
.before_context(self.opts.before_context)
.count(self.opts.count)
.files_with_matches(self.opts.files_with_matches)
.eol(self.opts.eol)
.line_number(self.opts.line_number)
.invert_match(self.opts.invert_match)
.quiet(self.opts.quiet)
.text(self.opts.text)
.run()
.map_err(From::from)
}
fn search_mmap<W: Terminal + Send>(
&mut self,
printer: &mut Printer<W>,
path: &Path,
file: &File,
) -> Result<u64> {
if try!(file.metadata()).len() == 0 {
// Opening a memory map with an empty file results in an error.
// However, this may not actually be an empty file! For example,
// /proc/cpuinfo reports itself as an empty file, but it can
// produce data when it's read from. Therefore, we fall back to
// regular read calls.
return self.search(printer, path, file);
}
let mmap = try!(Mmap::open(file, Protection::Read));
let searcher = BufferSearcher::new(
printer, &self.grep, path, unsafe { mmap.as_slice() });
Ok(searcher
.count(self.opts.count)
.files_with_matches(self.opts.files_with_matches)
.eol(self.opts.eol)
.line_number(self.opts.line_number)
.invert_match(self.opts.invert_match)
.quiet(self.opts.quiet)
.text(self.opts.text)
.run())
}
}