diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 7ca1e98358..d764c534aa 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs @@ -1,19 +1,22 @@ -use std::{fs, sync::Arc, thread}; - -use crossbeam_channel::{Receiver, Sender}; +use std::{ + fs, + thread, + path::{Path, PathBuf}, + sync::{mpsc, Arc}, + time::Duration, +}; +use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select}; use relative_path::RelativePathBuf; use thread_worker::WorkerHandle; use walkdir::WalkDir; +use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; -mod watcher; -use watcher::Watcher; - -use crate::{RootFilter, Roots, VfsRoot}; +use crate::{RootConfig, Roots, VfsRoot}; pub(crate) enum Task { AddRoot { root: VfsRoot, - filter: Arc, + config: Arc, }, } @@ -39,6 +42,15 @@ pub enum TaskResult { }, } +#[derive(Debug)] +enum ChangeKind { + Create, + Write, + Remove, +} + +const WATCHER_DELAY: Duration = Duration::from_millis(250); + pub(crate) struct Worker { worker: thread_worker::Worker, worker_handle: WorkerHandle, @@ -46,24 +58,75 @@ pub(crate) struct Worker { impl Worker { pub(crate) fn start(roots: Arc) -> Worker { - let (worker, worker_handle) = - thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| { - let mut watcher = match Watcher::start(roots, output_sender.clone()) { - Ok(w) => Some(w), - Err(e) => { - log::error!("could not start watcher: {}", e); - None + // This is a pretty elaborate setup of threads & channels! It is + // explained by the following concerns: + // * we need to burn a thread translating from notify's mpsc to + // crossbeam_channel. + // * we want to read all files from a single thread, to gurantee that + // we always get fresher versions and never go back in time. + // * we want to tear down everything neatly during shutdown. + let (worker, worker_handle) = thread_worker::spawn( + "vfs", + 128, + // This are the channels we use to communicate with outside world. + // If `input_receiver` is closed we need to tear ourselves down. + // `output_sender` should not be closed unless the parent died. + move |input_receiver, output_sender| { + // These are `std` channels notify will send events to + let (notify_sender, notify_receiver) = mpsc::channel(); + // These are the corresponding crossbeam channels + let (watcher_sender, watcher_receiver) = unbounded(); + + let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) + .map_err(|e| log::error!("failed to spawn notify {}", e)) + .ok(); + // Start a silly thread to tranform between two channels + let thread = thread::spawn(move || { + notify_receiver + .into_iter() + .for_each(|event| convert_notify_event(event, &watcher_sender)) + }); + + // Process requests from the called or notifications from + // watcher until the caller says stop. + loop { + select! { + // Received request from the caller. If this channel is + // closed, we should shutdown everything. + recv(input_receiver) -> t => match t { + Err(RecvError) => { + drop(input_receiver); + break + }, + Ok(Task::AddRoot { root, config }) => { + watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config)); + } + }, + // Watcher send us changes. If **this** channel is + // closed, the watcher has died, which indicates a bug + // -- escalate! + recv(watcher_receiver) -> event => match event { + Err(RecvError) => panic!("watcher is dead"), + Ok((path, change)) => { + handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); + } + }, } - }; - let res = input_receiver - .into_iter() - .filter_map(|t| handle_task(t, &mut watcher)) - .try_for_each(|it| output_sender.send(it)); - if let Some(watcher) = watcher { - let _ = watcher.shutdown(); } - res.unwrap() - }); + // Stopped the watcher + drop(watcher.take()); + // Drain pending events: we are not inrerested in them anyways! + watcher_receiver.into_iter().for_each(|_| ()); + + let res = thread.join(); + match &res { + Ok(()) => log::info!("... Watcher terminated with ok"), + Err(_) => log::error!("... Watcher terminated with err"), + } + res.unwrap(); + }, + ); + Worker { worker, worker_handle, @@ -84,46 +147,142 @@ impl Worker { } } -fn handle_task(task: Task, watcher: &mut Option) -> Option { - match task { - Task::AddRoot { root, filter } => { - if let Some(watcher) = watcher { - watcher.watch_root(&filter) - } - log::debug!("loading {} ...", filter.root.as_path().display()); - let files = load_root(filter.as_ref()); - log::debug!("... loaded {}", filter.root.as_path().display()); - Some(TaskResult::BulkLoadRoot { root, files }) +fn watch_root( + watcher: Option<&mut RecommendedWatcher>, + sender: &Sender, + root: VfsRoot, + config: Arc, +) { + log::debug!("loading {} ...", config.root.as_path().display()); + let files = watch_recursive(watcher, config.root.as_path(), &*config) + .into_iter() + .filter_map(|path| { + let abs_path = path.to_path(&config.root); + let text = read_to_string(&abs_path)?; + Some((path, text)) + }) + .collect(); + sender + .send(TaskResult::BulkLoadRoot { root, files }) + .unwrap(); + log::debug!("... loaded {}", config.root.as_path().display()); +} + +fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) { + // forward relevant events only + match event { + DebouncedEvent::NoticeWrite(_) + | DebouncedEvent::NoticeRemove(_) + | DebouncedEvent::Chmod(_) => { + // ignore + } + DebouncedEvent::Rescan => { + // TODO rescan all roots + } + DebouncedEvent::Create(path) => { + sender.send((path, ChangeKind::Create)).unwrap(); + } + DebouncedEvent::Write(path) => { + sender.send((path, ChangeKind::Write)).unwrap(); + } + DebouncedEvent::Remove(path) => { + sender.send((path, ChangeKind::Remove)).unwrap(); + } + DebouncedEvent::Rename(src, dst) => { + sender.send((src, ChangeKind::Remove)).unwrap(); + sender.send((dst, ChangeKind::Create)).unwrap(); + } + DebouncedEvent::Error(err, path) => { + // TODO should we reload the file contents? + log::warn!("watcher error \"{}\", {:?}", err, path); } } } -fn load_root(filter: &RootFilter) -> Vec<(RelativePathBuf, String)> { - let mut res = Vec::new(); - for entry in WalkDir::new(&filter.root) - .into_iter() - .filter_entry(filter.entry_filter()) - { - let entry = match entry { - Ok(entry) => entry, - Err(e) => { - log::warn!("watcher error: {}", e); - continue; +fn handle_change( + watcher: Option<&mut RecommendedWatcher>, + sender: &Sender, + roots: &Roots, + path: PathBuf, + kind: ChangeKind, +) { + let (root, rel_path) = match roots.find(&path) { + None => return, + Some(it) => it, + }; + let config = &roots[root]; + match kind { + ChangeKind::Create => { + let mut paths = Vec::new(); + if path.is_dir() { + paths.extend(watch_recursive(watcher, &path, &config)); + } else { + paths.push(rel_path); } - }; - if !entry.file_type().is_file() { - continue; + paths + .into_iter() + .filter_map(|rel_path| { + let abs_path = rel_path.to_path(&config.root); + let text = read_to_string(&abs_path)?; + Some((rel_path, text)) + }) + .try_for_each(|(path, text)| { + sender.send(TaskResult::AddSingleFile { root, path, text }) + }) + .unwrap() } - let path = entry.path(); - let text = match fs::read_to_string(path) { - Ok(text) => text, - Err(e) => { - log::warn!("watcher error: {}", e); - continue; + ChangeKind::Write => { + if let Some(text) = read_to_string(&path) { + sender + .send(TaskResult::ChangeSingleFile { + root, + path: rel_path, + text, + }) + .unwrap(); } - }; - let path = RelativePathBuf::from_path(path.strip_prefix(&filter.root).unwrap()).unwrap(); - res.push((path.to_owned(), text)) + } + ChangeKind::Remove => sender + .send(TaskResult::RemoveSingleFile { + root, + path: rel_path, + }) + .unwrap(), } - res +} + +fn watch_recursive( + mut watcher: Option<&mut RecommendedWatcher>, + dir: &Path, + config: &RootConfig, +) -> Vec { + let mut files = Vec::new(); + for entry in WalkDir::new(dir) + .into_iter() + .filter_entry(|it| config.contains(it.path()).is_some()) + .filter_map(|it| it.map_err(|e| log::warn!("watcher error: {}", e)).ok()) + { + if entry.file_type().is_dir() { + if let Some(watcher) = &mut watcher { + watch_one(watcher, entry.path()); + } + } else { + let path = config.contains(entry.path()).unwrap(); + files.push(path.to_owned()); + } + } + files +} + +fn watch_one(watcher: &mut RecommendedWatcher, dir: &Path) { + match watcher.watch(dir, RecursiveMode::NonRecursive) { + Ok(()) => log::debug!("watching \"{}\"", dir.display()), + Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e), + } +} + +fn read_to_string(path: &Path) -> Option { + fs::read_to_string(&path) + .map_err(|e| log::warn!("failed to read file {}", e)) + .ok() } diff --git a/crates/ra_vfs/src/io/watcher.rs b/crates/ra_vfs/src/io/watcher.rs deleted file mode 100644 index ff6775f594..0000000000 --- a/crates/ra_vfs/src/io/watcher.rs +++ /dev/null @@ -1,200 +0,0 @@ -use crate::{io, RootFilter, Roots, VfsRoot}; -use crossbeam_channel::Sender; -use drop_bomb::DropBomb; -use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher}; -use parking_lot::Mutex; -use std::{ - fs, - path::{Path, PathBuf}, - sync::{mpsc, Arc}, - thread, - time::Duration, -}; -use walkdir::WalkDir; - -#[derive(Debug)] -enum ChangeKind { - Create, - Write, - Remove, -} - -const WATCHER_DELAY: Duration = Duration::from_millis(250); - -pub(crate) struct Watcher { - thread: thread::JoinHandle<()>, - bomb: DropBomb, - watcher: Arc>>, -} - -impl Watcher { - pub(crate) fn start( - roots: Arc, - output_sender: Sender, - ) -> Result> { - let (input_sender, input_receiver) = mpsc::channel(); - let watcher = Arc::new(Mutex::new(Some(notify::watcher( - input_sender, - WATCHER_DELAY, - )?))); - let sender = output_sender.clone(); - let watcher_clone = watcher.clone(); - let thread = thread::spawn(move || { - let worker = WatcherWorker { - roots, - watcher: watcher_clone, - sender, - }; - input_receiver - .into_iter() - // forward relevant events only - .try_for_each(|change| worker.handle_debounced_event(change)) - .unwrap() - }); - Ok(Watcher { - thread, - watcher, - bomb: DropBomb::new(format!("Watcher was not shutdown")), - }) - } - - pub fn watch_root(&mut self, filter: &RootFilter) { - for res in WalkDir::new(&filter.root) - .into_iter() - .filter_entry(filter.entry_filter()) - { - match res { - Ok(entry) => { - if entry.file_type().is_dir() { - watch_one(self.watcher.as_ref(), entry.path()); - } - } - Err(e) => log::warn!("watcher error: {}", e), - } - } - } - - pub fn shutdown(mut self) -> thread::Result<()> { - self.bomb.defuse(); - drop(self.watcher.lock().take()); - let res = self.thread.join(); - match &res { - Ok(()) => log::info!("... Watcher terminated with ok"), - Err(_) => log::error!("... Watcher terminated with err"), - } - res - } -} - -struct WatcherWorker { - watcher: Arc>>, - roots: Arc, - sender: Sender, -} - -impl WatcherWorker { - fn handle_debounced_event(&self, ev: DebouncedEvent) -> Result<(), Box> { - match ev { - DebouncedEvent::NoticeWrite(_) - | DebouncedEvent::NoticeRemove(_) - | DebouncedEvent::Chmod(_) => { - // ignore - } - DebouncedEvent::Rescan => { - // TODO rescan all roots - } - DebouncedEvent::Create(path) => { - self.handle_change(path, ChangeKind::Create); - } - DebouncedEvent::Write(path) => { - self.handle_change(path, ChangeKind::Write); - } - DebouncedEvent::Remove(path) => { - self.handle_change(path, ChangeKind::Remove); - } - DebouncedEvent::Rename(src, dst) => { - self.handle_change(src, ChangeKind::Remove); - self.handle_change(dst, ChangeKind::Create); - } - DebouncedEvent::Error(err, path) => { - // TODO should we reload the file contents? - log::warn!("watcher error \"{}\", {:?}", err, path); - } - } - Ok(()) - } - - fn handle_change(&self, path: PathBuf, kind: ChangeKind) { - if let Err(e) = self.try_handle_change(path, kind) { - log::warn!("watcher error: {}", e) - } - } - - fn try_handle_change( - &self, - path: PathBuf, - kind: ChangeKind, - ) -> Result<(), Box> { - let (root, rel_path) = match self.roots.find(&path) { - Some(x) => x, - None => return Ok(()), - }; - match kind { - ChangeKind::Create => { - if path.is_dir() { - self.watch_recursive(&path, root); - } else { - let text = fs::read_to_string(&path)?; - self.sender.send(io::TaskResult::AddSingleFile { - root, - path: rel_path, - text, - })? - } - } - ChangeKind::Write => { - let text = fs::read_to_string(&path)?; - self.sender.send(io::TaskResult::ChangeSingleFile { - root, - path: rel_path, - text, - })? - } - ChangeKind::Remove => self.sender.send(io::TaskResult::RemoveSingleFile { - root, - path: rel_path, - })?, - } - Ok(()) - } - - fn watch_recursive(&self, dir: &Path, root: VfsRoot) { - let filter = &self.roots[root]; - for res in WalkDir::new(dir) - .into_iter() - .filter_entry(filter.entry_filter()) - { - match res { - Ok(entry) => { - if entry.file_type().is_dir() { - watch_one(self.watcher.as_ref(), entry.path()); - } else { - // emit only for files otherwise we will cause watch_recursive to be called again with a dir that we are already watching - // emit as create because we haven't seen it yet - self.handle_change(entry.path().to_path_buf(), ChangeKind::Create); - } - } - Err(e) => log::warn!("watcher error: {}", e), - } - } - } -} - -fn watch_one(watcher: &Mutex>, dir: &Path) { - if let Some(watcher) = watcher.lock().as_mut() { - match watcher.watch(dir, RecursiveMode::NonRecursive) { - Ok(()) => log::debug!("watching \"{}\"", dir.display()), - Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e), - } - } -} diff --git a/crates/ra_vfs/src/lib.rs b/crates/ra_vfs/src/lib.rs index 70a13f7655..71a3f807d5 100644 --- a/crates/ra_vfs/src/lib.rs +++ b/crates/ra_vfs/src/lib.rs @@ -18,94 +18,78 @@ mod io; use std::{ cmp::Reverse, fmt, fs, mem, - ops::{Deref, DerefMut}, path::{Path, PathBuf}, sync::Arc, thread, }; use crossbeam_channel::Receiver; -use ra_arena::{impl_arena_id, Arena, RawId}; +use ra_arena::{impl_arena_id, Arena, RawId, map::ArenaMap}; use relative_path::{Component, RelativePath, RelativePathBuf}; use rustc_hash::{FxHashMap, FxHashSet}; -use walkdir::DirEntry; pub use crate::io::TaskResult as VfsTask; use io::{TaskResult, Worker}; -/// `RootFilter` is a predicate that checks if a file can belong to a root. If -/// several filters match a file (nested dirs), the most nested one wins. -pub(crate) struct RootFilter { - root: PathBuf, - filter: fn(&Path, &RelativePath) -> bool, - excluded_dirs: Vec, -} - -impl RootFilter { - fn new(root: PathBuf, excluded_dirs: Vec) -> RootFilter { - RootFilter { - root, - filter: default_filter, - excluded_dirs, - } - } - /// Check if this root can contain `path`. NB: even if this returns - /// true, the `path` might actually be conained in some nested root. - pub(crate) fn can_contain(&self, path: &Path) -> Option { - let rel_path = path.strip_prefix(&self.root).ok()?; - let rel_path = RelativePathBuf::from_path(rel_path).ok()?; - if !(self.filter)(path, rel_path.as_relative_path()) { - return None; - } - Some(rel_path) - } - - pub(crate) fn entry_filter<'a>(&'a self) -> impl FnMut(&DirEntry) -> bool + 'a { - move |entry: &DirEntry| { - if entry.file_type().is_dir() && self.excluded_dirs.iter().any(|it| it == entry.path()) - { - // do not walk nested roots - false - } else { - self.can_contain(entry.path()).is_some() - } - } - } -} - -pub(crate) fn default_filter(path: &Path, rel_path: &RelativePath) -> bool { - if path.is_dir() { - for (i, c) in rel_path.components().enumerate() { - if let Component::Normal(c) = c { - // TODO hardcoded for now - if (i == 0 && c == "target") || c == ".git" || c == "node_modules" { - return false; - } - } - } - true - } else { - rel_path.extension() == Some("rs") - } -} - #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct VfsRoot(pub RawId); impl_arena_id!(VfsRoot); -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct VfsFile(pub RawId); -impl_arena_id!(VfsFile); - -struct VfsFileData { - root: VfsRoot, - path: RelativePathBuf, - is_overlayed: bool, - text: Arc, +/// Describes the contents of a single source root. +/// +/// `RootConfig` can be thought of as a glob pattern like `src/**.rs` whihc +/// specifes the source root or as a function whihc takes a `PathBuf` and +/// returns `true` iff path belongs to the source root +pub(crate) struct RootConfig { + root: PathBuf, + excluded_dirs: Vec, } pub(crate) struct Roots { - roots: Arena>, + roots: Arena>, +} + +impl std::ops::Deref for Roots { + type Target = Arena>; + fn deref(&self) -> &Self::Target { + &self.roots + } +} + +impl RootConfig { + fn new(root: PathBuf, excluded_dirs: Vec) -> RootConfig { + RootConfig { + root, + excluded_dirs, + } + } + /// Cheks if root contains a path and returns a root-relative path. + pub(crate) fn contains(&self, path: &Path) -> Option { + // First, check excluded dirs + if self.excluded_dirs.iter().any(|it| path.starts_with(it)) { + return None; + } + let rel_path = path.strip_prefix(&self.root).ok()?; + let rel_path = RelativePathBuf::from_path(rel_path).ok()?; + + // Ignore some common directories. + // + // FIXME: don't hard-code, specify at source-root creation time using + // gitignore + for (i, c) in rel_path.components().enumerate() { + if let Component::Normal(c) = c { + if (i == 0 && c == "target") || c == ".git" || c == "node_modules" { + return None; + } + } + } + + if path.is_file() && rel_path.extension() != Some("rs") { + return None; + } + + Some(rel_path) + } } impl Roots { @@ -120,59 +104,61 @@ impl Roots { .map(|it| it.clone()) .collect::>(); - let root_filter = Arc::new(RootFilter::new(path.clone(), nested_roots)); + let config = Arc::new(RootConfig::new(path.clone(), nested_roots)); - roots.alloc(root_filter.clone()); + roots.alloc(config.clone()); } Roots { roots } } pub(crate) fn find(&self, path: &Path) -> Option<(VfsRoot, RelativePathBuf)> { self.roots .iter() - .find_map(|(root, data)| data.can_contain(path).map(|it| (root, it))) + .find_map(|(root, data)| data.contains(path).map(|it| (root, it))) } } -impl Deref for Roots { - type Target = Arena>; - fn deref(&self) -> &Self::Target { - &self.roots - } -} +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct VfsFile(pub RawId); +impl_arena_id!(VfsFile); -impl DerefMut for Roots { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.roots - } +struct VfsFileData { + root: VfsRoot, + path: RelativePathBuf, + is_overlayed: bool, + text: Arc, } pub struct Vfs { roots: Arc, files: Arena, - root2files: FxHashMap>, + root2files: ArenaMap>, pending_changes: Vec, worker: Worker, } impl fmt::Debug for Vfs { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.write_str("Vfs { ... }") + f.debug_struct("Vfs") + .field("n_roots", &self.roots.len()) + .field("n_files", &self.files.len()) + .field("n_pending_changes", &self.pending_changes.len()) + .finish() } } impl Vfs { pub fn new(roots: Vec) -> (Vfs, Vec) { let roots = Arc::new(Roots::new(roots)); - let worker = io::Worker::start(roots.clone()); - let mut root2files = FxHashMap::default(); + let worker = io::Worker::start(Arc::clone(&roots)); + let mut root2files = ArenaMap::default(); - for (root, filter) in roots.iter() { + for (root, config) in roots.iter() { root2files.insert(root, Default::default()); worker .sender() .send(io::Task::AddRoot { root, - filter: filter.clone(), + config: Arc::clone(config), }) .unwrap(); } @@ -242,7 +228,7 @@ impl Vfs { let mut cur_files = Vec::new(); // While we were scanning the root in the backgound, a file might have // been open in the editor, so we need to account for that. - let exising = self.root2files[&root] + let exising = self.root2files[root] .iter() .map(|&file| (self.files[file].path.clone(), file)) .collect::>(); @@ -384,7 +370,7 @@ impl Vfs { is_overlayed, }; let file = self.files.alloc(data); - self.root2files.get_mut(&root).unwrap().insert(file); + self.root2files.get_mut(root).unwrap().insert(file); file } @@ -399,7 +385,7 @@ impl Vfs { self.files[file].text = Default::default(); self.files[file].path = Default::default(); let root = self.files[file].root; - let removed = self.root2files.get_mut(&root).unwrap().remove(&file); + let removed = self.root2files.get_mut(root).unwrap().remove(&file); assert!(removed); } @@ -410,7 +396,7 @@ impl Vfs { } fn find_file(&self, root: VfsRoot, path: &RelativePath) -> Option { - self.root2files[&root] + self.root2files[root] .iter() .map(|&it| it) .find(|&file| self.files[file].path == path)