diff --git a/crates/ra_cargo_watch/src/lib.rs b/crates/ra_cargo_watch/src/lib.rs index c863866108..70afd7f8ac 100644 --- a/crates/ra_cargo_watch/src/lib.rs +++ b/crates/ra_cargo_watch/src/lib.rs @@ -2,7 +2,7 @@ //! another compatible command (f.x. clippy) in a background thread and provide //! LSP diagnostics based on the output of the command. use cargo_metadata::Message; -use crossbeam_channel::{select, unbounded, Receiver, RecvError, Sender, TryRecvError}; +use crossbeam_channel::{select, unbounded, Receiver, RecvError, Sender}; use lsp_types::{ Diagnostic, Url, WorkDoneProgress, WorkDoneProgressBegin, WorkDoneProgressEnd, WorkDoneProgressReport, @@ -191,7 +191,8 @@ impl CheckWatcherState { self.last_update_req.take(); self.shared.write().clear(task_send); - self.watcher.cancel(); + // By replacing the watcher, we drop the previous one which + // causes it to shut down automatically. self.watcher = WatchThread::new(&self.options, &self.workspace_root); } } @@ -277,9 +278,11 @@ impl CheckWatcherState { /// doesn't provide a way to read sub-process output without blocking, so we /// have to wrap sub-processes output handling in a thread and pass messages /// back over a channel. +/// The correct way to dispose of the thread is to drop it, on which the +/// sub-process will be killed, and the thread will be joined. struct WatchThread { + handle: Option>, message_recv: Receiver, - cancel_send: Sender<()>, } enum CheckEvent { @@ -302,9 +305,8 @@ impl WatchThread { args.extend(options.args.iter().cloned()); let (message_send, message_recv) = unbounded(); - let (cancel_send, cancel_recv) = unbounded(); let enabled = options.enable; - std::thread::spawn(move || { + let handle = std::thread::spawn(move || { if !enabled { return; } @@ -316,24 +318,56 @@ impl WatchThread { .spawn() .expect("couldn't launch cargo"); - message_send.send(CheckEvent::Begin).unwrap(); + // If we trigger an error here, we will do so in the loop instead, + // which will break out of the loop, and continue the shutdown + let _ = message_send.send(CheckEvent::Begin); + for message in cargo_metadata::parse_messages(command.stdout.take().unwrap()) { - match cancel_recv.try_recv() { - Ok(()) | Err(TryRecvError::Disconnected) => { - command.kill().expect("couldn't kill command"); + let message = match message { + Ok(message) => message, + Err(err) => { + log::error!("Invalid json from cargo check, ignoring: {}", err); + continue; + } + }; + + match message_send.send(CheckEvent::Msg(message)) { + Ok(()) => {} + Err(_err) => { + // The send channel was closed, so we want to shutdown + break; } - Err(TryRecvError::Empty) => (), } - - message_send.send(CheckEvent::Msg(message.unwrap())).unwrap(); } - message_send.send(CheckEvent::End).unwrap(); - }); - WatchThread { message_recv, cancel_send } - } - fn cancel(&self) { - let _ = self.cancel_send.send(()); + // We can ignore any error here, as we are already in the progress + // of shutting down. + let _ = message_send.send(CheckEvent::End); + + // It is okay to ignore the result, as it only errors if the process is already dead + let _ = command.kill(); + + // Again, we don't care about the exit status so just ignore the result + let _ = command.wait(); + }); + WatchThread { handle: Some(handle), message_recv } + } +} + +impl std::ops::Drop for WatchThread { + fn drop(&mut self) { + if let Some(handle) = self.handle.take() { + // Replace our reciever with dummy one, so we can drop and close the + // one actually communicating with the thread + let recv = std::mem::replace(&mut self.message_recv, crossbeam_channel::never()); + + // Dropping the original reciever initiates thread sub-process shutdown + drop(recv); + + // Join the thread, it should finish shortly. We don't really care + // whether it panicked, so it is safe to ignore the result + let _ = handle.join(); + } } }