From c2c10b9014549e9c0783fb13dc202dfab6e6fd0a Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Sun, 30 Dec 2018 23:23:31 +0300 Subject: [PATCH] :arrow_up: crossbeam closes #189 --- Cargo.lock | 39 ++------------- crates/ra_lsp_server/Cargo.toml | 2 +- crates/ra_lsp_server/src/main_loop.rs | 48 ++++++++++--------- crates/ra_lsp_server/src/project_model.rs | 4 +- .../tests/heavy_tests/support.rs | 13 +++-- crates/ra_vfs/Cargo.toml | 2 +- crates/ra_vfs/src/io.rs | 4 +- crates/ra_vfs/src/lib.rs | 2 +- crates/thread_worker/Cargo.toml | 2 +- crates/thread_worker/src/lib.rs | 6 +-- 10 files changed, 51 insertions(+), 71 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7f3723f2e0..0d63ec639d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -164,18 +164,6 @@ dependencies = [ "bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "crossbeam-channel" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "crossbeam-epoch 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", - "crossbeam-utils 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)", - "rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)", - "smallvec 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "crossbeam-channel" version = "0.3.6" @@ -210,19 +198,6 @@ dependencies = [ "scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "crossbeam-epoch" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "arrayvec 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", - "cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", - "crossbeam-utils 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)", - "lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", - "memoffset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", - "scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "crossbeam-utils" version = "0.2.2" @@ -231,11 +206,6 @@ dependencies = [ "cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "crossbeam-utils" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "crossbeam-utils" version = "0.6.3" @@ -775,7 +745,7 @@ name = "ra_lsp_server" version = "0.1.0" dependencies = [ "cargo_metadata 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)", - "crossbeam-channel 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-channel 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "drop_bomb 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "failure_derive 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -835,7 +805,7 @@ dependencies = [ name = "ra_vfs" version = "0.1.0" dependencies = [ - "crossbeam-channel 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-channel 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "relative-path 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-hash 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1319,7 +1289,7 @@ dependencies = [ name = "thread_worker" version = "0.1.0" dependencies = [ - "crossbeam-channel 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-channel 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "drop_bomb 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1554,13 +1524,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "45912881121cb26fad7c38c17ba7daa18764771836b34fab7d3fbd93ed633878" "checksum clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b957d88f4b6a63b9d70d5f454ac8011819c6efa7727858f458ab71c756ce2d3e" "checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" -"checksum crossbeam-channel 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "7b85741761b7f160bc5e7e0c14986ef685b7f8bf9b7ad081c60c604bb4649827" "checksum crossbeam-channel 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "137bc235f622ffaa0428e3854e24acb53291fc0b3ff6fb2cb75a8be6fb02f06b" "checksum crossbeam-deque 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f739f8c5363aca78cfb059edf753d8f0d36908c348f3d8d1503f03d8b75d9cf3" "checksum crossbeam-epoch 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "927121f5407de9956180ff5e936fe3cf4324279280001cd56b669d28ee7e9150" -"checksum crossbeam-epoch 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2449aaa4ec7ef96e5fb24db16024b935df718e9ae1cec0a1e68feeca2efca7b8" "checksum crossbeam-utils 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2760899e32a1d58d5abb31129f8fae5de75220bc2176e77ff7c627ae45c918d9" -"checksum crossbeam-utils 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "677d453a17e8bd2b913fa38e8b9cf04bcdbb5be790aa294f2389661d72036015" "checksum crossbeam-utils 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "41ee4864f4797060e52044376f7d107429ce1fb43460021b126424b7180ee21a" "checksum derive-new 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "6ca414e896ae072546f4d789f452daaecf60ddee4c9df5dc6d5936d769e3d87c" "checksum deunicode 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "850878694b7933ca4c9569d30a34b55031b9b139ee1fc7b94a527c4ef960d690" diff --git a/crates/ra_lsp_server/Cargo.toml b/crates/ra_lsp_server/Cargo.toml index 646df24979..b1e8987fe7 100644 --- a/crates/ra_lsp_server/Cargo.toml +++ b/crates/ra_lsp_server/Cargo.toml @@ -13,7 +13,7 @@ failure_derive = "0.1.2" serde_json = "1.0.24" serde = "1.0.83" drop_bomb = "0.1.0" -crossbeam-channel = "0.2.4" +crossbeam-channel = "0.3.5" flexi_logger = "0.10.0" log = "0.4.3" url_serde = "0.2.0" diff --git a/crates/ra_lsp_server/src/main_loop.rs b/crates/ra_lsp_server/src/main_loop.rs index 97c1be7788..3ebae4ecd3 100644 --- a/crates/ra_lsp_server/src/main_loop.rs +++ b/crates/ra_lsp_server/src/main_loop.rs @@ -7,7 +7,7 @@ use std::{ sync::Arc, }; -use crossbeam_channel::{unbounded, select, Receiver, Sender}; +use crossbeam_channel::{unbounded, select, Receiver, Sender, RecvError}; use gen_lsp_server::{ handle_shutdown, ErrorCode, RawMessage, RawNotification, RawRequest, RawResponse, }; @@ -62,7 +62,7 @@ pub fn main_loop( let (task_sender, task_receiver) = unbounded::(); let (ws_worker, ws_watcher) = workspace_loader(); - ws_worker.send(ws_root.clone()); + ws_worker.send(ws_root.clone()).unwrap(); // FIXME: support dynamic workspace loading. let workspaces = match ws_worker.recv().unwrap() { Ok(ws) => vec![ws], @@ -95,7 +95,9 @@ pub fn main_loop( ); log::info!("waiting for tasks to finish..."); - task_receiver.for_each(|task| on_task(task, msg_sender, &mut pending_requests)); + task_receiver + .into_iter() + .for_each(|task| on_task(task, msg_sender, &mut pending_requests)); log::info!("...tasks have finished"); log::info!("joining threadpool..."); drop(pool); @@ -170,16 +172,16 @@ fn main_loop_inner( loop { log::trace!("selecting"); let event = select! { - recv(msg_receiver, msg) => match msg { - Some(msg) => Event::Msg(msg), - None => bail!("client exited without shutdown"), + recv(msg_receiver) -> msg => match msg { + Ok(msg) => Event::Msg(msg), + Err(RecvError) => bail!("client exited without shutdown"), }, - recv(task_receiver, task) => Event::Task(task.unwrap()), - recv(state.vfs.read().task_receiver(), task) => match task { - None => bail!("vfs died"), - Some(task) => Event::Vfs(task), - } - recv(libdata_receiver, data) => Event::Lib(data.unwrap()) + recv(task_receiver) -> task => Event::Task(task.unwrap()), + recv(state.vfs.read().task_receiver()) -> task => match task { + Ok(task) => Event::Vfs(task), + Err(RecvError) => bail!("vfs died"), + }, + recv(libdata_receiver) -> data => Event::Lib(data.unwrap()) }; log::info!("loop_turn = {:?}", event); let start = std::time::Instant::now(); @@ -209,7 +211,7 @@ fn main_loop_inner( ErrorCode::MethodNotFound as i32, "unknown request".to_string(), ); - msg_sender.send(RawMessage::Response(resp)) + msg_sender.send(RawMessage::Response(resp)).unwrap() } } } @@ -229,7 +231,7 @@ fn main_loop_inner( log::info!("indexing {:?} ... ", root); let data = LibraryData::prepare(root, files); log::info!("indexed {:?} {:?}", start.elapsed(), root); - sender.send(data); + sender.send(data).unwrap(); }); } if state.roots_to_scan == 0 { @@ -253,10 +255,12 @@ fn on_task(task: Task, msg_sender: &Sender, pending_requests: &mut F match task { Task::Respond(response) => { if pending_requests.remove(&response.id) { - msg_sender.send(RawMessage::Response(response)) + msg_sender.send(RawMessage::Response(response)).unwrap(); } } - Task::Notify(n) => msg_sender.send(RawMessage::Notification(n)), + Task::Notify(n) => { + msg_sender.send(RawMessage::Notification(n)).unwrap(); + } } } @@ -328,7 +332,7 @@ fn on_notification( ErrorCode::RequestCancelled as i32, "canceled by client".to_string(), ); - msg_sender.send(RawMessage::Response(response)) + msg_sender.send(RawMessage::Response(response)).unwrap() } return Ok(()); } @@ -381,7 +385,7 @@ fn on_notification( diagnostics: Vec::new(), }; let not = RawNotification::new::(¶ms); - msg_sender.send(RawMessage::Notification(not)); + msg_sender.send(RawMessage::Notification(not)).unwrap(); return Ok(()); } Err(not) => not, @@ -441,7 +445,7 @@ impl<'a> PoolDispatcher<'a> { }, }; let task = Task::Respond(resp); - sender.send(task); + sender.send(task).unwrap(); }); self.res = Some(id); } @@ -476,7 +480,7 @@ fn update_file_notifications_on_threadpool( } Ok(params) => { let not = RawNotification::new::(¶ms); - sender.send(Task::Notify(not)); + sender.send(Task::Notify(not)).unwrap(); } } if publish_decorations { @@ -488,7 +492,7 @@ fn update_file_notifications_on_threadpool( } Ok(params) => { let not = RawNotification::new::(¶ms); - sender.send(Task::Notify(not)) + sender.send(Task::Notify(not)).unwrap(); } } } @@ -501,7 +505,7 @@ fn feedback(intrnal_mode: bool, msg: &str, sender: &Sender) { return; } let not = RawNotification::new::(&msg.to_string()); - sender.send(RawMessage::Notification(not)); + sender.send(RawMessage::Notification(not)).unwrap(); } fn is_canceled(e: &failure::Error) -> bool { diff --git a/crates/ra_lsp_server/src/project_model.rs b/crates/ra_lsp_server/src/project_model.rs index 5852a157d1..ae21494633 100644 --- a/crates/ra_lsp_server/src/project_model.rs +++ b/crates/ra_lsp_server/src/project_model.rs @@ -204,8 +204,10 @@ pub fn workspace_loader() -> (Worker>, WorkerHan 1, |input_receiver, output_sender| { input_receiver + .into_iter() .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path())) - .for_each(|it| output_sender.send(it)) + .try_for_each(|it| output_sender.send(it)) + .unwrap() }, ) } diff --git a/crates/ra_lsp_server/tests/heavy_tests/support.rs b/crates/ra_lsp_server/tests/heavy_tests/support.rs index c14d287cac..82ba12f876 100644 --- a/crates/ra_lsp_server/tests/heavy_tests/support.rs +++ b/crates/ra_lsp_server/tests/heavy_tests/support.rs @@ -118,7 +118,11 @@ impl Server { } fn send_request_(&self, r: RawRequest) -> Value { let id = r.id; - self.worker.as_ref().unwrap().send(RawMessage::Request(r)); + self.worker + .as_ref() + .unwrap() + .send(RawMessage::Request(r)) + .unwrap(); while let Some(msg) = self.recv() { match msg { RawMessage::Request(req) => panic!("unexpected request: {:?}", req), @@ -167,7 +171,8 @@ impl Server { self.worker .as_ref() .unwrap() - .send(RawMessage::Notification(not)); + .send(RawMessage::Notification(not)) + .unwrap(); } } @@ -185,7 +190,7 @@ impl Drop for Server { fn recv_timeout(receiver: &Receiver) -> Option { let timeout = Duration::from_secs(5); select! { - recv(receiver, msg) => msg, - recv(after(timeout)) => panic!("timed out"), + recv(receiver) -> msg => msg.ok(), + recv(after(timeout)) -> _ => panic!("timed out"), } } diff --git a/crates/ra_vfs/Cargo.toml b/crates/ra_vfs/Cargo.toml index ccea8a866f..7c170cdfcd 100644 --- a/crates/ra_vfs/Cargo.toml +++ b/crates/ra_vfs/Cargo.toml @@ -8,7 +8,7 @@ authors = ["Aleksey Kladov "] walkdir = "2.2.7" relative-path = "0.4.0" rustc-hash = "1.0" -crossbeam-channel = "0.2.4" +crossbeam-channel = "0.3.5" log = "0.4.6" thread_worker = { path = "../thread_worker" } diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 4cfdb83da2..80328ad186 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs @@ -32,8 +32,10 @@ pub(crate) type Worker = thread_worker::Worker; pub(crate) fn start() -> (Worker, WorkerHandle) { thread_worker::spawn("vfs", 128, |input_receiver, output_sender| { input_receiver + .into_iter() .map(handle_task) - .for_each(|it| output_sender.send(it)) + .try_for_each(|it| output_sender.send(it)) + .unwrap() }) } diff --git a/crates/ra_vfs/src/lib.rs b/crates/ra_vfs/src/lib.rs index 90d5e21f43..757eac95bb 100644 --- a/crates/ra_vfs/src/lib.rs +++ b/crates/ra_vfs/src/lib.rs @@ -148,7 +148,7 @@ impl Vfs { path: path.clone(), filter: Box::new(filter), }; - res.worker.inp.send(task); + res.worker.inp.send(task).unwrap(); } let roots = res.roots.iter().map(|(id, _)| id).collect(); (res, roots) diff --git a/crates/thread_worker/Cargo.toml b/crates/thread_worker/Cargo.toml index 62d66a1a3d..c74b376e22 100644 --- a/crates/thread_worker/Cargo.toml +++ b/crates/thread_worker/Cargo.toml @@ -6,6 +6,6 @@ authors = ["Aleksey Kladov "] [dependencies] drop_bomb = "0.1.0" -crossbeam-channel = "0.2.4" +crossbeam-channel = "0.3.5" log = "0.4.3" diff --git a/crates/thread_worker/src/lib.rs b/crates/thread_worker/src/lib.rs index 12e8bf17ed..5e46f62fe9 100644 --- a/crates/thread_worker/src/lib.rs +++ b/crates/thread_worker/src/lib.rs @@ -2,7 +2,7 @@ use std::thread; -use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; +use crossbeam_channel::{bounded, unbounded, Receiver, Sender, RecvError, SendError}; use drop_bomb::DropBomb; pub struct Worker { @@ -34,10 +34,10 @@ impl Worker { self.out } - pub fn send(&self, item: I) { + pub fn send(&self, item: I) -> Result<(), SendError> { self.inp.send(item) } - pub fn recv(&self) -> Option { + pub fn recv(&self) -> Result { self.out.recv() } }