refactor, put watcher with io::Worker

use `RootFilter` to filter recursive watches
untested
This commit is contained in:
Bernardo 2019-01-20 22:13:21 +01:00 committed by Aleksey Kladov
parent eacf7aeb42
commit f88355ccb5
5 changed files with 188 additions and 172 deletions

32
Cargo.lock generated
View file

@ -446,18 +446,6 @@ name = "glob"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "globset"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"aho-corasick 0.6.9 (registry+https://github.com/rust-lang/crates.io-index)",
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"memchr 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"regex 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "heck"
version = "0.3.1"
@ -481,23 +469,6 @@ dependencies = [
"unicode-normalization 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "ignore"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crossbeam-channel 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"globset 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"memchr 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"regex 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"same-file 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
"thread_local 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"walkdir 2.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi-util 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "im"
version = "12.3.0"
@ -1039,7 +1010,6 @@ dependencies = [
"crossbeam-channel 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"drop_bomb 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"flexi_logger 0.10.5 (registry+https://github.com/rust-lang/crates.io-index)",
"ignore 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"notify 4.0.6 (git+https://github.com/vemoo/notify/?branch=v4-legacy)",
"parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1899,11 +1869,9 @@ dependencies = [
"checksum futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)" = "49e7653e374fe0d0c12de4250f0bdb60680b8c80eed558c5c7538eec9c89e21b"
"checksum generic-array 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ef25c5683767570c2bbd7deba372926a55eaae9982d7726ee2a1050239d45b9d"
"checksum glob 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "8be18de09a56b60ed0edf84bc9df007e30040691af7acd1c41874faac5895bfb"
"checksum globset 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4743617a7464bbda3c8aec8558ff2f9429047e025771037df561d383337ff865"
"checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205"
"checksum humansize 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b6cab2627acfc432780848602f3f558f7e9dd427352224b0d9324025796d2a5e"
"checksum idna 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e"
"checksum ignore 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ad03ca67dc12474ecd91fdb94d758cbd20cb4e7a78ebe831df26a9b7511e1162"
"checksum im 12.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0627d417829c1d763d602687634869f254fc79f7e22dea6c824dab993db857e4"
"checksum indexmap 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7e81a7c05f79578dbc15793d8b619db9ba32b4577003ef3af1a91c416798c58d"
"checksum inotify 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "40b54539f3910d6f84fbf9a643efd6e3aa6e4f001426c0329576128255994718"

View file

@ -12,7 +12,6 @@ crossbeam-channel = "0.3.5"
log = "0.4.6"
# until https://github.com/passcod/notify/issues/169 is fixed
notify = { git = "https://github.com/vemoo/notify/", branch = "v4-legacy" }
ignore = "0.4"
drop_bomb = "0.1.0"
parking_lot = "0.7.0"

View file

@ -1,13 +1,20 @@
use std::{
fmt, fs,
path::{Path, PathBuf},
sync::Arc,
thread,
};
use crossbeam_channel::{Receiver, Sender};
use parking_lot::Mutex;
use relative_path::RelativePathBuf;
use thread_worker::WorkerHandle;
use walkdir::{DirEntry, WalkDir};
use crate::{has_rs_extension, watcher::WatcherChange, VfsRoot};
use crate::{
watcher::{Watcher, WatcherChange},
VfsRoot,
};
pub(crate) enum Task {
AddRoot {
@ -17,6 +24,10 @@ pub(crate) enum Task {
},
HandleChange(WatcherChange),
LoadChange(WatcherChange),
Watch {
dir: PathBuf,
filter: Box<Fn(&DirEntry) -> bool + Send>,
},
}
#[derive(Debug)]
@ -35,7 +46,8 @@ pub enum WatcherChangeData {
pub enum TaskResult {
AddRoot(AddRootResult),
HandleChange(WatcherChange),
LoadChange(Option<WatcherChangeData>),
LoadChange(WatcherChangeData),
NoOp,
}
impl fmt::Debug for TaskResult {
@ -44,21 +56,74 @@ impl fmt::Debug for TaskResult {
}
}
pub(crate) type Worker = thread_worker::Worker<Task, TaskResult>;
pub(crate) fn start() -> (Worker, WorkerHandle) {
thread_worker::spawn("vfs", 128, |input_receiver, output_sender| {
input_receiver
.into_iter()
.map(handle_task)
.try_for_each(|it| output_sender.send(it))
.unwrap()
})
/// Bundles the background "vfs" worker thread with the file-system watcher
/// that sends tasks to it.
pub(crate) struct Worker {
/// Channel pair of the worker thread: `inp` accepts tasks, `out` yields results.
worker: thread_worker::Worker<Task, TaskResult>,
/// Handle consumed by `shutdown` to stop the worker thread.
worker_handle: WorkerHandle,
/// Watcher shared with the worker thread; `None` when it could not be
/// started or has already been taken for shutdown.
watcher: Arc<Mutex<Option<Watcher>>>,
}
fn handle_task(task: Task) -> TaskResult {
impl Worker {
    /// Spawns the "vfs" worker thread and then tries to start the file watcher.
    ///
    /// A watcher start failure is logged and otherwise ignored: the worker
    /// keeps running with an empty watcher slot.
    pub(crate) fn start() -> Worker {
        let watcher = Arc::new(Mutex::new(None));
        let watcher_clone = watcher.clone();
        let (worker, worker_handle) =
            thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| {
                // Runs until the input channel is closed or a result cannot be sent.
                let res = input_receiver
                    .into_iter()
                    .map(|t| handle_task(t, &watcher_clone))
                    .try_for_each(|it| output_sender.send(it));
                res.unwrap()
            });
        // The watcher is handed a clone of the worker's task sender.
        match Watcher::start(worker.inp.clone()) {
            Ok(w) => {
                watcher.lock().replace(w);
            }
            Err(e) => log::error!("could not start watcher: {}", e),
        };
        Worker {
            worker,
            worker_handle,
            watcher,
        }
    }

    /// Sender used to submit tasks to the worker thread.
    pub(crate) fn sender(&self) -> &Sender<Task> {
        &self.worker.inp
    }

    /// Receiver on which the worker thread publishes task results.
    pub(crate) fn receiver(&self) -> &Receiver<TaskResult> {
        &self.worker.out
    }

    /// Stops the watcher (if any) and then the worker thread.
    ///
    /// Returns the result of shutting down the worker handle; the watcher's
    /// own shutdown result is deliberately ignored (best effort).
    pub(crate) fn shutdown(self) -> thread::Result<()> {
        // Take the watcher in its own statement so the mutex guard is released
        // before we wait on the watcher: the worker thread locks the same
        // mutex and must not be stalled while we are blocked here.
        let watcher = self.watcher.lock().take();
        if let Some(watcher) = watcher {
            let _ = watcher.shutdown();
        }
        // Release our task sender before joining: the worker loop only ends
        // once its input channel is closed, so keeping `self.worker` alive
        // across the join would leave the thread waiting for more tasks.
        drop(self.worker);
        self.worker_handle.shutdown()
    }
}
fn watch(
watcher: &Arc<Mutex<Option<Watcher>>>,
dir: &Path,
filter_entry: impl Fn(&DirEntry) -> bool,
emit_for_existing: bool,
) {
let mut watcher = watcher.lock();
let watcher = match *watcher {
Some(ref mut w) => w,
None => {
// watcher dropped or couldn't start
return;
}
};
watcher.watch_recursive(dir, filter_entry, emit_for_existing)
}
fn handle_task(task: Task, watcher: &Arc<Mutex<Option<Watcher>>>) -> TaskResult {
match task {
Task::AddRoot { root, path, filter } => {
watch(watcher, &path, &*filter, false);
log::debug!("loading {} ...", path.as_path().display());
let files = load_root(path.as_path(), &*filter);
log::debug!("... loaded {}", path.as_path().display());
@ -70,8 +135,14 @@ fn handle_task(task: Task) -> TaskResult {
}
Task::LoadChange(change) => {
log::debug!("loading {:?} ...", change);
let data = load_change(change);
TaskResult::LoadChange(data)
match load_change(change) {
Some(data) => TaskResult::LoadChange(data),
None => TaskResult::NoOp,
}
}
Task::Watch { dir, filter } => {
watch(watcher, &dir, &*filter, true);
TaskResult::NoOp
}
}
}
@ -90,9 +161,6 @@ fn load_root(root: &Path, filter: &dyn Fn(&DirEntry) -> bool) -> Vec<(RelativePa
continue;
}
let path = entry.path();
if !has_rs_extension(path) {
continue;
}
let text = match fs::read_to_string(path) {
Ok(text) => text,
Err(e) => {
@ -109,6 +177,9 @@ fn load_root(root: &Path, filter: &dyn Fn(&DirEntry) -> bool) -> Vec<(RelativePa
fn load_change(change: WatcherChange) -> Option<WatcherChangeData> {
let data = match change {
WatcherChange::Create(path) => {
if path.is_dir() {
return None;
}
let text = match fs::read_to_string(&path) {
Ok(text) => text,
Err(e) => {

View file

@ -20,7 +20,7 @@ use std::{
cmp::Reverse,
ffi::OsStr,
fmt, fs, mem,
path::{Path, PathBuf},
path::{Component, Path, PathBuf},
sync::Arc,
thread,
};
@ -29,30 +29,37 @@ use crossbeam_channel::Receiver;
use ra_arena::{impl_arena_id, Arena, RawId};
use relative_path::RelativePathBuf;
use rustc_hash::{FxHashMap, FxHashSet};
use thread_worker::WorkerHandle;
use walkdir::DirEntry;
pub use crate::io::TaskResult as VfsTask;
pub use crate::watcher::{Watcher, WatcherChange};
pub use crate::watcher::WatcherChange;
/// `RootFilter` is a predicate that checks if a file can belong to a root. If
/// several filters match a file (nested dirs), the most nested one wins.
struct RootFilter {
pub(crate) struct RootFilter {
root: PathBuf,
file_filter: fn(&Path) -> bool,
filter: fn(RootEntry) -> bool,
}
/// A candidate path paired with the root it is being checked against; this is
/// the argument passed to a `RootFilter`'s filter function.
pub(crate) struct RootEntry<'a, 'b> {
/// Directory of the root performing the check.
root: &'a Path,
/// Path being tested.
path: &'b Path,
}
impl RootFilter {
fn new(root: PathBuf) -> RootFilter {
RootFilter {
root,
file_filter: has_rs_extension,
filter: default_filter,
}
}
/// Check if this root can contain `path`. NB: even if this returns
true, the `path` might actually be contained in some nested root.
fn can_contain(&self, path: &Path) -> Option<RelativePathBuf> {
if !(self.file_filter)(path) {
pub(crate) fn can_contain(&self, path: &Path) -> Option<RelativePathBuf> {
if !(self.filter)(RootEntry {
root: &self.root,
path,
}) {
return None;
}
let path = path.strip_prefix(&self.root).ok()?;
@ -60,8 +67,17 @@ impl RootFilter {
}
}
pub(crate) fn has_rs_extension(p: &Path) -> bool {
p.extension() == Some(OsStr::new("rs"))
/// Default predicate for root membership.
///
/// Anything that is not an existing directory is accepted only if it has the
/// `rs` extension. A directory is accepted when it lies under `entry.root`
/// and the first component of its root-relative path is not `target`; a
/// directory outside the root is rejected.
pub(crate) fn default_filter(entry: RootEntry) -> bool {
    if !entry.path.is_dir() {
        return entry.path.extension() == Some(OsStr::new("rs"));
    }
    match entry.path.strip_prefix(entry.root) {
        Ok(relative) => {
            // first component relative to root is "target"
            let build_dir = Component::Normal(OsStr::new("target"));
            relative.components().next() != Some(build_dir)
        }
        Err(_) => false,
    }
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
@ -80,13 +96,11 @@ struct VfsFileData {
}
pub struct Vfs {
roots: Arena<VfsRoot, RootFilter>,
roots: Arena<VfsRoot, Arc<RootFilter>>,
files: Arena<VfsFile, VfsFileData>,
root2files: FxHashMap<VfsRoot, FxHashSet<VfsFile>>,
pending_changes: Vec<VfsChange>,
worker: io::Worker,
worker_handle: WorkerHandle,
watcher: Option<Watcher>,
}
impl fmt::Debug for Vfs {
@ -97,41 +111,35 @@ impl fmt::Debug for Vfs {
impl Vfs {
pub fn new(mut roots: Vec<PathBuf>) -> (Vfs, Vec<VfsRoot>) {
let (worker, worker_handle) = io::start();
let watcher = match Watcher::start(worker.inp.clone()) {
Ok(watcher) => Some(watcher),
Err(e) => {
log::error!("could not start watcher: {}", e);
None
}
};
let worker = io::Worker::start();
let mut res = Vfs {
roots: Arena::default(),
files: Arena::default(),
root2files: FxHashMap::default(),
worker,
worker_handle,
watcher,
pending_changes: Vec::new(),
};
// A hack to make nesting work.
roots.sort_by_key(|it| Reverse(it.as_os_str().len()));
for (i, path) in roots.iter().enumerate() {
let root = res.roots.alloc(RootFilter::new(path.clone()));
let root_filter = Arc::new(RootFilter::new(path.clone()));
let root = res.roots.alloc(root_filter.clone());
res.root2files.insert(root, Default::default());
let nested = roots[..i]
.iter()
.filter(|it| it.starts_with(path))
.map(|it| it.clone())
.collect::<Vec<_>>();
let filter = move |entry: &DirEntry| {
if entry.file_type().is_file() {
has_rs_extension(entry.path())
if entry.file_type().is_dir() && nested.iter().any(|it| it == entry.path()) {
false
} else {
nested.iter().all(|it| it != entry.path())
root_filter.can_contain(entry.path()).is_some()
}
};
let task = io::Task::AddRoot {
@ -139,10 +147,7 @@ impl Vfs {
path: path.clone(),
filter: Box::new(filter),
};
res.worker.inp.send(task).unwrap();
if let Some(ref mut watcher) = res.watcher {
watcher.watch(path);
}
res.worker.sender().send(task).unwrap();
}
let roots = res.roots.iter().map(|(id, _)| id).collect();
(res, roots)
@ -194,7 +199,7 @@ impl Vfs {
}
pub fn task_receiver(&self) -> &Receiver<io::TaskResult> {
&self.worker.out
self.worker.receiver()
}
pub fn handle_task(&mut self, task: io::TaskResult) {
@ -225,19 +230,35 @@ impl Vfs {
self.pending_changes.push(change);
}
io::TaskResult::HandleChange(change) => match &change {
watcher::WatcherChange::Create(path) if path.is_dir() => {
if let Some((root, _path, _file)) = self.find_root(&path) {
let root_filter = self.roots[root].clone();
let filter =
move |entry: &DirEntry| root_filter.can_contain(entry.path()).is_some();
self.worker
.sender()
.send(io::Task::Watch {
dir: path.to_path_buf(),
filter: Box::new(filter),
})
.unwrap()
}
}
watcher::WatcherChange::Create(path)
| watcher::WatcherChange::Remove(path)
| watcher::WatcherChange::Write(path) => {
if self.should_handle_change(&path) {
self.worker.inp.send(io::Task::LoadChange(change)).unwrap()
self.worker
.sender()
.send(io::Task::LoadChange(change))
.unwrap()
}
}
watcher::WatcherChange::Rescan => {
// TODO we should reload all files
}
},
io::TaskResult::LoadChange(None) => {}
io::TaskResult::LoadChange(Some(change)) => match change {
io::TaskResult::LoadChange(change) => match change {
io::WatcherChangeData::Create { path, text }
| io::WatcherChangeData::Write { path, text } => {
if let Some((root, path, file)) = self.find_root(&path) {
@ -256,6 +277,7 @@ impl Vfs {
}
}
},
io::TaskResult::NoOp => {}
}
}
@ -359,11 +381,7 @@ impl Vfs {
/// Shut down the VFS and terminate the background watching thread.
pub fn shutdown(self) -> thread::Result<()> {
if let Some(watcher) = self.watcher {
let _ = watcher.shutdown();
}
let _ = self.worker.shutdown();
self.worker_handle.shutdown()
self.worker.shutdown()
}
fn add_file(

View file

@ -1,20 +1,20 @@
use crate::io;
use crossbeam_channel::Sender;
use drop_bomb::DropBomb;
use ignore::{gitignore::Gitignore, Walk};
use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
use parking_lot::Mutex;
use std::{
path::{Path, PathBuf},
sync::{mpsc, Arc},
sync::mpsc,
thread,
time::Duration,
};
use walkdir::{DirEntry, WalkDir};
pub struct Watcher {
watcher: Arc<Mutex<Option<RecommendedWatcher>>>,
pub(crate) struct Watcher {
watcher: RecommendedWatcher,
thread: thread::JoinHandle<()>,
bomb: DropBomb,
sender: Sender<io::Task>,
}
#[derive(Debug)]
@ -28,7 +28,6 @@ pub enum WatcherChange {
fn handle_change_event(
ev: DebouncedEvent,
sender: &Sender<io::Task>,
watcher: &Arc<Mutex<Option<RecommendedWatcher>>>,
) -> Result<(), Box<std::error::Error>> {
match ev {
DebouncedEvent::NoticeWrite(_)
@ -40,12 +39,6 @@ fn handle_change_event(
sender.send(io::Task::HandleChange(WatcherChange::Rescan))?;
}
DebouncedEvent::Create(path) => {
// we have to check if `path` is ignored because Walk iterator doesn't check it
// also children are only ignored if they match a pattern
// (see `matched` vs `matched_path_or_any_parents` in `Gitignore`)
if path.is_dir() && !should_ignore_dir(&path) {
watch_recursive(watcher, &path, Some(sender));
}
sender.send(io::Task::HandleChange(WatcherChange::Create(path)))?;
}
DebouncedEvent::Write(path) => {
@ -66,65 +59,6 @@ fn handle_change_event(
Ok(())
}
/// Registers a non-recursive watch on `dir` and logs the outcome: a debug
/// line on success, a warning on failure.
fn watch_one(watcher: &mut RecommendedWatcher, dir: &Path) {
    let outcome = watcher.watch(dir, RecursiveMode::NonRecursive);
    if let Err(e) = outcome {
        log::warn!("could not watch \"{}\": {}", dir.display(), e);
    } else {
        log::debug!("watching \"{}\"", dir.display());
    }
}
fn watch_recursive(
watcher: &Arc<Mutex<Option<RecommendedWatcher>>>,
dir: &Path,
sender: Option<&Sender<io::Task>>,
) {
let mut watcher = watcher.lock();
let mut watcher = match *watcher {
Some(ref mut watcher) => watcher,
None => {
// watcher has been dropped
return;
}
};
for res in Walk::new(dir) {
match res {
Ok(entry) => {
if entry.path().is_dir() {
watch_one(&mut watcher, entry.path());
}
if let Some(sender) = sender {
// emit as create because we haven't seen it yet
if let Err(e) = sender.send(io::Task::HandleChange(WatcherChange::Create(
entry.path().to_path_buf(),
))) {
log::warn!("watcher error: {}", e)
}
}
}
Err(e) => log::warn!("watcher error: {}", e),
}
}
}
/// Reports whether `dir` is ignored by a `.gitignore` file located in any of
/// its ancestor directories.
fn should_ignore_dir(dir: &Path) -> bool {
    // `ancestors` yields `dir` itself first; only its parents are consulted.
    for ancestor in dir.ancestors().skip(1) {
        let candidate = ancestor.join(".gitignore");
        if !candidate.exists() {
            continue;
        }
        // Any error from building the matcher is discarded, as before.
        let (rules, _) = Gitignore::new(candidate);
        if rules.matched_path_or_any_parents(dir, true).is_ignore() {
            log::debug!("ignored {}", dir.display());
            return true;
        }
    }
    false
}
const WATCHER_DELAY: Duration = Duration::from_millis(250);
impl Watcher {
@ -132,32 +66,58 @@ impl Watcher {
output_sender: Sender<io::Task>,
) -> Result<Watcher, Box<std::error::Error>> {
let (input_sender, input_receiver) = mpsc::channel();
let watcher = Arc::new(Mutex::new(Some(notify::watcher(
input_sender,
WATCHER_DELAY,
)?)));
let w = watcher.clone();
let watcher = notify::watcher(input_sender, WATCHER_DELAY)?;
let sender = output_sender.clone();
let thread = thread::spawn(move || {
input_receiver
.into_iter()
// forward relevant events only
.try_for_each(|change| handle_change_event(change, &output_sender, &w))
.try_for_each(|change| handle_change_event(change, &output_sender))
.unwrap()
});
Ok(Watcher {
watcher,
thread,
sender,
bomb: DropBomb::new(format!("Watcher was not shutdown")),
})
}
pub fn watch(&mut self, root: impl AsRef<Path>) {
watch_recursive(&self.watcher, root.as_ref(), None);
pub fn watch_recursive(
&mut self,
dir: &Path,
filter_entry: impl Fn(&DirEntry) -> bool,
emit_for_existing: bool,
) {
for res in WalkDir::new(dir).into_iter().filter_entry(filter_entry) {
match res {
Ok(entry) => {
if entry.path().is_dir() {
match self.watcher.watch(dir, RecursiveMode::NonRecursive) {
Ok(()) => log::debug!("watching \"{}\"", dir.display()),
Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e),
}
}
if emit_for_existing {
// emit as create because we haven't seen it yet
if let Err(e) =
self.sender
.send(io::Task::HandleChange(WatcherChange::Create(
entry.path().to_path_buf(),
)))
{
log::warn!("watcher error: {}", e)
}
}
}
Err(e) => log::warn!("watcher error: {}", e),
}
}
}
pub fn shutdown(mut self) -> thread::Result<()> {
self.bomb.defuse();
drop(self.watcher.lock().take());
drop(self.watcher);
let res = self.thread.join();
match &res {
Ok(()) => log::info!("... Watcher terminated with ok"),