From 8286847bee20cb6fa0667b3b7d9e4a4da48bed28 Mon Sep 17 00:00:00 2001 From: Lukas Wirth Date: Fri, 2 Aug 2024 12:57:15 +0200 Subject: [PATCH] internal: Load VFS config changes in parallel --- Cargo.lock | 1 + crates/load-cargo/src/lib.rs | 10 +- crates/rust-analyzer/src/main_loop.rs | 11 +- crates/rust-analyzer/tests/slow-tests/main.rs | 2 +- crates/vfs-notify/Cargo.toml | 3 +- crates/vfs-notify/src/lib.rs | 170 +++++++++++------- crates/vfs/src/loader.rs | 11 +- 7 files changed, 128 insertions(+), 80 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e4f6ad28c1..925afdcc98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2332,6 +2332,7 @@ dependencies = [ "crossbeam-channel", "notify", "paths", + "rayon", "stdx", "tracing", "vfs", diff --git a/crates/load-cargo/src/lib.rs b/crates/load-cargo/src/lib.rs index 8737f2246b..a1aee43815 100644 --- a/crates/load-cargo/src/lib.rs +++ b/crates/load-cargo/src/lib.rs @@ -19,7 +19,11 @@ use project_model::{ CargoConfig, ManifestPath, PackageRoot, ProjectManifest, ProjectWorkspace, ProjectWorkspaceKind, }; use span::Span; -use vfs::{file_set::FileSetConfig, loader::Handle, AbsPath, AbsPathBuf, VfsPath}; +use vfs::{ + file_set::FileSetConfig, + loader::{Handle, LoadingProgress}, + AbsPath, AbsPathBuf, VfsPath, +}; pub struct LoadCargoConfig { pub load_out_dirs_from_check: bool, @@ -409,8 +413,8 @@ fn load_crate_graph( // wait until Vfs has loaded all roots for task in receiver { match task { - vfs::loader::Message::Progress { n_done, n_total, .. } => { - if n_done == Some(n_total) { + vfs::loader::Message::Progress { n_done, .. } => { + if n_done == LoadingProgress::Finished { break; } } diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs index 9c820749ec..9db81f2295 100644 --- a/crates/rust-analyzer/src/main_loop.rs +++ b/crates/rust-analyzer/src/main_loop.rs @@ -15,7 +15,7 @@ use lsp_server::{Connection, Notification, Request}; use lsp_types::{notification::Notification as _, TextDocumentIdentifier}; use stdx::thread::ThreadIntent; use tracing::{error, span, Level}; -use vfs::{AbsPathBuf, FileId}; +use vfs::{loader::LoadingProgress, AbsPathBuf, FileId}; use crate::{ config::Config, @@ -772,12 +772,11 @@ impl GlobalState { let _p = tracing::info_span!("GlobalState::handle_vfs_mgs/progress").entered(); always!(config_version <= self.vfs_config_version); - let state = match n_done { - None => Progress::Begin, - Some(done) if done == n_total => Progress::End, - Some(_) => Progress::Report, + let (n_done, state) = match n_done { + LoadingProgress::Started => (0, Progress::Begin), + LoadingProgress::Progress(n_done) => (n_done.min(n_total), Progress::Report), + LoadingProgress::Finished => (n_total, Progress::End), }; - let n_done = n_done.unwrap_or_default(); self.vfs_progress_config_version = config_version; self.vfs_progress_n_total = n_total; diff --git a/crates/rust-analyzer/tests/slow-tests/main.rs b/crates/rust-analyzer/tests/slow-tests/main.rs index b1ef483771..a2825464f0 100644 --- a/crates/rust-analyzer/tests/slow-tests/main.rs +++ b/crates/rust-analyzer/tests/slow-tests/main.rs @@ -909,7 +909,7 @@ version = \"0.0.0\" fn out_dirs_check_impl(root_contains_symlink: bool) { if skip_slow_tests() { - // return; + return; } let mut server = Project::with_fixture( diff --git a/crates/vfs-notify/Cargo.toml b/crates/vfs-notify/Cargo.toml index a6d5027c3a..3602bac4dd 100644 --- a/crates/vfs-notify/Cargo.toml +++ b/crates/vfs-notify/Cargo.toml @@ -16,10 +16,11 @@ tracing.workspace = true walkdir = "2.3.2" crossbeam-channel = "0.5.5" notify = "6.1.1" +rayon = "1.10.0" stdx.workspace = true vfs.workspace = true paths.workspace = true [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/crates/vfs-notify/src/lib.rs b/crates/vfs-notify/src/lib.rs index a87b75e68f..0972e4234e 100644 --- a/crates/vfs-notify/src/lib.rs +++ b/crates/vfs-notify/src/lib.rs @@ -10,12 +10,14 @@ use std::{ fs, path::{Component, Path}, + sync::atomic::AtomicUsize, }; use crossbeam_channel::{never, select, unbounded, Receiver, Sender}; -use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher}; +use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use paths::{AbsPath, AbsPathBuf, Utf8PathBuf}; -use vfs::loader; +use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator as _, ParallelIterator}; +use vfs::loader::{self, LoadingProgress}; use walkdir::WalkDir; #[derive(Debug)] @@ -104,35 +106,61 @@ impl NotifyActor { let config_version = config.version; let n_total = config.load.len(); - self.send(loader::Message::Progress { + self.watched_entries.clear(); + + let send = |msg| (self.sender)(msg); + send(loader::Message::Progress { n_total, - n_done: None, + n_done: LoadingProgress::Started, config_version, dir: None, }); - self.watched_entries.clear(); - - for (i, entry) in config.load.into_iter().enumerate() { - let watch = config.watch.contains(&i); - if watch { - self.watched_entries.push(entry.clone()); + let (entry_tx, entry_rx) = unbounded(); + let (watch_tx, watch_rx) = unbounded(); + let processed = AtomicUsize::new(0); + config.load.into_par_iter().enumerate().for_each(move |(i, entry)| { + let do_watch = config.watch.contains(&i); + if do_watch { + _ = entry_tx.send(entry.clone()); } - let files = - self.load_entry(entry, watch, |file| loader::Message::Progress { - n_total, - n_done: Some(i), - dir: Some(file), - config_version, - }); - self.send(loader::Message::Loaded { files }); - self.send(loader::Message::Progress { + let files = Self::load_entry( + |f| _ = watch_tx.send(f.to_owned()), + entry, + do_watch, + |file| { + send(loader::Message::Progress { + n_total, + n_done: LoadingProgress::Progress( + processed.load(std::sync::atomic::Ordering::Relaxed), + ), + dir: Some(file), + config_version, + }) + }, + ); + send(loader::Message::Loaded { files }); + send(loader::Message::Progress { n_total, - n_done: Some(i + 1), + n_done: LoadingProgress::Progress( + processed.fetch_add(1, std::sync::atomic::Ordering::AcqRel) + 1, + ), config_version, dir: None, }); + }); + for path in watch_rx { + self.watch(&path); } + for entry in entry_rx { + self.watched_entries.push(entry); + } + self.send(loader::Message::Progress { + n_total, + n_done: LoadingProgress::Finished, + config_version, + dir: None, + }); } Message::Invalidate(path) => { let contents = read(path.as_path()); @@ -142,60 +170,67 @@ impl NotifyActor { }, Event::NotifyEvent(event) => { if let Some(event) = log_notify_error(event) { - let files = event - .paths - .into_iter() - .filter_map(|path| { - Some( - AbsPathBuf::try_from(Utf8PathBuf::from_path_buf(path).ok()?) + if let EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) = + event.kind + { + let files = event + .paths + .into_iter() + .filter_map(|path| { + Some( + AbsPathBuf::try_from( + Utf8PathBuf::from_path_buf(path).ok()?, + ) .expect("path is absolute"), - ) - }) - .filter_map(|path| { - let meta = fs::metadata(&path).ok()?; - if meta.file_type().is_dir() - && self + ) + }) + .filter_map(|path| { + let meta = fs::metadata(&path).ok()?; + if meta.file_type().is_dir() + && self + .watched_entries + .iter() + .any(|entry| entry.contains_dir(&path)) + { + self.watch(path.as_ref()); + return None; + } + + if !meta.file_type().is_file() { + return None; + } + if !self .watched_entries .iter() - .any(|entry| entry.contains_dir(&path)) - { - self.watch(path); - return None; - } + .any(|entry| entry.contains_file(&path)) + { + return None; + } - if !meta.file_type().is_file() { - return None; - } - if !self - .watched_entries - .iter() - .any(|entry| entry.contains_file(&path)) - { - return None; - } - - let contents = read(&path); - Some((path, contents)) - }) - .collect(); - self.send(loader::Message::Changed { files }); + let contents = read(&path); + Some((path, contents)) + }) + .collect(); + self.send(loader::Message::Changed { files }); + } } } } } } + fn load_entry( - &mut self, + mut watch: impl FnMut(&Path), entry: loader::Entry, - watch: bool, - make_message: impl Fn(AbsPathBuf) -> loader::Message, + do_watch: bool, + send_message: impl Fn(AbsPathBuf), ) -> Vec<(AbsPathBuf, Option>)> { match entry { loader::Entry::Files(files) => files .into_iter() .map(|file| { - if watch { - self.watch(file.clone()); + if do_watch { + watch(file.as_ref()); } let contents = read(file.as_path()); (file, contents) @@ -205,7 +240,7 @@ impl NotifyActor { let mut res = Vec::new(); for root in &dirs.include { - self.send(make_message(root.clone())); + send_message(root.clone()); let walkdir = WalkDir::new(root).follow_links(true).into_iter().filter_entry(|entry| { if !entry.file_type().is_dir() { @@ -213,7 +248,7 @@ impl NotifyActor { } let path = entry.path(); - if path_is_parent_symlink(path) { + if path_might_be_cyclic(path) { return false; } @@ -230,10 +265,10 @@ impl NotifyActor { ) .ok()?; if depth < 2 && is_dir { - self.send(make_message(abs_path.clone())); + send_message(abs_path.clone()); } - if is_dir && watch { - self.watch(abs_path.clone()); + if is_dir && do_watch { + watch(abs_path.as_ref()); } if !is_file { return None; @@ -255,12 +290,13 @@ impl NotifyActor { } } - fn watch(&mut self, path: AbsPathBuf) { + fn watch(&mut self, path: &Path) { if let Some((watcher, _)) = &mut self.watcher { - log_notify_error(watcher.watch(path.as_ref(), RecursiveMode::NonRecursive)); + log_notify_error(watcher.watch(path, RecursiveMode::NonRecursive)); } } - fn send(&mut self, msg: loader::Message) { + + fn send(&self, msg: loader::Message) { (self.sender)(msg); } } @@ -279,7 +315,7 @@ fn log_notify_error(res: notify::Result) -> Option { /// heuristic is not sufficient to catch all symlink cycles (it's /// possible to construct cycle using two or more symlinks), but it /// catches common cases. -fn path_is_parent_symlink(path: &Path) -> bool { +fn path_might_be_cyclic(path: &Path) -> bool { let Ok(destination) = std::fs::read_link(path) else { return false; }; diff --git a/crates/vfs/src/loader.rs b/crates/vfs/src/loader.rs index 3af91b1af8..30c08a9ff2 100644 --- a/crates/vfs/src/loader.rs +++ b/crates/vfs/src/loader.rs @@ -43,6 +43,13 @@ pub struct Config { pub watch: Vec, } +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum LoadingProgress { + Started, + Progress(usize), + Finished, +} + /// Message about an action taken by a [`Handle`]. pub enum Message { /// Indicate a gradual progress. @@ -52,7 +59,7 @@ pub enum Message { /// The total files to be loaded. n_total: usize, /// The files that have been loaded successfully. - n_done: Option, + n_done: LoadingProgress, /// The dir being loaded, `None` if its for a file. dir: Option, /// The [`Config`] version. @@ -65,7 +72,7 @@ pub enum Message { } /// Type that will receive [`Messages`](Message) from a [`Handle`]. -pub type Sender = Box; +pub type Sender = Box; /// Interface for reading and watching files. pub trait Handle: fmt::Debug {