⬆️ crossbeam

closes #189
This commit is contained in:
Aleksey Kladov 2018-12-30 23:23:31 +03:00
parent effc1eae8b
commit c2c10b9014
10 changed files with 51 additions and 71 deletions

39
Cargo.lock generated
View file

@ -164,18 +164,6 @@ dependencies = [
"bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-channel"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"crossbeam-epoch 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)",
"smallvec 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-channel"
version = "0.3.6"
@ -210,19 +198,6 @@ dependencies = [
"scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-epoch"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"arrayvec 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
"cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"memoffset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-utils"
version = "0.2.2"
@ -231,11 +206,6 @@ dependencies = [
"cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-utils"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "crossbeam-utils"
version = "0.6.3"
@ -775,7 +745,7 @@ name = "ra_lsp_server"
version = "0.1.0"
dependencies = [
"cargo_metadata 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-channel 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-channel 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"drop_bomb 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"failure 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"failure_derive 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
@ -835,7 +805,7 @@ dependencies = [
name = "ra_vfs"
version = "0.1.0"
dependencies = [
"crossbeam-channel 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-channel 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"relative-path 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-hash 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1319,7 +1289,7 @@ dependencies = [
name = "thread_worker"
version = "0.1.0"
dependencies = [
"crossbeam-channel 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-channel 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"drop_bomb 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -1554,13 +1524,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "45912881121cb26fad7c38c17ba7daa18764771836b34fab7d3fbd93ed633878"
"checksum clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b957d88f4b6a63b9d70d5f454ac8011819c6efa7727858f458ab71c756ce2d3e"
"checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
"checksum crossbeam-channel 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "7b85741761b7f160bc5e7e0c14986ef685b7f8bf9b7ad081c60c604bb4649827"
"checksum crossbeam-channel 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "137bc235f622ffaa0428e3854e24acb53291fc0b3ff6fb2cb75a8be6fb02f06b"
"checksum crossbeam-deque 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f739f8c5363aca78cfb059edf753d8f0d36908c348f3d8d1503f03d8b75d9cf3"
"checksum crossbeam-epoch 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "927121f5407de9956180ff5e936fe3cf4324279280001cd56b669d28ee7e9150"
"checksum crossbeam-epoch 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2449aaa4ec7ef96e5fb24db16024b935df718e9ae1cec0a1e68feeca2efca7b8"
"checksum crossbeam-utils 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2760899e32a1d58d5abb31129f8fae5de75220bc2176e77ff7c627ae45c918d9"
"checksum crossbeam-utils 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "677d453a17e8bd2b913fa38e8b9cf04bcdbb5be790aa294f2389661d72036015"
"checksum crossbeam-utils 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "41ee4864f4797060e52044376f7d107429ce1fb43460021b126424b7180ee21a"
"checksum derive-new 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "6ca414e896ae072546f4d789f452daaecf60ddee4c9df5dc6d5936d769e3d87c"
"checksum deunicode 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "850878694b7933ca4c9569d30a34b55031b9b139ee1fc7b94a527c4ef960d690"

View file

@ -13,7 +13,7 @@ failure_derive = "0.1.2"
serde_json = "1.0.24"
serde = "1.0.83"
drop_bomb = "0.1.0"
crossbeam-channel = "0.2.4"
crossbeam-channel = "0.3.5"
flexi_logger = "0.10.0"
log = "0.4.3"
url_serde = "0.2.0"

View file

@ -7,7 +7,7 @@ use std::{
sync::Arc,
};
use crossbeam_channel::{unbounded, select, Receiver, Sender};
use crossbeam_channel::{unbounded, select, Receiver, Sender, RecvError};
use gen_lsp_server::{
handle_shutdown, ErrorCode, RawMessage, RawNotification, RawRequest, RawResponse,
};
@ -62,7 +62,7 @@ pub fn main_loop(
let (task_sender, task_receiver) = unbounded::<Task>();
let (ws_worker, ws_watcher) = workspace_loader();
ws_worker.send(ws_root.clone());
ws_worker.send(ws_root.clone()).unwrap();
// FIXME: support dynamic workspace loading.
let workspaces = match ws_worker.recv().unwrap() {
Ok(ws) => vec![ws],
@ -95,7 +95,9 @@ pub fn main_loop(
);
log::info!("waiting for tasks to finish...");
task_receiver.for_each(|task| on_task(task, msg_sender, &mut pending_requests));
task_receiver
.into_iter()
.for_each(|task| on_task(task, msg_sender, &mut pending_requests));
log::info!("...tasks have finished");
log::info!("joining threadpool...");
drop(pool);
@ -170,16 +172,16 @@ fn main_loop_inner(
loop {
log::trace!("selecting");
let event = select! {
recv(msg_receiver, msg) => match msg {
Some(msg) => Event::Msg(msg),
None => bail!("client exited without shutdown"),
recv(msg_receiver) -> msg => match msg {
Ok(msg) => Event::Msg(msg),
Err(RecvError) => bail!("client exited without shutdown"),
},
recv(task_receiver, task) => Event::Task(task.unwrap()),
recv(state.vfs.read().task_receiver(), task) => match task {
None => bail!("vfs died"),
Some(task) => Event::Vfs(task),
}
recv(libdata_receiver, data) => Event::Lib(data.unwrap())
recv(task_receiver) -> task => Event::Task(task.unwrap()),
recv(state.vfs.read().task_receiver()) -> task => match task {
Ok(task) => Event::Vfs(task),
Err(RecvError) => bail!("vfs died"),
},
recv(libdata_receiver) -> data => Event::Lib(data.unwrap())
};
log::info!("loop_turn = {:?}", event);
let start = std::time::Instant::now();
@ -209,7 +211,7 @@ fn main_loop_inner(
ErrorCode::MethodNotFound as i32,
"unknown request".to_string(),
);
msg_sender.send(RawMessage::Response(resp))
msg_sender.send(RawMessage::Response(resp)).unwrap()
}
}
}
@ -229,7 +231,7 @@ fn main_loop_inner(
log::info!("indexing {:?} ... ", root);
let data = LibraryData::prepare(root, files);
log::info!("indexed {:?} {:?}", start.elapsed(), root);
sender.send(data);
sender.send(data).unwrap();
});
}
if state.roots_to_scan == 0 {
@ -253,10 +255,12 @@ fn on_task(task: Task, msg_sender: &Sender<RawMessage>, pending_requests: &mut F
match task {
Task::Respond(response) => {
if pending_requests.remove(&response.id) {
msg_sender.send(RawMessage::Response(response))
msg_sender.send(RawMessage::Response(response)).unwrap();
}
}
Task::Notify(n) => msg_sender.send(RawMessage::Notification(n)),
Task::Notify(n) => {
msg_sender.send(RawMessage::Notification(n)).unwrap();
}
}
}
@ -328,7 +332,7 @@ fn on_notification(
ErrorCode::RequestCancelled as i32,
"canceled by client".to_string(),
);
msg_sender.send(RawMessage::Response(response))
msg_sender.send(RawMessage::Response(response)).unwrap()
}
return Ok(());
}
@ -381,7 +385,7 @@ fn on_notification(
diagnostics: Vec::new(),
};
let not = RawNotification::new::<req::PublishDiagnostics>(&params);
msg_sender.send(RawMessage::Notification(not));
msg_sender.send(RawMessage::Notification(not)).unwrap();
return Ok(());
}
Err(not) => not,
@ -441,7 +445,7 @@ impl<'a> PoolDispatcher<'a> {
},
};
let task = Task::Respond(resp);
sender.send(task);
sender.send(task).unwrap();
});
self.res = Some(id);
}
@ -476,7 +480,7 @@ fn update_file_notifications_on_threadpool(
}
Ok(params) => {
let not = RawNotification::new::<req::PublishDiagnostics>(&params);
sender.send(Task::Notify(not));
sender.send(Task::Notify(not)).unwrap();
}
}
if publish_decorations {
@ -488,7 +492,7 @@ fn update_file_notifications_on_threadpool(
}
Ok(params) => {
let not = RawNotification::new::<req::PublishDecorations>(&params);
sender.send(Task::Notify(not))
sender.send(Task::Notify(not)).unwrap();
}
}
}
@ -501,7 +505,7 @@ fn feedback(intrnal_mode: bool, msg: &str, sender: &Sender<RawMessage>) {
return;
}
let not = RawNotification::new::<req::InternalFeedback>(&msg.to_string());
sender.send(RawMessage::Notification(not));
sender.send(RawMessage::Notification(not)).unwrap();
}
fn is_canceled(e: &failure::Error) -> bool {

View file

@ -204,8 +204,10 @@ pub fn workspace_loader() -> (Worker<PathBuf, Result<CargoWorkspace>>, WorkerHan
1,
|input_receiver, output_sender| {
input_receiver
.into_iter()
.map(|path| CargoWorkspace::from_cargo_metadata(path.as_path()))
.for_each(|it| output_sender.send(it))
.try_for_each(|it| output_sender.send(it))
.unwrap()
},
)
}

