From 8286847bee20cb6fa0667b3b7d9e4a4da48bed28 Mon Sep 17 00:00:00 2001 From: Lukas Wirth Date: Fri, 2 Aug 2024 12:57:15 +0200 Subject: [PATCH 1/3] internal: Load VFS config changes in parallel --- Cargo.lock | 1 + crates/load-cargo/src/lib.rs | 10 +- crates/rust-analyzer/src/main_loop.rs | 11 +- crates/rust-analyzer/tests/slow-tests/main.rs | 2 +- crates/vfs-notify/Cargo.toml | 3 +- crates/vfs-notify/src/lib.rs | 170 +++++++++++------- crates/vfs/src/loader.rs | 11 +- 7 files changed, 128 insertions(+), 80 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e4f6ad28c1..925afdcc98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2332,6 +2332,7 @@ dependencies = [ "crossbeam-channel", "notify", "paths", + "rayon", "stdx", "tracing", "vfs", diff --git a/crates/load-cargo/src/lib.rs b/crates/load-cargo/src/lib.rs index 8737f2246b..a1aee43815 100644 --- a/crates/load-cargo/src/lib.rs +++ b/crates/load-cargo/src/lib.rs @@ -19,7 +19,11 @@ use project_model::{ CargoConfig, ManifestPath, PackageRoot, ProjectManifest, ProjectWorkspace, ProjectWorkspaceKind, }; use span::Span; -use vfs::{file_set::FileSetConfig, loader::Handle, AbsPath, AbsPathBuf, VfsPath}; +use vfs::{ + file_set::FileSetConfig, + loader::{Handle, LoadingProgress}, + AbsPath, AbsPathBuf, VfsPath, +}; pub struct LoadCargoConfig { pub load_out_dirs_from_check: bool, @@ -409,8 +413,8 @@ fn load_crate_graph( // wait until Vfs has loaded all roots for task in receiver { match task { - vfs::loader::Message::Progress { n_done, n_total, .. } => { - if n_done == Some(n_total) { + vfs::loader::Message::Progress { n_done, .. } => { + if n_done == LoadingProgress::Finished { break; } } diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs index 9c820749ec..9db81f2295 100644 --- a/crates/rust-analyzer/src/main_loop.rs +++ b/crates/rust-analyzer/src/main_loop.rs @@ -15,7 +15,7 @@ use lsp_server::{Connection, Notification, Request}; use lsp_types::{notification::Notification as _, TextDocumentIdentifier}; use stdx::thread::ThreadIntent; use tracing::{error, span, Level}; -use vfs::{AbsPathBuf, FileId}; +use vfs::{loader::LoadingProgress, AbsPathBuf, FileId}; use crate::{ config::Config, @@ -772,12 +772,11 @@ impl GlobalState { let _p = tracing::info_span!("GlobalState::handle_vfs_mgs/progress").entered(); always!(config_version <= self.vfs_config_version); - let state = match n_done { - None => Progress::Begin, - Some(done) if done == n_total => Progress::End, - Some(_) => Progress::Report, + let (n_done, state) = match n_done { + LoadingProgress::Started => (0, Progress::Begin), + LoadingProgress::Progress(n_done) => (n_done.min(n_total), Progress::Report), + LoadingProgress::Finished => (n_total, Progress::End), }; - let n_done = n_done.unwrap_or_default(); self.vfs_progress_config_version = config_version; self.vfs_progress_n_total = n_total; diff --git a/crates/rust-analyzer/tests/slow-tests/main.rs b/crates/rust-analyzer/tests/slow-tests/main.rs index b1ef483771..a2825464f0 100644 --- a/crates/rust-analyzer/tests/slow-tests/main.rs +++ b/crates/rust-analyzer/tests/slow-tests/main.rs @@ -909,7 +909,7 @@ version = \"0.0.0\" fn out_dirs_check_impl(root_contains_symlink: bool) { if skip_slow_tests() { - // return; + return; } let mut server = Project::with_fixture( diff --git a/crates/vfs-notify/Cargo.toml b/crates/vfs-notify/Cargo.toml index a6d5027c3a..3602bac4dd 100644 --- a/crates/vfs-notify/Cargo.toml +++ b/crates/vfs-notify/Cargo.toml @@ -16,10 +16,11 @@ tracing.workspace = true walkdir = "2.3.2" crossbeam-channel = "0.5.5" notify = "6.1.1" +rayon = "1.10.0" stdx.workspace = true vfs.workspace = true paths.workspace = true [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/crates/vfs-notify/src/lib.rs b/crates/vfs-notify/src/lib.rs index a87b75e68f..0972e4234e 100644 --- a/crates/vfs-notify/src/lib.rs +++ b/crates/vfs-notify/src/lib.rs @@ -10,12 +10,14 @@ use std::{ fs, path::{Component, Path}, + sync::atomic::AtomicUsize, }; use crossbeam_channel::{never, select, unbounded, Receiver, Sender}; -use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher}; +use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use paths::{AbsPath, AbsPathBuf, Utf8PathBuf}; -use vfs::loader; +use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator as _, ParallelIterator}; +use vfs::loader::{self, LoadingProgress}; use walkdir::WalkDir; #[derive(Debug)] @@ -104,35 +106,61 @@ impl NotifyActor { let config_version = config.version; let n_total = config.load.len(); - self.send(loader::Message::Progress { + self.watched_entries.clear(); + + let send = |msg| (self.sender)(msg); + send(loader::Message::Progress { n_total, - n_done: None, + n_done: LoadingProgress::Started, config_version, dir: None, }); - self.watched_entries.clear(); - - for (i, entry) in config.load.into_iter().enumerate() { - let watch = config.watch.contains(&i); - if watch { - self.watched_entries.push(entry.clone()); + let (entry_tx, entry_rx) = unbounded(); + let (watch_tx, watch_rx) = unbounded(); + let processed = AtomicUsize::new(0); + config.load.into_par_iter().enumerate().for_each(move |(i, entry)| { + let do_watch = config.watch.contains(&i); + if do_watch { + _ = entry_tx.send(entry.clone()); } - let files = - self.load_entry(entry, watch, |file| loader::Message::Progress { - n_total, - n_done: Some(i), - dir: Some(file), - config_version, - }); - self.send(loader::Message::Loaded { files }); - self.send(loader::Message::Progress { + let files = Self::load_entry( + |f| _ = watch_tx.send(f.to_owned()), + entry, + do_watch, + |file| { + send(loader::Message::Progress { + n_total, + n_done: LoadingProgress::Progress( + processed.load(std::sync::atomic::Ordering::Relaxed), + ), + dir: Some(file), + config_version, + }) + }, + ); + send(loader::Message::Loaded { files }); + send(loader::Message::Progress { n_total, - n_done: Some(i + 1), + n_done: LoadingProgress::Progress( + processed.fetch_add(1, std::sync::atomic::Ordering::AcqRel) + 1, + ), config_version, dir: None, }); + }); + for path in watch_rx { + self.watch(&path); } + for entry in entry_rx { + self.watched_entries.push(entry); + } + self.send(loader::Message::Progress { + n_total, + n_done: LoadingProgress::Finished, + config_version, + dir: None, + }); } Message::Invalidate(path) => { let contents = read(path.as_path()); @@ -142,60 +170,67 @@ impl NotifyActor { }, Event::NotifyEvent(event) => { if let Some(event) = log_notify_error(event) { - let files = event - .paths - .into_iter() - .filter_map(|path| { - Some( - AbsPathBuf::try_from(Utf8PathBuf::from_path_buf(path).ok()?) + if let EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) = + event.kind + { + let files = event + .paths + .into_iter() + .filter_map(|path| { + Some( + AbsPathBuf::try_from( + Utf8PathBuf::from_path_buf(path).ok()?, + ) .expect("path is absolute"), - ) - }) - .filter_map(|path| { - let meta = fs::metadata(&path).ok()?; - if meta.file_type().is_dir() - && self + ) + }) + .filter_map(|path| { + let meta = fs::metadata(&path).ok()?; + if meta.file_type().is_dir() + && self + .watched_entries + .iter() + .any(|entry| entry.contains_dir(&path)) + { + self.watch(path.as_ref()); + return None; + } + + if !meta.file_type().is_file() { + return None; + } + if !self .watched_entries .iter() - .any(|entry| entry.contains_dir(&path)) - { - self.watch(path); - return None; - } + .any(|entry| entry.contains_file(&path)) + { + return None; + } - if !meta.file_type().is_file() { - return None; - } - if !self - .watched_entries - .iter() - .any(|entry| entry.contains_file(&path)) - { - return None; - } - - let contents = read(&path); - Some((path, contents)) - }) - .collect(); - self.send(loader::Message::Changed { files }); + let contents = read(&path); + Some((path, contents)) + }) + .collect(); + self.send(loader::Message::Changed { files }); + } } } } } } + fn load_entry( - &mut self, + mut watch: impl FnMut(&Path), entry: loader::Entry, - watch: bool, - make_message: impl Fn(AbsPathBuf) -> loader::Message, + do_watch: bool, + send_message: impl Fn(AbsPathBuf), ) -> Vec<(AbsPathBuf, Option>)> { match entry { loader::Entry::Files(files) => files .into_iter() .map(|file| { - if watch { - self.watch(file.clone()); + if do_watch { + watch(file.as_ref()); } let contents = read(file.as_path()); (file, contents) @@ -205,7 +240,7 @@ impl NotifyActor { let mut res = Vec::new(); for root in &dirs.include { - self.send(make_message(root.clone())); + send_message(root.clone()); let walkdir = WalkDir::new(root).follow_links(true).into_iter().filter_entry(|entry| { if !entry.file_type().is_dir() { @@ -213,7 +248,7 @@ impl NotifyActor { } let path = entry.path(); - if path_is_parent_symlink(path) { + if path_might_be_cyclic(path) { return false; } @@ -230,10 +265,10 @@ impl NotifyActor { ) .ok()?; if depth < 2 && is_dir { - self.send(make_message(abs_path.clone())); + send_message(abs_path.clone()); } - if is_dir && watch { - self.watch(abs_path.clone()); + if is_dir && do_watch { + watch(abs_path.as_ref()); } if !is_file { return None; @@ -255,12 +290,13 @@ impl NotifyActor { } } - fn watch(&mut self, path: AbsPathBuf) { + fn watch(&mut self, path: &Path) { if let Some((watcher, _)) = &mut self.watcher { - log_notify_error(watcher.watch(path.as_ref(), RecursiveMode::NonRecursive)); + log_notify_error(watcher.watch(path, RecursiveMode::NonRecursive)); } } - fn send(&mut self, msg: loader::Message) { + + fn send(&self, msg: loader::Message) { (self.sender)(msg); } } @@ -279,7 +315,7 @@ fn log_notify_error(res: notify::Result) -> Option { /// heuristic is not sufficient to catch all symlink cycles (it's /// possible to construct cycle using two or more symlinks), but it /// catches common cases. -fn path_is_parent_symlink(path: &Path) -> bool { +fn path_might_be_cyclic(path: &Path) -> bool { let Ok(destination) = std::fs::read_link(path) else { return false; }; diff --git a/crates/vfs/src/loader.rs b/crates/vfs/src/loader.rs index 3af91b1af8..30c08a9ff2 100644 --- a/crates/vfs/src/loader.rs +++ b/crates/vfs/src/loader.rs @@ -43,6 +43,13 @@ pub struct Config { pub watch: Vec, } +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum LoadingProgress { + Started, + Progress(usize), + Finished, +} + /// Message about an action taken by a [`Handle`]. pub enum Message { /// Indicate a gradual progress. @@ -52,7 +59,7 @@ pub enum Message { /// The total files to be loaded. n_total: usize, /// The files that have been loaded successfully. - n_done: Option, + n_done: LoadingProgress, /// The dir being loaded, `None` if its for a file. dir: Option, /// The [`Config`] version. @@ -65,7 +72,7 @@ pub enum Message { } /// Type that will receive [`Messages`](Message) from a [`Handle`]. -pub type Sender = Box; +pub type Sender = Box; /// Interface for reading and watching files. pub trait Handle: fmt::Debug { From c6ae9cde9978223b85df608607d392ecd9709c39 Mon Sep 17 00:00:00 2001 From: Lukas Wirth Date: Fri, 2 Aug 2024 17:09:25 +0200 Subject: [PATCH 2/3] Wait with change processing until the vfs is done --- crates/rust-analyzer/src/global_state.rs | 10 ++++----- crates/rust-analyzer/src/main_loop.rs | 21 +++++++++++-------- crates/rust-analyzer/src/reload.rs | 26 +++++++++++------------- crates/vfs-notify/src/lib.rs | 1 + crates/vfs/src/lib.rs | 6 +++--- 5 files changed, 34 insertions(+), 30 deletions(-) diff --git a/crates/rust-analyzer/src/global_state.rs b/crates/rust-analyzer/src/global_state.rs index f1dde104fc..e435af6c80 100644 --- a/crates/rust-analyzer/src/global_state.rs +++ b/crates/rust-analyzer/src/global_state.rs @@ -33,7 +33,7 @@ use crate::{ lsp_ext, main_loop::Task, mem_docs::MemDocs, - op_queue::OpQueue, + op_queue::{Cause, OpQueue}, reload, target_spec::{CargoTargetSpec, ProjectJsonTargetSpec, TargetSpec}, task_pool::{TaskPool, TaskQueue}, @@ -108,8 +108,8 @@ pub(crate) struct GlobalState { pub(crate) vfs: Arc)>>, pub(crate) vfs_config_version: u32, pub(crate) vfs_progress_config_version: u32, - pub(crate) vfs_progress_n_total: usize, - pub(crate) vfs_progress_n_done: usize, + pub(crate) vfs_done: bool, + pub(crate) wants_to_switch: Option, /// `workspaces` field stores the data we actually use, while the `OpQueue` /// stores the result of the last fetch. @@ -252,8 +252,8 @@ impl GlobalState { vfs: Arc::new(RwLock::new((vfs::Vfs::default(), IntMap::default()))), vfs_config_version: 0, vfs_progress_config_version: 0, - vfs_progress_n_total: 0, - vfs_progress_n_done: 0, + vfs_done: true, + wants_to_switch: None, workspaces: Arc::from(Vec::new()), crate_graph_file_dependencies: FxHashSet::default(), diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs index 9db81f2295..23ae282396 100644 --- a/crates/rust-analyzer/src/main_loop.rs +++ b/crates/rust-analyzer/src/main_loop.rs @@ -375,9 +375,14 @@ impl GlobalState { } } let event_handling_duration = loop_start.elapsed(); - - let state_changed = self.process_changes(); - let memdocs_added_or_removed = self.mem_docs.take_changes(); + let (state_changed, memdocs_added_or_removed) = if self.vfs_done { + if let Some(cause) = self.wants_to_switch.take() { + self.switch_workspaces(cause); + } + (self.process_changes(), self.mem_docs.take_changes()) + } else { + (false, false) + }; if self.is_quiescent() { let became_quiescent = !was_quiescent; @@ -672,7 +677,7 @@ impl GlobalState { if let Err(e) = self.fetch_workspace_error() { error!("FetchWorkspaceError:\n{e}"); } - self.switch_workspaces("fetched workspace".to_owned()); + self.wants_to_switch = Some("fetched workspace".to_owned()); (Progress::End, None) } }; @@ -718,8 +723,9 @@ impl GlobalState { error!("FetchBuildDataError:\n{e}"); } - self.switch_workspaces("fetched build data".to_owned()); - + if self.wants_to_switch.is_none() { + self.wants_to_switch = Some("fetched build data".to_owned()); + } (Some(Progress::End), None) } }; @@ -779,8 +785,7 @@ impl GlobalState { }; self.vfs_progress_config_version = config_version; - self.vfs_progress_n_total = n_total; - self.vfs_progress_n_done = n_done; + self.vfs_done = state == Progress::End; let mut message = format!("{n_done}/{n_total}"); if let Some(dir) = dir { diff --git a/crates/rust-analyzer/src/reload.rs b/crates/rust-analyzer/src/reload.rs index 5c95ccd4b8..39301f4288 100644 --- a/crates/rust-analyzer/src/reload.rs +++ b/crates/rust-analyzer/src/reload.rs @@ -62,13 +62,13 @@ pub(crate) enum ProcMacroProgress { impl GlobalState { pub(crate) fn is_quiescent(&self) -> bool { - !(self.last_reported_status.is_none() - || self.fetch_workspaces_queue.op_in_progress() - || self.fetch_build_data_queue.op_in_progress() - || self.fetch_proc_macros_queue.op_in_progress() - || self.discover_workspace_queue.op_in_progress() - || self.vfs_progress_config_version < self.vfs_config_version - || self.vfs_progress_n_done < self.vfs_progress_n_total) + self.vfs_done + && self.last_reported_status.is_some() + && !self.fetch_workspaces_queue.op_in_progress() + && !self.fetch_build_data_queue.op_in_progress() + && !self.fetch_proc_macros_queue.op_in_progress() + && !self.discover_workspace_queue.op_in_progress() + && self.vfs_progress_config_version >= self.vfs_config_version } pub(crate) fn update_configuration(&mut self, config: Config) { @@ -102,15 +102,13 @@ impl GlobalState { } pub(crate) fn current_status(&self) -> lsp_ext::ServerStatusParams { - let mut status = lsp_ext::ServerStatusParams { - health: lsp_ext::Health::Ok, - quiescent: self.is_quiescent(), - message: None, - }; + let quiescent = self.is_quiescent(); + let mut status = + lsp_ext::ServerStatusParams { health: lsp_ext::Health::Ok, quiescent, message: None }; let mut message = String::new(); if !self.config.cargo_autoreload(None) - && self.is_quiescent() + && quiescent && self.fetch_workspaces_queue.op_requested() && self.config.discover_workspace_config().is_none() { @@ -242,7 +240,7 @@ impl GlobalState { let discover_command = self.config.discover_workspace_config().cloned(); let is_quiescent = !(self.discover_workspace_queue.op_in_progress() || self.vfs_progress_config_version < self.vfs_config_version - || self.vfs_progress_n_done < self.vfs_progress_n_total); + || !self.vfs_done); move |sender| { let progress = { diff --git a/crates/vfs-notify/src/lib.rs b/crates/vfs-notify/src/lib.rs index 0972e4234e..7328cd9ed6 100644 --- a/crates/vfs-notify/src/lib.rs +++ b/crates/vfs-notify/src/lib.rs @@ -61,6 +61,7 @@ type NotifyEvent = notify::Result; struct NotifyActor { sender: loader::Sender, + // FIXME: Consider hashset watched_entries: Vec, // Drop order is significant. watcher: Option<(RecommendedWatcher, Receiver)>, diff --git a/crates/vfs/src/lib.rs b/crates/vfs/src/lib.rs index 77f890fd7e..bc40e03c5a 100644 --- a/crates/vfs/src/lib.rs +++ b/crates/vfs/src/lib.rs @@ -201,8 +201,8 @@ impl Vfs { pub fn set_file_contents(&mut self, path: VfsPath, contents: Option>) -> bool { let _p = span!(Level::INFO, "Vfs::set_file_contents").entered(); let file_id = self.alloc_file_id(path); - let state = self.get(file_id); - let change_kind = match (state, contents) { + let state: FileState = self.get(file_id); + let change = match (state, contents) { (FileState::Deleted, None) => return false, (FileState::Deleted, Some(v)) => { let hash = hash_once::(&*v); @@ -225,7 +225,7 @@ impl Vfs { }; }; - let changed_file = ChangedFile { file_id, change: change_kind }; + let changed_file = ChangedFile { file_id, change }; match self.changes.entry(file_id) { // two changes to the same file in one cycle, merge them appropriately Entry::Occupied(mut o) => { From e437db2483e687d2c22f82f4bb15aed71dfe8081 Mon Sep 17 00:00:00 2001 From: Lukas Wirth Date: Mon, 5 Aug 2024 15:56:23 +0200 Subject: [PATCH 3/3] Slightly optimize watch list in vfs --- Cargo.lock | 1 + crates/vfs-notify/Cargo.toml | 1 + crates/vfs-notify/src/lib.rs | 40 +++++++++++++++++++++++++----------- 3 files changed, 30 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 925afdcc98..65318b1947 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2333,6 +2333,7 @@ dependencies = [ "notify", "paths", "rayon", + "rustc-hash", "stdx", "tracing", "vfs", diff --git a/crates/vfs-notify/Cargo.toml b/crates/vfs-notify/Cargo.toml index 3602bac4dd..2e4a452bf8 100644 --- a/crates/vfs-notify/Cargo.toml +++ b/crates/vfs-notify/Cargo.toml @@ -21,6 +21,7 @@ rayon = "1.10.0" stdx.workspace = true vfs.workspace = true paths.workspace = true +rustc-hash.workspace = true [lints] workspace = true diff --git a/crates/vfs-notify/src/lib.rs b/crates/vfs-notify/src/lib.rs index 7328cd9ed6..d0d3a84446 100644 --- a/crates/vfs-notify/src/lib.rs +++ b/crates/vfs-notify/src/lib.rs @@ -17,6 +17,7 @@ use crossbeam_channel::{never, select, unbounded, Receiver, Sender}; use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use paths::{AbsPath, AbsPathBuf, Utf8PathBuf}; use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator as _, ParallelIterator}; +use rustc_hash::FxHashSet; use vfs::loader::{self, LoadingProgress}; use walkdir::WalkDir; @@ -61,8 +62,8 @@ type NotifyEvent = notify::Result; struct NotifyActor { sender: loader::Sender, - // FIXME: Consider hashset - watched_entries: Vec, + watched_file_entries: FxHashSet, + watched_dir_entries: Vec, // Drop order is significant. watcher: Option<(RecommendedWatcher, Receiver)>, } @@ -75,7 +76,12 @@ enum Event { impl NotifyActor { fn new(sender: loader::Sender) -> NotifyActor { - NotifyActor { sender, watched_entries: Vec::new(), watcher: None } + NotifyActor { + sender, + watched_dir_entries: Vec::new(), + watched_file_entries: FxHashSet::default(), + watcher: None, + } } fn next_event(&self, receiver: &Receiver) -> Option { @@ -107,7 +113,8 @@ impl NotifyActor { let config_version = config.version; let n_total = config.load.len(); - self.watched_entries.clear(); + self.watched_dir_entries.clear(); + self.watched_file_entries.clear(); let send = |msg| (self.sender)(msg); send(loader::Message::Progress { @@ -154,7 +161,14 @@ impl NotifyActor { self.watch(&path); } for entry in entry_rx { - self.watched_entries.push(entry); + match entry { + loader::Entry::Files(files) => { + self.watched_file_entries.extend(files) + } + loader::Entry::Directories(dir) => { + self.watched_dir_entries.push(dir) + } + } } self.send(loader::Message::Progress { n_total, @@ -185,13 +199,13 @@ impl NotifyActor { .expect("path is absolute"), ) }) - .filter_map(|path| { + .filter_map(|path| -> Option<(AbsPathBuf, Option>)> { let meta = fs::metadata(&path).ok()?; if meta.file_type().is_dir() && self - .watched_entries + .watched_dir_entries .iter() - .any(|entry| entry.contains_dir(&path)) + .any(|dir| dir.contains_dir(&path)) { self.watch(path.as_ref()); return None; @@ -200,10 +214,12 @@ impl NotifyActor { if !meta.file_type().is_file() { return None; } - if !self - .watched_entries - .iter() - .any(|entry| entry.contains_file(&path)) + + if !(self.watched_file_entries.contains(&path) + || self + .watched_dir_entries + .iter() + .any(|dir| dir.contains_file(&path))) { return None; }