mirror of
https://github.com/rust-lang/rust-analyzer
synced 2024-12-26 13:03:31 +00:00
drop dependency on thread_worker
This commit is contained in:
parent
9da3705191
commit
def7bc0ec5
4 changed files with 96 additions and 64 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -1130,7 +1130,6 @@ dependencies = [
|
|||
"relative-path 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rustc-hash 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"thread_worker 0.1.0",
|
||||
"walkdir 2.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
|
|
|
@ -13,8 +13,6 @@ log = "0.4.6"
|
|||
notify = "4.0.9"
|
||||
parking_lot = "0.7.0"
|
||||
|
||||
thread_worker = { path = "../thread_worker" }
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3"
|
||||
flexi_logger = "0.10.0"
|
||||
|
|
|
@ -3,8 +3,9 @@ use std::{
|
|||
path::{Path, PathBuf},
|
||||
sync::{mpsc, Arc},
|
||||
time::Duration,
|
||||
thread,
|
||||
};
|
||||
use crossbeam_channel::{Sender, unbounded, RecvError, select};
|
||||
use crossbeam_channel::{Sender, Receiver, unbounded, RecvError, select};
|
||||
use relative_path::RelativePathBuf;
|
||||
use walkdir::WalkDir;
|
||||
use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher};
|
||||
|
@ -46,7 +47,40 @@ enum ChangeKind {
|
|||
|
||||
const WATCHER_DELAY: Duration = Duration::from_millis(250);
|
||||
|
||||
pub(crate) type Worker = thread_worker::Worker<Task, VfsTask>;
|
||||
// Like thread::JoinHandle, but joins the thread on drop.
|
||||
//
|
||||
// This is useful because it guarantees the absence of run-away threads, even if
|
||||
// code panics. This is important, because we might seem panics in the test and
|
||||
// we might be used in an IDE context, where a failed component is just
|
||||
// restarted.
|
||||
//
|
||||
// Because all threads are joined, care must be taken to avoid deadlocks. That
|
||||
// typically means ensuring that channels are dropped before the threads.
|
||||
struct ScopedThread(Option<thread::JoinHandle<()>>);
|
||||
|
||||
impl ScopedThread {
|
||||
fn spawn(name: String, f: impl FnOnce() + Send + 'static) -> ScopedThread {
|
||||
let handle = thread::Builder::new().name(name).spawn(f).unwrap();
|
||||
ScopedThread(Some(handle))
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ScopedThread {
|
||||
fn drop(&mut self) {
|
||||
let res = self.0.take().unwrap().join();
|
||||
if !thread::panicking() {
|
||||
res.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct Worker {
|
||||
// XXX: it's important to drop `sender` before `_thread` to avoid deadlock.
|
||||
pub(crate) sender: Sender<Task>,
|
||||
_thread: ScopedThread,
|
||||
pub(crate) receiver: Receiver<VfsTask>,
|
||||
}
|
||||
|
||||
pub(crate) fn start(roots: Arc<Roots>) -> Worker {
|
||||
// This is a pretty elaborate setup of threads & channels! It is
|
||||
// explained by the following concerns:
|
||||
|
@ -55,69 +89,70 @@ pub(crate) fn start(roots: Arc<Roots>) -> Worker {
|
|||
// * we want to read all files from a single thread, to guarantee that
|
||||
// we always get fresher versions and never go back in time.
|
||||
// * we want to tear down everything neatly during shutdown.
|
||||
Worker::spawn(
|
||||
"vfs",
|
||||
128,
|
||||
// This are the channels we use to communicate with outside world.
|
||||
// If `input_receiver` is closed we need to tear ourselves down.
|
||||
// `output_sender` should not be closed unless the parent died.
|
||||
move |input_receiver, output_sender| {
|
||||
// Make sure that the destruction order is
|
||||
//
|
||||
// * notify_sender
|
||||
// * _thread
|
||||
// * watcher_sender
|
||||
//
|
||||
// this is required to avoid deadlocks.
|
||||
let _thread;
|
||||
// This are the channels we use to communicate with outside world.
|
||||
// If `input_receiver` is closed we need to tear ourselves down.
|
||||
// `output_sender` should not be closed unless the parent died.
|
||||
let (input_sender, input_receiver) = unbounded();
|
||||
let (output_sender, output_receiver) = unbounded();
|
||||
|
||||
// These are the corresponding crossbeam channels
|
||||
let (watcher_sender, watcher_receiver) = unbounded();
|
||||
let _thread;
|
||||
{
|
||||
// These are `std` channels notify will send events to
|
||||
let (notify_sender, notify_receiver) = mpsc::channel();
|
||||
_thread = ScopedThread::spawn("vfs".to_string(), move || {
|
||||
// Make sure that the destruction order is
|
||||
//
|
||||
// * notify_sender
|
||||
// * _thread
|
||||
// * watcher_sender
|
||||
//
|
||||
// this is required to avoid deadlocks.
|
||||
|
||||
let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY)
|
||||
.map_err(|e| log::error!("failed to spawn notify {}", e))
|
||||
.ok();
|
||||
// Start a silly thread to transform between two channels
|
||||
_thread = thread_worker::ScopedThread::spawn("notify-convertor", move || {
|
||||
notify_receiver
|
||||
.into_iter()
|
||||
.for_each(|event| convert_notify_event(event, &watcher_sender))
|
||||
});
|
||||
// These are the corresponding crossbeam channels
|
||||
let (watcher_sender, watcher_receiver) = unbounded();
|
||||
let _notify_thread;
|
||||
{
|
||||
// These are `std` channels notify will send events to
|
||||
let (notify_sender, notify_receiver) = mpsc::channel();
|
||||
|
||||
// Process requests from the called or notifications from
|
||||
// watcher until the caller says stop.
|
||||
loop {
|
||||
select! {
|
||||
// Received request from the caller. If this channel is
|
||||
// closed, we should shutdown everything.
|
||||
recv(input_receiver) -> t => match t {
|
||||
Err(RecvError) => {
|
||||
drop(input_receiver);
|
||||
break
|
||||
},
|
||||
Ok(Task::AddRoot { root }) => {
|
||||
watch_root(watcher.as_mut(), &output_sender, &*roots, root);
|
||||
}
|
||||
let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY)
|
||||
.map_err(|e| log::error!("failed to spawn notify {}", e))
|
||||
.ok();
|
||||
// Start a silly thread to transform between two channels
|
||||
_notify_thread = ScopedThread::spawn("notify-convertor".to_string(), move || {
|
||||
notify_receiver
|
||||
.into_iter()
|
||||
.for_each(|event| convert_notify_event(event, &watcher_sender))
|
||||
});
|
||||
|
||||
// Process requests from the called or notifications from
|
||||
// watcher until the caller says stop.
|
||||
loop {
|
||||
select! {
|
||||
// Received request from the caller. If this channel is
|
||||
// closed, we should shutdown everything.
|
||||
recv(input_receiver) -> t => match t {
|
||||
Err(RecvError) => {
|
||||
drop(input_receiver);
|
||||
break
|
||||
},
|
||||
// Watcher send us changes. If **this** channel is
|
||||
// closed, the watcher has died, which indicates a bug
|
||||
// -- escalate!
|
||||
recv(watcher_receiver) -> event => match event {
|
||||
Err(RecvError) => panic!("watcher is dead"),
|
||||
Ok((path, change)) => {
|
||||
handle_change(watcher.as_mut(), &output_sender, &*roots, path, change);
|
||||
}
|
||||
},
|
||||
}
|
||||
Ok(Task::AddRoot { root }) => {
|
||||
watch_root(watcher.as_mut(), &output_sender, &*roots, root);
|
||||
}
|
||||
},
|
||||
// Watcher send us changes. If **this** channel is
|
||||
// closed, the watcher has died, which indicates a bug
|
||||
// -- escalate!
|
||||
recv(watcher_receiver) -> event => match event {
|
||||
Err(RecvError) => panic!("watcher is dead"),
|
||||
Ok((path, change)) => {
|
||||
handle_change(watcher.as_mut(), &output_sender, &*roots, path, change);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
// Drain pending events: we are not interested in them anyways!
|
||||
watcher_receiver.into_iter().for_each(|_| ());
|
||||
},
|
||||
)
|
||||
}
|
||||
// Drain pending events: we are not interested in them anyways!
|
||||
watcher_receiver.into_iter().for_each(|_| ());
|
||||
});
|
||||
Worker { sender: input_sender, _thread, receiver: output_receiver }
|
||||
}
|
||||
|
||||
fn watch_root(
|
||||
|
|
|
@ -92,7 +92,7 @@ impl Vfs {
|
|||
|
||||
for root in roots.iter() {
|
||||
root2files.insert(root, Default::default());
|
||||
worker.sender().send(io::Task::AddRoot { root }).unwrap();
|
||||
worker.sender.send(io::Task::AddRoot { root }).unwrap();
|
||||
}
|
||||
let res = Vfs { roots, files: Vec::new(), root2files, worker, pending_changes: Vec::new() };
|
||||
let vfs_roots = res.roots.iter().collect();
|
||||
|
@ -170,7 +170,7 @@ impl Vfs {
|
|||
}
|
||||
|
||||
pub fn task_receiver(&self) -> &Receiver<VfsTask> {
|
||||
self.worker.receiver()
|
||||
&self.worker.receiver
|
||||
}
|
||||
|
||||
pub fn handle_task(&mut self, task: VfsTask) {
|
||||
|
|
Loading…
Reference in a new issue