View file

@ -118,7 +118,11 @@ impl Server {
}
fn send_request_(&self, r: RawRequest) -> Value {
let id = r.id;
self.worker.as_ref().unwrap().send(RawMessage::Request(r));
self.worker
.as_ref()
.unwrap()
.send(RawMessage::Request(r))
.unwrap();
while let Some(msg) = self.recv() {
match msg {
RawMessage::Request(req) => panic!("unexpected request: {:?}", req),
@ -167,7 +171,8 @@ impl Server {
self.worker
.as_ref()
.unwrap()
.send(RawMessage::Notification(not));
.send(RawMessage::Notification(not))
.unwrap();
}
}
@ -185,7 +190,7 @@ impl Drop for Server {
fn recv_timeout(receiver: &Receiver<RawMessage>) -> Option<RawMessage> {
let timeout = Duration::from_secs(5);
select! {
recv(receiver, msg) => msg,
recv(after(timeout)) => panic!("timed out"),
recv(receiver) -> msg => msg.ok(),
recv(after(timeout)) -> _ => panic!("timed out"),
}
}

View file

@ -8,7 +8,7 @@ authors = ["Aleksey Kladov <aleksey.kladov@gmail.com>"]
walkdir = "2.2.7"
relative-path = "0.4.0"
rustc-hash = "1.0"
crossbeam-channel = "0.2.4"
crossbeam-channel = "0.3.5"
log = "0.4.6"
thread_worker = { path = "../thread_worker" }

View file

@ -32,8 +32,10 @@ pub(crate) type Worker = thread_worker::Worker<Task, TaskResult>;
pub(crate) fn start() -> (Worker, WorkerHandle) {
thread_worker::spawn("vfs", 128, |input_receiver, output_sender| {
input_receiver
.into_iter()
.map(handle_task)
.for_each(|it| output_sender.send(it))
.try_for_each(|it| output_sender.send(it))
.unwrap()
})
}

View file

@ -148,7 +148,7 @@ impl Vfs {
path: path.clone(),
filter: Box::new(filter),
};
res.worker.inp.send(task);
res.worker.inp.send(task).unwrap();
}
let roots = res.roots.iter().map(|(id, _)| id).collect();
(res, roots)

View file

@ -6,6 +6,6 @@ authors = ["Aleksey Kladov <aleksey.kladov@gmail.com>"]
[dependencies]
drop_bomb = "0.1.0"
crossbeam-channel = "0.2.4"
crossbeam-channel = "0.3.5"
log = "0.4.3"

View file

@ -2,7 +2,7 @@
use std::thread;
use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use crossbeam_channel::{bounded, unbounded, Receiver, Sender, RecvError, SendError};
use drop_bomb::DropBomb;
pub struct Worker<I, O> {
@ -34,10 +34,10 @@ impl<I, O> Worker<I, O> {
self.out
}
pub fn send(&self, item: I) {
pub fn send(&self, item: I) -> Result<(), SendError<I>> {
self.inp.send(item)
}
pub fn recv(&self) -> Option<O> {
pub fn recv(&self) -> Result<O, RecvError> {
self.out.recv()
}
}