diff --git a/Cargo.lock b/Cargo.lock index 65f839fe84..8f087749fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1130,7 +1130,6 @@ dependencies = [ "relative-path 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-hash 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "tempfile 3.0.7 (registry+https://github.com/rust-lang/crates.io-index)", - "thread_worker 0.1.0", "walkdir 2.2.7 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/crates/ra_vfs/Cargo.toml b/crates/ra_vfs/Cargo.toml index 2fe3102ab0..fdaf31b9c6 100644 --- a/crates/ra_vfs/Cargo.toml +++ b/crates/ra_vfs/Cargo.toml @@ -13,8 +13,6 @@ log = "0.4.6" notify = "4.0.9" parking_lot = "0.7.0" -thread_worker = { path = "../thread_worker" } - [dev-dependencies] tempfile = "3" flexi_logger = "0.10.0" diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 8eb148a38b..b6a0576977 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs @@ -3,8 +3,9 @@ use std::{ path::{Path, PathBuf}, sync::{mpsc, Arc}, time::Duration, + thread, }; -use crossbeam_channel::{Sender, unbounded, RecvError, select}; +use crossbeam_channel::{Sender, Receiver, unbounded, RecvError, select}; use relative_path::RelativePathBuf; use walkdir::WalkDir; use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; @@ -46,7 +47,40 @@ enum ChangeKind { const WATCHER_DELAY: Duration = Duration::from_millis(250); -pub(crate) type Worker = thread_worker::Worker; +// Like thread::JoinHandle, but joins the thread on drop. +// +// This is useful because it guarantees the absence of run-away threads, even if +// code panics. This is important, because we might seem panics in the test and +// we might be used in an IDE context, where a failed component is just +// restarted. +// +// Because all threads are joined, care must be taken to avoid deadlocks. That +// typically means ensuring that channels are dropped before the threads. +struct ScopedThread(Option>); + +impl ScopedThread { + fn spawn(name: String, f: impl FnOnce() + Send + 'static) -> ScopedThread { + let handle = thread::Builder::new().name(name).spawn(f).unwrap(); + ScopedThread(Some(handle)) + } +} + +impl Drop for ScopedThread { + fn drop(&mut self) { + let res = self.0.take().unwrap().join(); + if !thread::panicking() { + res.unwrap(); + } + } +} + +pub(crate) struct Worker { + // XXX: it's important to drop `sender` before `_thread` to avoid deadlock. + pub(crate) sender: Sender, + _thread: ScopedThread, + pub(crate) receiver: Receiver, +} + pub(crate) fn start(roots: Arc) -> Worker { // This is a pretty elaborate setup of threads & channels! It is // explained by the following concerns: @@ -55,69 +89,70 @@ pub(crate) fn start(roots: Arc) -> Worker { // * we want to read all files from a single thread, to guarantee that // we always get fresher versions and never go back in time. // * we want to tear down everything neatly during shutdown. - Worker::spawn( - "vfs", - 128, - // This are the channels we use to communicate with outside world. - // If `input_receiver` is closed we need to tear ourselves down. - // `output_sender` should not be closed unless the parent died. - move |input_receiver, output_sender| { - // Make sure that the destruction order is - // - // * notify_sender - // * _thread - // * watcher_sender - // - // this is required to avoid deadlocks. + let _thread; + // This are the channels we use to communicate with outside world. + // If `input_receiver` is closed we need to tear ourselves down. + // `output_sender` should not be closed unless the parent died. + let (input_sender, input_receiver) = unbounded(); + let (output_sender, output_receiver) = unbounded(); - // These are the corresponding crossbeam channels - let (watcher_sender, watcher_receiver) = unbounded(); - let _thread; - { - // These are `std` channels notify will send events to - let (notify_sender, notify_receiver) = mpsc::channel(); + _thread = ScopedThread::spawn("vfs".to_string(), move || { + // Make sure that the destruction order is + // + // * notify_sender + // * _thread + // * watcher_sender + // + // this is required to avoid deadlocks. - let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) - .map_err(|e| log::error!("failed to spawn notify {}", e)) - .ok(); - // Start a silly thread to transform between two channels - _thread = thread_worker::ScopedThread::spawn("notify-convertor", move || { - notify_receiver - .into_iter() - .for_each(|event| convert_notify_event(event, &watcher_sender)) - }); + // These are the corresponding crossbeam channels + let (watcher_sender, watcher_receiver) = unbounded(); + let _notify_thread; + { + // These are `std` channels notify will send events to + let (notify_sender, notify_receiver) = mpsc::channel(); - // Process requests from the called or notifications from - // watcher until the caller says stop. - loop { - select! { - // Received request from the caller. If this channel is - // closed, we should shutdown everything. - recv(input_receiver) -> t => match t { - Err(RecvError) => { - drop(input_receiver); - break - }, - Ok(Task::AddRoot { root }) => { - watch_root(watcher.as_mut(), &output_sender, &*roots, root); - } + let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) + .map_err(|e| log::error!("failed to spawn notify {}", e)) + .ok(); + // Start a silly thread to transform between two channels + _notify_thread = ScopedThread::spawn("notify-convertor".to_string(), move || { + notify_receiver + .into_iter() + .for_each(|event| convert_notify_event(event, &watcher_sender)) + }); + + // Process requests from the called or notifications from + // watcher until the caller says stop. + loop { + select! { + // Received request from the caller. If this channel is + // closed, we should shutdown everything. + recv(input_receiver) -> t => match t { + Err(RecvError) => { + drop(input_receiver); + break }, - // Watcher send us changes. If **this** channel is - // closed, the watcher has died, which indicates a bug - // -- escalate! - recv(watcher_receiver) -> event => match event { - Err(RecvError) => panic!("watcher is dead"), - Ok((path, change)) => { - handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); - } - }, - } + Ok(Task::AddRoot { root }) => { + watch_root(watcher.as_mut(), &output_sender, &*roots, root); + } + }, + // Watcher send us changes. If **this** channel is + // closed, the watcher has died, which indicates a bug + // -- escalate! + recv(watcher_receiver) -> event => match event { + Err(RecvError) => panic!("watcher is dead"), + Ok((path, change)) => { + handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); + } + }, } } - // Drain pending events: we are not interested in them anyways! - watcher_receiver.into_iter().for_each(|_| ()); - }, - ) + } + // Drain pending events: we are not interested in them anyways! + watcher_receiver.into_iter().for_each(|_| ()); + }); + Worker { sender: input_sender, _thread, receiver: output_receiver } } fn watch_root( diff --git a/crates/ra_vfs/src/lib.rs b/crates/ra_vfs/src/lib.rs index 3cd11c9f67..808c138dfe 100644 --- a/crates/ra_vfs/src/lib.rs +++ b/crates/ra_vfs/src/lib.rs @@ -92,7 +92,7 @@ impl Vfs { for root in roots.iter() { root2files.insert(root, Default::default()); - worker.sender().send(io::Task::AddRoot { root }).unwrap(); + worker.sender.send(io::Task::AddRoot { root }).unwrap(); } let res = Vfs { roots, files: Vec::new(), root2files, worker, pending_changes: Vec::new() }; let vfs_roots = res.roots.iter().collect(); @@ -170,7 +170,7 @@ impl Vfs { } pub fn task_receiver(&self) -> &Receiver { - self.worker.receiver() + &self.worker.receiver } pub fn handle_task(&mut self, task: VfsTask) {