removed scoped threads, cleanup

This commit is contained in:
Jake Heinz 2022-01-14 10:06:28 +00:00
parent f83c0166be
commit c3f30ae4f0
4 changed files with 69 additions and 74 deletions

1
Cargo.lock generated
View file

@ -578,7 +578,6 @@ dependencies = [
"cfg", "cfg",
"cov-mark", "cov-mark",
"crossbeam-channel", "crossbeam-channel",
"crossbeam-utils",
"dot", "dot",
"either", "either",
"expect-test", "expect-test",

View file

@ -12,7 +12,6 @@ doctest = false
[dependencies] [dependencies]
cov-mark = "2.0.0-pre.1" cov-mark = "2.0.0-pre.1"
crossbeam-channel = "0.5.0" crossbeam-channel = "0.5.0"
crossbeam-utils = "0.8.5"
either = "1.5.3" either = "1.5.3"
itertools = "0.10.0" itertools = "0.10.0"
tracing = "0.1" tracing = "0.1"

View file

@ -8,7 +8,7 @@ use hir::db::DefDatabase;
use ide_db::{ use ide_db::{
base_db::{ base_db::{
salsa::{Database, ParallelDatabase, Snapshot}, salsa::{Database, ParallelDatabase, Snapshot},
CrateGraph, CrateId, SourceDatabase, SourceDatabaseExt, Cancelled, CrateGraph, CrateId, SourceDatabase, SourceDatabaseExt,
}, },
FxIndexMap, FxIndexMap,
}; };
@ -54,84 +54,81 @@ pub(crate) fn parallel_prime_caches(
builder.build() builder.build()
}; };
crossbeam_utils::thread::scope(move |s| { let (work_sender, work_receiver) = crossbeam_channel::unbounded();
let (work_sender, work_receiver) = crossbeam_channel::unbounded(); let (progress_sender, progress_receiver) = crossbeam_channel::unbounded();
let (progress_sender, progress_receiver) = crossbeam_channel::unbounded();
enum ParallelPrimeCacheWorkerProgress { enum ParallelPrimeCacheWorkerProgress {
BeginCrate { crate_id: CrateId, crate_name: String }, BeginCrate { crate_id: CrateId, crate_name: String },
EndCrate { crate_id: CrateId }, EndCrate { crate_id: CrateId },
}
let prime_caches_worker = move |db: Snapshot<RootDatabase>| {
while let Ok((crate_id, crate_name)) = work_receiver.recv() {
progress_sender
.send(ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name })?;
// This also computes the DefMap
db.import_map(crate_id);
progress_sender.send(ParallelPrimeCacheWorkerProgress::EndCrate { crate_id })?;
} }
let prime_caches_worker = move |db: Snapshot<RootDatabase>| { Ok::<_, crossbeam_channel::SendError<_>>(())
while let Ok((crate_id, crate_name)) = work_receiver.recv() { };
progress_sender
.send(ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name })?;
// This also computes the DefMap for _ in 0..num_worker_threads {
db.import_map(crate_id); let worker = prime_caches_worker.clone();
let db = db.snapshot();
std::thread::spawn(move || Cancelled::catch(|| worker(db)));
}
progress_sender.send(ParallelPrimeCacheWorkerProgress::EndCrate { crate_id })?; let crates_total = crates_to_prime.pending();
let mut crates_done = 0;
// an index map is used to preserve ordering so we can sort the progress report in order of
// "longest crate to index" first
let mut crates_currently_indexing =
FxIndexMap::with_capacity_and_hasher(num_worker_threads as _, Default::default());
while !crates_to_prime.is_empty() {
db.unwind_if_cancelled();
for crate_id in &mut crates_to_prime {
work_sender
.send((
crate_id,
graph[crate_id].display_name.as_deref().unwrap_or_default().to_string(),
))
.ok();
}
let worker_progress = match progress_receiver.recv() {
Ok(p) => p,
Err(_) => {
// our workers may have died from a cancelled task, so we'll check and re-raise here.
db.unwind_if_cancelled();
break;
}
};
match worker_progress {
ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name } => {
crates_currently_indexing.insert(crate_id, crate_name);
}
ParallelPrimeCacheWorkerProgress::EndCrate { crate_id } => {
crates_currently_indexing.remove(&crate_id);
crates_to_prime.mark_done(crate_id);
crates_done += 1;
} }
Ok::<_, crossbeam_channel::SendError<_>>(())
}; };
for _ in 0..num_worker_threads { let progress = ParallelPrimeCachesProgress {
let worker = prime_caches_worker.clone(); crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
let db = db.snapshot(); crates_done,
s.spawn(move |_| worker(db)); crates_total,
} };
let crates_total = crates_to_prime.len(); cb(progress);
let mut crates_done = 0; }
// an index map is used to preserve ordering so we can sort the progress report in order of
// "longest crate to index" first
let mut crates_currently_indexing =
FxIndexMap::with_capacity_and_hasher(num_worker_threads as _, Default::default());
while !crates_to_prime.is_empty() {
db.unwind_if_cancelled();
for crate_id in &mut crates_to_prime {
work_sender
.send((
crate_id,
graph[crate_id].display_name.as_deref().unwrap_or_default().to_string(),
))
.ok();
}
let worker_progress = match progress_receiver.recv() {
Ok(p) => p,
Err(_) => {
// our workers may have died from a cancelled task, so we'll check and re-raise here.
db.unwind_if_cancelled();
break;
}
};
match worker_progress {
ParallelPrimeCacheWorkerProgress::BeginCrate { crate_id, crate_name } => {
crates_currently_indexing.insert(crate_id, crate_name);
}
ParallelPrimeCacheWorkerProgress::EndCrate { crate_id } => {
crates_currently_indexing.remove(&crate_id);
crates_to_prime.mark_done(crate_id);
crates_done += 1;
}
};
let progress = ParallelPrimeCachesProgress {
crates_currently_indexing: crates_currently_indexing.values().cloned().collect(),
crates_done,
crates_total,
};
cb(progress);
}
})
.unwrap();
} }
fn compute_crates_to_prime(db: &RootDatabase, graph: &CrateGraph) -> FxHashSet<CrateId> { fn compute_crates_to_prime(db: &RootDatabase, graph: &CrateGraph) -> FxHashSet<CrateId> {

View file

@ -56,12 +56,12 @@ where
TopologicSortIterBuilder::new() TopologicSortIterBuilder::new()
} }
pub(crate) fn len(&self) -> usize { pub(crate) fn pending(&self) -> usize {
self.nodes.len() self.nodes.len()
} }
pub(crate) fn is_empty(&self) -> bool { pub(crate) fn is_empty(&self) -> bool {
self.len() == 0 self.nodes.len() == 0 && self.ready.len() == 0
} }
pub(crate) fn mark_done(&mut self, item: T) { pub(crate) fn mark_done(&mut self, item: T) {