//! An implementation of `loader::Handle`, based on `walkdir` and `notify`. //! //! The file watching bits here are untested and quite probably buggy. For this //! reason, by default we don't watch files and rely on editor's file watching //! capabilities. //! //! Hopefully, one day a reliable file watching/walking crate appears on //! crates.io, and we can reduce this to trivial glue code. use std::{ fs, path::{Component, Path}, sync::atomic::AtomicUsize, }; use crossbeam_channel::{select, unbounded, Receiver, Sender}; use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use paths::{AbsPath, AbsPathBuf, Utf8PathBuf}; use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator as _, ParallelIterator}; use rustc_hash::FxHashSet; use vfs::loader::{self, LoadingProgress}; use walkdir::WalkDir; #[derive(Debug)] pub struct NotifyHandle { // Relative order of fields below is significant. sender: Sender, _thread: stdx::thread::JoinHandle, } #[derive(Debug)] enum Message { Config(loader::Config), Invalidate(AbsPathBuf), } impl loader::Handle for NotifyHandle { fn spawn(sender: loader::Sender) -> NotifyHandle { let actor = NotifyActor::new(sender); let (sender, receiver) = unbounded::(); let thread = stdx::thread::Builder::new(stdx::thread::ThreadIntent::Worker) .name("VfsLoader".to_owned()) .spawn(move || actor.run(receiver)) .expect("failed to spawn thread"); NotifyHandle { sender, _thread: thread } } fn set_config(&mut self, config: loader::Config) { self.sender.send(Message::Config(config)).unwrap(); } fn invalidate(&mut self, path: AbsPathBuf) { self.sender.send(Message::Invalidate(path)).unwrap(); } fn load_sync(&mut self, path: &AbsPath) -> Option> { read(path) } } type NotifyEvent = notify::Result; struct NotifyActor { sender: loader::Sender, watched_file_entries: FxHashSet, watched_dir_entries: Vec, // Drop order is significant. watcher: Option<(RecommendedWatcher, Receiver)>, } #[derive(Debug)] enum Event { Message(Message), NotifyEvent(NotifyEvent), } impl NotifyActor { fn new(sender: loader::Sender) -> NotifyActor { NotifyActor { sender, watched_dir_entries: Vec::new(), watched_file_entries: FxHashSet::default(), watcher: None, } } fn next_event(&self, receiver: &Receiver) -> Option { let Some((_, watcher_receiver)) = &self.watcher else { return receiver.recv().ok().map(Event::Message); }; select! { recv(receiver) -> it => it.ok().map(Event::Message), recv(watcher_receiver) -> it => Some(Event::NotifyEvent(it.unwrap())), } } fn run(mut self, inbox: Receiver) { while let Some(event) = self.next_event(&inbox) { tracing::debug!(?event, "vfs-notify event"); match event { Event::Message(msg) => match msg { Message::Config(config) => { self.watcher = None; if !config.watch.is_empty() { let (watcher_sender, watcher_receiver) = unbounded(); let watcher = log_notify_error(RecommendedWatcher::new( move |event| { // we don't care about the error. If sending fails that usually // means we were dropped, so unwrapping will just add to the // panic noise. _ = watcher_sender.send(event); }, Config::default(), )); self.watcher = watcher.map(|it| (it, watcher_receiver)); } let config_version = config.version; let n_total = config.load.len(); self.watched_dir_entries.clear(); self.watched_file_entries.clear(); self.send(loader::Message::Progress { n_total, n_done: LoadingProgress::Started, config_version, dir: None, }); let (entry_tx, entry_rx) = unbounded(); let (watch_tx, watch_rx) = unbounded(); let processed = AtomicUsize::new(0); config.load.into_par_iter().enumerate().for_each(|(i, entry)| { let do_watch = config.watch.contains(&i); if do_watch { _ = entry_tx.send(entry.clone()); } let files = Self::load_entry( |f| _ = watch_tx.send(f.to_owned()), entry, do_watch, |file| { self.send(loader::Message::Progress { n_total, n_done: LoadingProgress::Progress( processed.load(std::sync::atomic::Ordering::Relaxed), ), dir: Some(file), config_version, }); }, ); self.send(loader::Message::Loaded { files }); self.send(loader::Message::Progress { n_total, n_done: LoadingProgress::Progress( processed.fetch_add(1, std::sync::atomic::Ordering::AcqRel) + 1, ), config_version, dir: None, }); }); drop(watch_tx); for path in watch_rx { self.watch(&path); } drop(entry_tx); for entry in entry_rx { match entry { loader::Entry::Files(files) => { self.watched_file_entries.extend(files) } loader::Entry::Directories(dir) => { self.watched_dir_entries.push(dir) } } } self.send(loader::Message::Progress { n_total, n_done: LoadingProgress::Finished, config_version, dir: None, }); } Message::Invalidate(path) => { let contents = read(path.as_path()); let files = vec![(path, contents)]; self.send(loader::Message::Changed { files }); } }, Event::NotifyEvent(event) => { if let Some(event) = log_notify_error(event) { if let EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) = event.kind { let files = event .paths .into_iter() .filter_map(|path| { Some( AbsPathBuf::try_from( Utf8PathBuf::from_path_buf(path).ok()?, ) .expect("path is absolute"), ) }) .filter_map(|path| -> Option<(AbsPathBuf, Option>)> { let meta = fs::metadata(&path).ok()?; if meta.file_type().is_dir() && self .watched_dir_entries .iter() .any(|dir| dir.contains_dir(&path)) { self.watch(path.as_ref()); return None; } if !meta.file_type().is_file() { return None; } if !(self.watched_file_entries.contains(&path) || self .watched_dir_entries .iter() .any(|dir| dir.contains_file(&path))) { return None; } let contents = read(&path); Some((path, contents)) }) .collect(); self.send(loader::Message::Changed { files }); } } } } } } fn load_entry( mut watch: impl FnMut(&Path), entry: loader::Entry, do_watch: bool, send_message: impl Fn(AbsPathBuf), ) -> Vec<(AbsPathBuf, Option>)> { match entry { loader::Entry::Files(files) => files .into_iter() .map(|file| { if do_watch { watch(file.as_ref()); } let contents = read(file.as_path()); (file, contents) }) .collect::>(), loader::Entry::Directories(dirs) => { let mut res = Vec::new(); for root in &dirs.include { send_message(root.clone()); let walkdir = WalkDir::new(root).follow_links(true).into_iter().filter_entry(|entry| { if !entry.file_type().is_dir() { return true; } let path = entry.path(); if path_might_be_cyclic(path) { return false; } root == path || dirs.exclude.iter().chain(&dirs.include).all(|it| it != path) }); let files = walkdir.filter_map(|it| it.ok()).filter_map(|entry| { let depth = entry.depth(); let is_dir = entry.file_type().is_dir(); let is_file = entry.file_type().is_file(); let abs_path = AbsPathBuf::try_from( Utf8PathBuf::from_path_buf(entry.into_path()).ok()?, ) .ok()?; if depth < 2 && is_dir { send_message(abs_path.clone()); } if is_dir && do_watch { watch(abs_path.as_ref()); } if !is_file { return None; } let ext = abs_path.extension().unwrap_or_default(); if dirs.extensions.iter().all(|it| it.as_str() != ext) { return None; } Some(abs_path) }); res.extend(files.map(|file| { let contents = read(file.as_path()); (file, contents) })); } res } } } fn watch(&mut self, path: &Path) { if let Some((watcher, _)) = &mut self.watcher { log_notify_error(watcher.watch(path, RecursiveMode::NonRecursive)); } } #[track_caller] fn send(&self, msg: loader::Message) { self.sender.send(msg).unwrap(); } } fn read(path: &AbsPath) -> Option> { std::fs::read(path).ok() } fn log_notify_error(res: notify::Result) -> Option { res.map_err(|err| tracing::warn!("notify error: {}", err)).ok() } /// Is `path` a symlink to a parent directory? /// /// Including this path is guaranteed to cause an infinite loop. This /// heuristic is not sufficient to catch all symlink cycles (it's /// possible to construct cycle using two or more symlinks), but it /// catches common cases. fn path_might_be_cyclic(path: &Path) -> bool { let Ok(destination) = std::fs::read_link(path) else { return false; }; // If the symlink is of the form "../..", it's a parent symlink. let is_relative_parent = destination.components().all(|c| matches!(c, Component::CurDir | Component::ParentDir)); is_relative_parent || path.starts_with(destination) }