Auto merge of #17771 - Veykril:parallel-vfs-config, r=Veykril

internal: Load VFS config changes in parallel

Simple attempt to make some progress f or https://github.com/rust-lang/rust-analyzer/issues/17373
No clue if those atomic orderings are right, though I don't think they are really too relevant either.

A more complete fix would probably need to replace our `ProjectFolders` handling a bit.
This commit is contained in:
bors 2024-08-05 14:07:22 +00:00
commit ce73b7cba2
10 changed files with 183 additions and 113 deletions

2
Cargo.lock generated
View file

@ -2349,6 +2349,8 @@ dependencies = [
"crossbeam-channel", "crossbeam-channel",
"notify", "notify",
"paths", "paths",
"rayon",
"rustc-hash",
"stdx", "stdx",
"tracing", "tracing",
"vfs", "vfs",

View file

@ -19,7 +19,11 @@ use project_model::{
CargoConfig, PackageRoot, ProjectManifest, ProjectWorkspace, ProjectWorkspaceKind, CargoConfig, PackageRoot, ProjectManifest, ProjectWorkspace, ProjectWorkspaceKind,
}; };
use span::Span; use span::Span;
use vfs::{file_set::FileSetConfig, loader::Handle, AbsPath, AbsPathBuf, VfsPath}; use vfs::{
file_set::FileSetConfig,
loader::{Handle, LoadingProgress},
AbsPath, AbsPathBuf, VfsPath,
};
pub struct LoadCargoConfig { pub struct LoadCargoConfig {
pub load_out_dirs_from_check: bool, pub load_out_dirs_from_check: bool,
@ -409,8 +413,8 @@ fn load_crate_graph(
// wait until Vfs has loaded all roots // wait until Vfs has loaded all roots
for task in receiver { for task in receiver {
match task { match task {
vfs::loader::Message::Progress { n_done, n_total, .. } => { vfs::loader::Message::Progress { n_done, .. } => {
if n_done == Some(n_total) { if n_done == LoadingProgress::Finished {
break; break;
} }
} }

View file

@ -33,7 +33,7 @@ use crate::{
lsp_ext, lsp_ext,
main_loop::Task, main_loop::Task,
mem_docs::MemDocs, mem_docs::MemDocs,
op_queue::OpQueue, op_queue::{Cause, OpQueue},
reload, reload,
target_spec::{CargoTargetSpec, ProjectJsonTargetSpec, TargetSpec}, target_spec::{CargoTargetSpec, ProjectJsonTargetSpec, TargetSpec},
task_pool::{TaskPool, TaskQueue}, task_pool::{TaskPool, TaskQueue},
@ -108,8 +108,8 @@ pub(crate) struct GlobalState {
pub(crate) vfs: Arc<RwLock<(vfs::Vfs, IntMap<FileId, LineEndings>)>>, pub(crate) vfs: Arc<RwLock<(vfs::Vfs, IntMap<FileId, LineEndings>)>>,
pub(crate) vfs_config_version: u32, pub(crate) vfs_config_version: u32,
pub(crate) vfs_progress_config_version: u32, pub(crate) vfs_progress_config_version: u32,
pub(crate) vfs_progress_n_total: usize, pub(crate) vfs_done: bool,
pub(crate) vfs_progress_n_done: usize, pub(crate) wants_to_switch: Option<Cause>,
/// `workspaces` field stores the data we actually use, while the `OpQueue` /// `workspaces` field stores the data we actually use, while the `OpQueue`
/// stores the result of the last fetch. /// stores the result of the last fetch.
@ -252,8 +252,8 @@ impl GlobalState {
vfs: Arc::new(RwLock::new((vfs::Vfs::default(), IntMap::default()))), vfs: Arc::new(RwLock::new((vfs::Vfs::default(), IntMap::default()))),
vfs_config_version: 0, vfs_config_version: 0,
vfs_progress_config_version: 0, vfs_progress_config_version: 0,
vfs_progress_n_total: 0, vfs_done: true,
vfs_progress_n_done: 0, wants_to_switch: None,
workspaces: Arc::from(Vec::new()), workspaces: Arc::from(Vec::new()),
crate_graph_file_dependencies: FxHashSet::default(), crate_graph_file_dependencies: FxHashSet::default(),

View file

@ -15,7 +15,7 @@ use lsp_server::{Connection, Notification, Request};
use lsp_types::{notification::Notification as _, TextDocumentIdentifier}; use lsp_types::{notification::Notification as _, TextDocumentIdentifier};
use stdx::thread::ThreadIntent; use stdx::thread::ThreadIntent;
use tracing::{error, span, Level}; use tracing::{error, span, Level};
use vfs::{AbsPathBuf, FileId}; use vfs::{loader::LoadingProgress, AbsPathBuf, FileId};
use crate::{ use crate::{
config::Config, config::Config,
@ -381,9 +381,14 @@ impl GlobalState {
} }
} }
let event_handling_duration = loop_start.elapsed(); let event_handling_duration = loop_start.elapsed();
let (state_changed, memdocs_added_or_removed) = if self.vfs_done {
let state_changed = self.process_changes(); if let Some(cause) = self.wants_to_switch.take() {
let memdocs_added_or_removed = self.mem_docs.take_changes(); self.switch_workspaces(cause);
}
(self.process_changes(), self.mem_docs.take_changes())
} else {
(false, false)
};
if self.is_quiescent() { if self.is_quiescent() {
let became_quiescent = !was_quiescent; let became_quiescent = !was_quiescent;
@ -691,7 +696,7 @@ impl GlobalState {
if let Err(e) = self.fetch_workspace_error() { if let Err(e) = self.fetch_workspace_error() {
error!("FetchWorkspaceError:\n{e}"); error!("FetchWorkspaceError:\n{e}");
} }
self.switch_workspaces("fetched workspace".to_owned()); self.wants_to_switch = Some("fetched workspace".to_owned());
(Progress::End, None) (Progress::End, None)
} }
}; };
@ -737,8 +742,9 @@ impl GlobalState {
error!("FetchBuildDataError:\n{e}"); error!("FetchBuildDataError:\n{e}");
} }
self.switch_workspaces("fetched build data".to_owned()); if self.wants_to_switch.is_none() {
self.wants_to_switch = Some("fetched build data".to_owned());
}
(Some(Progress::End), None) (Some(Progress::End), None)
} }
}; };
@ -791,16 +797,14 @@ impl GlobalState {
let _p = tracing::info_span!("GlobalState::handle_vfs_mgs/progress").entered(); let _p = tracing::info_span!("GlobalState::handle_vfs_mgs/progress").entered();
always!(config_version <= self.vfs_config_version); always!(config_version <= self.vfs_config_version);
let state = match n_done { let (n_done, state) = match n_done {
None => Progress::Begin, LoadingProgress::Started => (0, Progress::Begin),
Some(done) if done == n_total => Progress::End, LoadingProgress::Progress(n_done) => (n_done.min(n_total), Progress::Report),
Some(_) => Progress::Report, LoadingProgress::Finished => (n_total, Progress::End),
}; };
let n_done = n_done.unwrap_or_default();
self.vfs_progress_config_version = config_version; self.vfs_progress_config_version = config_version;
self.vfs_progress_n_total = n_total; self.vfs_done = state == Progress::End;
self.vfs_progress_n_done = n_done;
let mut message = format!("{n_done}/{n_total}"); let mut message = format!("{n_done}/{n_total}");
if let Some(dir) = dir { if let Some(dir) = dir {

View file

@ -62,13 +62,13 @@ pub(crate) enum ProcMacroProgress {
impl GlobalState { impl GlobalState {
pub(crate) fn is_quiescent(&self) -> bool { pub(crate) fn is_quiescent(&self) -> bool {
!(self.last_reported_status.is_none() self.vfs_done
|| self.fetch_workspaces_queue.op_in_progress() && self.last_reported_status.is_some()
|| self.fetch_build_data_queue.op_in_progress() && !self.fetch_workspaces_queue.op_in_progress()
|| self.fetch_proc_macros_queue.op_in_progress() && !self.fetch_build_data_queue.op_in_progress()
|| self.discover_workspace_queue.op_in_progress() && !self.fetch_proc_macros_queue.op_in_progress()
|| self.vfs_progress_config_version < self.vfs_config_version && !self.discover_workspace_queue.op_in_progress()
|| self.vfs_progress_n_done < self.vfs_progress_n_total) && self.vfs_progress_config_version >= self.vfs_config_version
} }
pub(crate) fn update_configuration(&mut self, config: Config) { pub(crate) fn update_configuration(&mut self, config: Config) {
@ -102,15 +102,13 @@ impl GlobalState {
} }
pub(crate) fn current_status(&self) -> lsp_ext::ServerStatusParams { pub(crate) fn current_status(&self) -> lsp_ext::ServerStatusParams {
let mut status = lsp_ext::ServerStatusParams { let quiescent = self.is_quiescent();
health: lsp_ext::Health::Ok, let mut status =
quiescent: self.is_quiescent(), lsp_ext::ServerStatusParams { health: lsp_ext::Health::Ok, quiescent, message: None };
message: None,
};
let mut message = String::new(); let mut message = String::new();
if !self.config.cargo_autoreload(None) if !self.config.cargo_autoreload(None)
&& self.is_quiescent() && quiescent
&& self.fetch_workspaces_queue.op_requested() && self.fetch_workspaces_queue.op_requested()
&& self.config.discover_workspace_config().is_none() && self.config.discover_workspace_config().is_none()
{ {
@ -242,7 +240,7 @@ impl GlobalState {
let discover_command = self.config.discover_workspace_config().cloned(); let discover_command = self.config.discover_workspace_config().cloned();
let is_quiescent = !(self.discover_workspace_queue.op_in_progress() let is_quiescent = !(self.discover_workspace_queue.op_in_progress()
|| self.vfs_progress_config_version < self.vfs_config_version || self.vfs_progress_config_version < self.vfs_config_version
|| self.vfs_progress_n_done < self.vfs_progress_n_total); || !self.vfs_done);
move |sender| { move |sender| {
let progress = { let progress = {

View file

@ -909,7 +909,7 @@ version = \"0.0.0\"
fn out_dirs_check_impl(root_contains_symlink: bool) { fn out_dirs_check_impl(root_contains_symlink: bool) {
if skip_slow_tests() { if skip_slow_tests() {
// return; return;
} }
let mut server = Project::with_fixture( let mut server = Project::with_fixture(

View file

@ -16,10 +16,12 @@ tracing.workspace = true
walkdir = "2.3.2" walkdir = "2.3.2"
crossbeam-channel = "0.5.5" crossbeam-channel = "0.5.5"
notify = "6.1.1" notify = "6.1.1"
rayon = "1.10.0"
stdx.workspace = true stdx.workspace = true
vfs.workspace = true vfs.workspace = true
paths.workspace = true paths.workspace = true
rustc-hash.workspace = true
[lints] [lints]
workspace = true workspace = true

View file

@ -10,12 +10,15 @@
use std::{ use std::{
fs, fs,
path::{Component, Path}, path::{Component, Path},
sync::atomic::AtomicUsize,
}; };
use crossbeam_channel::{never, select, unbounded, Receiver, Sender}; use crossbeam_channel::{never, select, unbounded, Receiver, Sender};
use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher}; use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use paths::{AbsPath, AbsPathBuf, Utf8PathBuf}; use paths::{AbsPath, AbsPathBuf, Utf8PathBuf};
use vfs::loader; use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator as _, ParallelIterator};
use rustc_hash::FxHashSet;
use vfs::loader::{self, LoadingProgress};
use walkdir::WalkDir; use walkdir::WalkDir;
#[derive(Debug)] #[derive(Debug)]
@ -59,7 +62,8 @@ type NotifyEvent = notify::Result<notify::Event>;
struct NotifyActor { struct NotifyActor {
sender: loader::Sender, sender: loader::Sender,
watched_entries: Vec<loader::Entry>, watched_file_entries: FxHashSet<AbsPathBuf>,
watched_dir_entries: Vec<loader::Directories>,
// Drop order is significant. // Drop order is significant.
watcher: Option<(RecommendedWatcher, Receiver<NotifyEvent>)>, watcher: Option<(RecommendedWatcher, Receiver<NotifyEvent>)>,
} }
@ -72,7 +76,12 @@ enum Event {
impl NotifyActor { impl NotifyActor {
fn new(sender: loader::Sender) -> NotifyActor { fn new(sender: loader::Sender) -> NotifyActor {
NotifyActor { sender, watched_entries: Vec::new(), watcher: None } NotifyActor {
sender,
watched_dir_entries: Vec::new(),
watched_file_entries: FxHashSet::default(),
watcher: None,
}
} }
fn next_event(&self, receiver: &Receiver<Message>) -> Option<Event> { fn next_event(&self, receiver: &Receiver<Message>) -> Option<Event> {
@ -104,35 +113,69 @@ impl NotifyActor {
let config_version = config.version; let config_version = config.version;
let n_total = config.load.len(); let n_total = config.load.len();
self.send(loader::Message::Progress { self.watched_dir_entries.clear();
self.watched_file_entries.clear();
let send = |msg| (self.sender)(msg);
send(loader::Message::Progress {
n_total, n_total,
n_done: None, n_done: LoadingProgress::Started,
config_version, config_version,
dir: None, dir: None,
}); });
self.watched_entries.clear(); let (entry_tx, entry_rx) = unbounded();
let (watch_tx, watch_rx) = unbounded();
for (i, entry) in config.load.into_iter().enumerate() { let processed = AtomicUsize::new(0);
let watch = config.watch.contains(&i); config.load.into_par_iter().enumerate().for_each(move |(i, entry)| {
if watch { let do_watch = config.watch.contains(&i);
self.watched_entries.push(entry.clone()); if do_watch {
_ = entry_tx.send(entry.clone());
} }
let files = let files = Self::load_entry(
self.load_entry(entry, watch, |file| loader::Message::Progress { |f| _ = watch_tx.send(f.to_owned()),
n_total, entry,
n_done: Some(i), do_watch,
dir: Some(file), |file| {
config_version, send(loader::Message::Progress {
}); n_total,
self.send(loader::Message::Loaded { files }); n_done: LoadingProgress::Progress(
self.send(loader::Message::Progress { processed.load(std::sync::atomic::Ordering::Relaxed),
),
dir: Some(file),
config_version,
})
},
);
send(loader::Message::Loaded { files });
send(loader::Message::Progress {
n_total, n_total,
n_done: Some(i + 1), n_done: LoadingProgress::Progress(
processed.fetch_add(1, std::sync::atomic::Ordering::AcqRel) + 1,
),
config_version, config_version,
dir: None, dir: None,
}); });
});
for path in watch_rx {
self.watch(&path);
} }
for entry in entry_rx {
match entry {
loader::Entry::Files(files) => {
self.watched_file_entries.extend(files)
}
loader::Entry::Directories(dir) => {
self.watched_dir_entries.push(dir)
}
}
}
self.send(loader::Message::Progress {
n_total,
n_done: LoadingProgress::Finished,
config_version,
dir: None,
});
} }
Message::Invalidate(path) => { Message::Invalidate(path) => {
let contents = read(path.as_path()); let contents = read(path.as_path());
@ -142,60 +185,69 @@ impl NotifyActor {
}, },
Event::NotifyEvent(event) => { Event::NotifyEvent(event) => {
if let Some(event) = log_notify_error(event) { if let Some(event) = log_notify_error(event) {
let files = event if let EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) =
.paths event.kind
.into_iter() {
.filter_map(|path| { let files = event
Some( .paths
AbsPathBuf::try_from(Utf8PathBuf::from_path_buf(path).ok()?) .into_iter()
.filter_map(|path| {
Some(
AbsPathBuf::try_from(
Utf8PathBuf::from_path_buf(path).ok()?,
)
.expect("path is absolute"), .expect("path is absolute"),
) )
}) })
.filter_map(|path| { .filter_map(|path| -> Option<(AbsPathBuf, Option<Vec<u8>>)> {
let meta = fs::metadata(&path).ok()?; let meta = fs::metadata(&path).ok()?;
if meta.file_type().is_dir() if meta.file_type().is_dir()
&& self && self
.watched_entries .watched_dir_entries
.iter() .iter()
.any(|entry| entry.contains_dir(&path)) .any(|dir| dir.contains_dir(&path))
{ {
self.watch(path); self.watch(path.as_ref());
return None; return None;
} }
if !meta.file_type().is_file() { if !meta.file_type().is_file() {
return None; return None;
} }
if !self
.watched_entries
.iter()
.any(|entry| entry.contains_file(&path))
{
return None;
}
let contents = read(&path); if !(self.watched_file_entries.contains(&path)
Some((path, contents)) || self
}) .watched_dir_entries
.collect(); .iter()
self.send(loader::Message::Changed { files }); .any(|dir| dir.contains_file(&path)))
{
return None;
}
let contents = read(&path);
Some((path, contents))
})
.collect();
self.send(loader::Message::Changed { files });
}
} }
} }
} }
} }
} }
fn load_entry( fn load_entry(
&mut self, mut watch: impl FnMut(&Path),
entry: loader::Entry, entry: loader::Entry,
watch: bool, do_watch: bool,
make_message: impl Fn(AbsPathBuf) -> loader::Message, send_message: impl Fn(AbsPathBuf),
) -> Vec<(AbsPathBuf, Option<Vec<u8>>)> { ) -> Vec<(AbsPathBuf, Option<Vec<u8>>)> {
match entry { match entry {
loader::Entry::Files(files) => files loader::Entry::Files(files) => files
.into_iter() .into_iter()
.map(|file| { .map(|file| {
if watch { if do_watch {
self.watch(file.clone()); watch(file.as_ref());
} }
let contents = read(file.as_path()); let contents = read(file.as_path());
(file, contents) (file, contents)
@ -205,7 +257,7 @@ impl NotifyActor {
let mut res = Vec::new(); let mut res = Vec::new();
for root in &dirs.include { for root in &dirs.include {
self.send(make_message(root.clone())); send_message(root.clone());
let walkdir = let walkdir =
WalkDir::new(root).follow_links(true).into_iter().filter_entry(|entry| { WalkDir::new(root).follow_links(true).into_iter().filter_entry(|entry| {
if !entry.file_type().is_dir() { if !entry.file_type().is_dir() {
@ -213,7 +265,7 @@ impl NotifyActor {
} }
let path = entry.path(); let path = entry.path();
if path_is_parent_symlink(path) { if path_might_be_cyclic(path) {
return false; return false;
} }
@ -230,10 +282,10 @@ impl NotifyActor {
) )
.ok()?; .ok()?;
if depth < 2 && is_dir { if depth < 2 && is_dir {
self.send(make_message(abs_path.clone())); send_message(abs_path.clone());
} }
if is_dir && watch { if is_dir && do_watch {
self.watch(abs_path.clone()); watch(abs_path.as_ref());
} }
if !is_file { if !is_file {
return None; return None;
@ -255,12 +307,13 @@ impl NotifyActor {
} }
} }
fn watch(&mut self, path: AbsPathBuf) { fn watch(&mut self, path: &Path) {
if let Some((watcher, _)) = &mut self.watcher { if let Some((watcher, _)) = &mut self.watcher {
log_notify_error(watcher.watch(path.as_ref(), RecursiveMode::NonRecursive)); log_notify_error(watcher.watch(path, RecursiveMode::NonRecursive));
} }
} }
fn send(&mut self, msg: loader::Message) {
fn send(&self, msg: loader::Message) {
(self.sender)(msg); (self.sender)(msg);
} }
} }
@ -279,7 +332,7 @@ fn log_notify_error<T>(res: notify::Result<T>) -> Option<T> {
/// heuristic is not sufficient to catch all symlink cycles (it's /// heuristic is not sufficient to catch all symlink cycles (it's
/// possible to construct cycle using two or more symlinks), but it /// possible to construct cycle using two or more symlinks), but it
/// catches common cases. /// catches common cases.
fn path_is_parent_symlink(path: &Path) -> bool { fn path_might_be_cyclic(path: &Path) -> bool {
let Ok(destination) = std::fs::read_link(path) else { let Ok(destination) = std::fs::read_link(path) else {
return false; return false;
}; };

View file

@ -201,8 +201,8 @@ impl Vfs {
pub fn set_file_contents(&mut self, path: VfsPath, contents: Option<Vec<u8>>) -> bool { pub fn set_file_contents(&mut self, path: VfsPath, contents: Option<Vec<u8>>) -> bool {
let _p = span!(Level::INFO, "Vfs::set_file_contents").entered(); let _p = span!(Level::INFO, "Vfs::set_file_contents").entered();
let file_id = self.alloc_file_id(path); let file_id = self.alloc_file_id(path);
let state = self.get(file_id); let state: FileState = self.get(file_id);
let change_kind = match (state, contents) { let change = match (state, contents) {
(FileState::Deleted, None) => return false, (FileState::Deleted, None) => return false,
(FileState::Deleted, Some(v)) => { (FileState::Deleted, Some(v)) => {
let hash = hash_once::<FxHasher>(&*v); let hash = hash_once::<FxHasher>(&*v);
@ -225,7 +225,7 @@ impl Vfs {
}; };
}; };
let changed_file = ChangedFile { file_id, change: change_kind }; let changed_file = ChangedFile { file_id, change };
match self.changes.entry(file_id) { match self.changes.entry(file_id) {
// two changes to the same file in one cycle, merge them appropriately // two changes to the same file in one cycle, merge them appropriately
Entry::Occupied(mut o) => { Entry::Occupied(mut o) => {

View file

@ -43,6 +43,13 @@ pub struct Config {
pub watch: Vec<usize>, pub watch: Vec<usize>,
} }
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum LoadingProgress {
Started,
Progress(usize),
Finished,
}
/// Message about an action taken by a [`Handle`]. /// Message about an action taken by a [`Handle`].
pub enum Message { pub enum Message {
/// Indicate a gradual progress. /// Indicate a gradual progress.
@ -52,7 +59,7 @@ pub enum Message {
/// The total files to be loaded. /// The total files to be loaded.
n_total: usize, n_total: usize,
/// The files that have been loaded successfully. /// The files that have been loaded successfully.
n_done: Option<usize>, n_done: LoadingProgress,
/// The dir being loaded, `None` if its for a file. /// The dir being loaded, `None` if its for a file.
dir: Option<AbsPathBuf>, dir: Option<AbsPathBuf>,
/// The [`Config`] version. /// The [`Config`] version.
@ -65,7 +72,7 @@ pub enum Message {
} }
/// Type that will receive [`Messages`](Message) from a [`Handle`]. /// Type that will receive [`Messages`](Message) from a [`Handle`].
pub type Sender = Box<dyn Fn(Message) + Send>; pub type Sender = Box<dyn Fn(Message) + Send + Sync>;
/// Interface for reading and watching files. /// Interface for reading and watching files.
pub trait Handle: fmt::Debug { pub trait Handle: fmt::Debug {