diff --git a/Cargo.lock b/Cargo.lock index 2648768553..d9fa697a6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -578,7 +578,6 @@ dependencies = [ "cfg", "cov-mark", "crossbeam-channel", - "crossbeam-utils", "dot", "either", "expect-test", diff --git a/crates/ide/Cargo.toml b/crates/ide/Cargo.toml index 8cddc1e8ec..a796f25213 100644 --- a/crates/ide/Cargo.toml +++ b/crates/ide/Cargo.toml @@ -12,7 +12,6 @@ doctest = false [dependencies] cov-mark = "2.0.0-pre.1" crossbeam-channel = "0.5.0" -crossbeam-utils = "0.8.5" either = "1.5.3" itertools = "0.10.0" tracing = "0.1" diff --git a/crates/ide/src/prime_caches.rs b/crates/ide/src/prime_caches.rs index b3413c0326..8aef76eae8 100644 --- a/crates/ide/src/prime_caches.rs +++ b/crates/ide/src/prime_caches.rs @@ -8,7 +8,7 @@ use hir::db::DefDatabase; use ide_db::{ base_db::{ salsa::{Database, ParallelDatabase, Snapshot}, - CrateGraph, CrateId, SourceDatabase, SourceDatabaseExt, + Cancelled, CrateGraph, CrateId, SourceDatabase, SourceDatabaseExt, }, FxIndexMap, }; @@ -54,84 +54,81 @@ pub(crate) fn parallel_prime_caches( builder.build() }; - crossbeam_utils::thread::scope(move |s| { - let (work_sender, work_receiver) = crossbeam_channel::unbounded(); - let (progress_sender, progress_receiver) = crossbeam_channel::unbounded(); + let (work_sender, work_receiver) = crossbeam_channel::unbounded(); + let (progress_sender, progress_receiver) = crossbeam_channel::unbounded(); - enum ParallelPrimeCacheWorkerProgress { - BeginCrate { crate_id: CrateId, crate_name: String }, - EndCrate { crate_id: CrateId }, + enum ParallelPrimeCacheWorkerProgress { + BeginCrate { crate_id: CrateId, crate_name: String }, + EndCrate { crate_id: CrateId }, + } + + let prime_caches_worker = move |db: Snapshot| { + while let Ok((crate_id, crate_name)) = work_receiver.recv() { + progress_sender + .send(ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name })?; + + // This also computes the DefMap + db.import_map(crate_id); + + progress_sender.send(ParallelPrimeCacheWorkerProgress::EndCrate { crate_id })?; } - let prime_caches_worker = move |db: Snapshot| { - while let Ok((crate_id, crate_name)) = work_receiver.recv() { - progress_sender - .send(ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name })?; + Ok::<_, crossbeam_channel::SendError<_>>(()) + }; - // This also computes the DefMap - db.import_map(crate_id); + for _ in 0..num_worker_threads { + let worker = prime_caches_worker.clone(); + let db = db.snapshot(); + std::thread::spawn(move || Cancelled::catch(|| worker(db))); + } - progress_sender.send(ParallelPrimeCacheWorkerProgress::EndCrate { crate_id })?; + let crates_total = crates_to_prime.pending(); + let mut crates_done = 0; + + // an index map is used to preserve ordering so we can sort the progress report in order of + // "longest crate to index" first + let mut crates_currently_indexing = + FxIndexMap::with_capacity_and_hasher(num_worker_threads as _, Default::default()); + + while !crates_to_prime.is_empty() { + db.unwind_if_cancelled(); + + for crate_id in &mut crates_to_prime { + work_sender + .send(( + crate_id, + graph[crate_id].display_name.as_deref().unwrap_or_default().to_string(), + )) + .ok(); + } + + let worker_progress = match progress_receiver.recv() { + Ok(p) => p, + Err(_) => { + // our workers may have died from a cancelled task, so we'll check and re-raise here. + db.unwind_if_cancelled(); + break; + } + }; + match worker_progress { + ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name } => { + crates_currently_indexing.insert(crate_id, crate_name); + } + ParallelPrimeCacheWorkerProgress::EndCrate { crate_id } => { + crates_currently_indexing.remove(&crate_id); + crates_to_prime.mark_done(crate_id); + crates_done += 1; } - - Ok::<_, crossbeam_channel::SendError<_>>(()) }; - for _ in 0..num_worker_threads { - let worker = prime_caches_worker.clone(); - let db = db.snapshot(); - s.spawn(move |_| worker(db)); - } + let progress = ParallelPrimeCachesProgress { + crates_currently_indexing: crates_currently_indexing.values().cloned().collect(), + crates_done, + crates_total, + }; - let crates_total = crates_to_prime.len(); - let mut crates_done = 0; - - // an index map is used to preserve ordering so we can sort the progress report in order of - // "longest crate to index" first - let mut crates_currently_indexing = - FxIndexMap::with_capacity_and_hasher(num_worker_threads as _, Default::default()); - - while !crates_to_prime.is_empty() { - db.unwind_if_cancelled(); - - for crate_id in &mut crates_to_prime { - work_sender - .send(( - crate_id, - graph[crate_id].display_name.as_deref().unwrap_or_default().to_string(), - )) - .ok(); - } - - let worker_progress = match progress_receiver.recv() { - Ok(p) => p, - Err(_) => { - // our workers may have died from a cancelled task, so we'll check and re-raise here. - db.unwind_if_cancelled(); - break; - } - }; - match worker_progress { - ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name } => { - crates_currently_indexing.insert(crate_id, crate_name); - } - ParallelPrimeCacheWorkerProgress::EndCrate { crate_id } => { - crates_currently_indexing.remove(&crate_id); - crates_to_prime.mark_done(crate_id); - crates_done += 1; - } - }; - - let progress = ParallelPrimeCachesProgress { - crates_currently_indexing: crates_currently_indexing.values().cloned().collect(), - crates_done, - crates_total, - }; - - cb(progress); - } - }) - .unwrap(); + cb(progress); + } } fn compute_crates_to_prime(db: &RootDatabase, graph: &CrateGraph) -> FxHashSet { diff --git a/crates/ide/src/prime_caches/topologic_sort.rs b/crates/ide/src/prime_caches/topologic_sort.rs index 859c454283..68ba2d4a7b 100644 --- a/crates/ide/src/prime_caches/topologic_sort.rs +++ b/crates/ide/src/prime_caches/topologic_sort.rs @@ -56,12 +56,12 @@ where TopologicSortIterBuilder::new() } - pub(crate) fn len(&self) -> usize { + pub(crate) fn pending(&self) -> usize { self.nodes.len() } pub(crate) fn is_empty(&self) -> bool { - self.len() == 0 + self.nodes.len() == 0 && self.ready.len() == 0 } pub(crate) fn mark_done(&mut self, item: T) {