377: update crossbeam r=matklad a=matklad



Co-authored-by: Aleksey Kladov <aleksey.kladov@gmail.com>
This commit is contained in:
bors[bot] 2018-12-30 20:24:48 +00:00
commit 45e3de8eed
13 changed files with 70 additions and 74 deletions

39
Cargo.lock generated
View file

@ -166,13 +166,12 @@ dependencies = [
[[package]] [[package]]
name = "crossbeam-channel" name = "crossbeam-channel"
version = "0.2.6" version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [ dependencies = [
"crossbeam-epoch 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-utils 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)",
"smallvec 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)", "smallvec 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
@ -199,19 +198,6 @@ dependencies = [
"scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "crossbeam-epoch"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"arrayvec 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
"cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"memoffset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "crossbeam-utils" name = "crossbeam-utils"
version = "0.2.2" version = "0.2.2"
@ -220,11 +206,6 @@ dependencies = [
"cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "cfg-if 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "crossbeam-utils"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "crossbeam-utils" name = "crossbeam-utils"
version = "0.6.3" version = "0.6.3"
@ -355,7 +336,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
name = "gen_lsp_server" name = "gen_lsp_server"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"crossbeam-channel 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-channel 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"failure 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"languageserver-types 0.53.1 (registry+https://github.com/rust-lang/crates.io-index)", "languageserver-types 0.53.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
@ -764,7 +745,7 @@ name = "ra_lsp_server"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"cargo_metadata 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)", "cargo_metadata 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-channel 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-channel 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"drop_bomb 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "drop_bomb 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"failure 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"failure_derive 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "failure_derive 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
@ -824,7 +805,7 @@ dependencies = [
name = "ra_vfs" name = "ra_vfs"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"crossbeam-channel 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-channel 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"relative-path 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "relative-path 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-hash 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-hash 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1308,7 +1289,7 @@ dependencies = [
name = "thread_worker" name = "thread_worker"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"crossbeam-channel 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam-channel 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"drop_bomb 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "drop_bomb 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
@ -1543,12 +1524,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "45912881121cb26fad7c38c17ba7daa18764771836b34fab7d3fbd93ed633878" "checksum chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "45912881121cb26fad7c38c17ba7daa18764771836b34fab7d3fbd93ed633878"
"checksum clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b957d88f4b6a63b9d70d5f454ac8011819c6efa7727858f458ab71c756ce2d3e" "checksum clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b957d88f4b6a63b9d70d5f454ac8011819c6efa7727858f458ab71c756ce2d3e"
"checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" "checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
"checksum crossbeam-channel 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "7b85741761b7f160bc5e7e0c14986ef685b7f8bf9b7ad081c60c604bb4649827" "checksum crossbeam-channel 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "137bc235f622ffaa0428e3854e24acb53291fc0b3ff6fb2cb75a8be6fb02f06b"
"checksum crossbeam-deque 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f739f8c5363aca78cfb059edf753d8f0d36908c348f3d8d1503f03d8b75d9cf3" "checksum crossbeam-deque 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f739f8c5363aca78cfb059edf753d8f0d36908c348f3d8d1503f03d8b75d9cf3"
"checksum crossbeam-epoch 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "927121f5407de9956180ff5e936fe3cf4324279280001cd56b669d28ee7e9150" "checksum crossbeam-epoch 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "927121f5407de9956180ff5e936fe3cf4324279280001cd56b669d28ee7e9150"
"checksum crossbeam-epoch 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2449aaa4ec7ef96e5fb24db16024b935df718e9ae1cec0a1e68feeca2efca7b8"
"checksum crossbeam-utils 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2760899e32a1d58d5abb31129f8fae5de75220bc2176e77ff7c627ae45c918d9" "checksum crossbeam-utils 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2760899e32a1d58d5abb31129f8fae5de75220bc2176e77ff7c627ae45c918d9"
"checksum crossbeam-utils 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "677d453a17e8bd2b913fa38e8b9cf04bcdbb5be790aa294f2389661d72036015"
"checksum crossbeam-utils 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "41ee4864f4797060e52044376f7d107429ce1fb43460021b126424b7180ee21a" "checksum crossbeam-utils 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "41ee4864f4797060e52044376f7d107429ce1fb43460021b126424b7180ee21a"
"checksum derive-new 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "6ca414e896ae072546f4d789f452daaecf60ddee4c9df5dc6d5936d769e3d87c" "checksum derive-new 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "6ca414e896ae072546f4d789f452daaecf60ddee4c9df5dc6d5936d769e3d87c"
"checksum deunicode 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "850878694b7933ca4c9569d30a34b55031b9b139ee1fc7b94a527c4ef960d690" "checksum deunicode 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "850878694b7933ca4c9569d30a34b55031b9b139ee1fc7b94a527c4ef960d690"

View file

@ -13,4 +13,4 @@ log = "0.4.3"
failure = "0.1.2" failure = "0.1.2"
serde_json = "1.0.24" serde_json = "1.0.24"
serde = { version = "1.0.83", features = ["derive"] } serde = { version = "1.0.83", features = ["derive"] }
crossbeam-channel = "0.2.4" crossbeam-channel = "0.3.5"

View file

@ -95,7 +95,7 @@ pub fn run_server(
server(params, &receiver, &sender)?; server(params, &receiver, &sender)?;
log::info!("lsp server waiting for exit notification"); log::info!("lsp server waiting for exit notification");
match receiver.recv() { match receiver.recv() {
Some(RawMessage::Notification(n)) => n Ok(RawMessage::Notification(n)) => n
.cast::<Exit>() .cast::<Exit>()
.map_err(|n| format_err!("unexpected notification during shutdown: {:?}", n))?, .map_err(|n| format_err!("unexpected notification during shutdown: {:?}", n))?,
m => bail!("unexpected message during shutdown: {:?}", m), m => bail!("unexpected message during shutdown: {:?}", m),
@ -109,7 +109,7 @@ pub fn handle_shutdown(req: RawRequest, sender: &Sender<RawMessage>) -> Option<R
match req.cast::<Shutdown>() { match req.cast::<Shutdown>() {
Ok((id, ())) => { Ok((id, ())) => {
let resp = RawResponse::ok::<Shutdown>(id, &()); let resp = RawResponse::ok::<Shutdown>(id, &());
sender.send(RawMessage::Response(resp)); let _ = sender.send(RawMessage::Response(resp));
None None
} }
Err(req) => Some(req), Err(req) => Some(req),
@ -122,16 +122,16 @@ fn initialize(
caps: ServerCapabilities, caps: ServerCapabilities,
) -> Result<InitializeParams> { ) -> Result<InitializeParams> {
let (id, params) = match receiver.recv() { let (id, params) = match receiver.recv() {
Some(RawMessage::Request(req)) => match req.cast::<Initialize>() { Ok(RawMessage::Request(req)) => match req.cast::<Initialize>() {
Err(req) => bail!("expected initialize request, got {:?}", req), Err(req) => bail!("expected initialize request, got {:?}", req),
Ok(req) => req, Ok(req) => req,
}, },
msg => bail!("expected initialize request, got {:?}", msg), msg => bail!("expected initialize request, got {:?}", msg),
}; };
let resp = RawResponse::ok::<Initialize>(id, &InitializeResult { capabilities: caps }); let resp = RawResponse::ok::<Initialize>(id, &InitializeResult { capabilities: caps });
sender.send(RawMessage::Response(resp)); sender.send(RawMessage::Response(resp)).unwrap();
match receiver.recv() { match receiver.recv() {
Some(RawMessage::Notification(n)) => { Ok(RawMessage::Notification(n)) => {
n.cast::<Initialized>() n.cast::<Initialized>()
.map_err(|_| format_err!("expected initialized notification"))?; .map_err(|_| format_err!("expected initialized notification"))?;
} }

View file

@ -9,11 +9,13 @@ use failure::bail;
use crate::{RawMessage, Result}; use crate::{RawMessage, Result};
pub fn stdio_transport() -> (Receiver<RawMessage>, Sender<RawMessage>, Threads) { pub fn stdio_transport() -> (Receiver<RawMessage>, Sender<RawMessage>, Threads) {
let (writer_sender, mut writer_receiver) = bounded::<RawMessage>(16); let (writer_sender, writer_receiver) = bounded::<RawMessage>(16);
let writer = thread::spawn(move || { let writer = thread::spawn(move || {
let stdout = stdout(); let stdout = stdout();
let mut stdout = stdout.lock(); let mut stdout = stdout.lock();
writer_receiver.try_for_each(|it| it.write(&mut stdout))?; writer_receiver
.into_iter()
.try_for_each(|it| it.write(&mut stdout))?;
Ok(()) Ok(())
}); });
let (reader_sender, reader_receiver) = bounded::<RawMessage>(16); let (reader_sender, reader_receiver) = bounded::<RawMessage>(16);
@ -21,7 +23,9 @@ pub fn stdio_transport() -> (Receiver<RawMessage>, Sender<RawMessage>, Threads)
let stdin = stdin(); let stdin = stdin();
let mut stdin = stdin.lock(); let mut stdin = stdin.lock();
while let Some(msg) = RawMessage::read(&mut stdin)? { while let Some(msg) = RawMessage::read(&mut stdin)? {
reader_sender.send(msg); if let Err(_) = reader_sender.send(msg) {
break;
}
} }
Ok(()) Ok(())
}); });

View file

@ -13,7 +13,7 @@ failure_derive = "0.1.2"
serde_json = "1.0.24" serde_json = "1.0.24"
serde = "1.0.83" serde = "1.0.83"
drop_bomb = "0.1.0" drop_bomb = "0.1.0"
crossbeam-channel = "0.2.4" crossbeam-channel = "0.3.5"
flexi_logger = "0.10.0" flexi_logger = "0.10.0"
log = "0.4.3" log = "0.4.3"
url_serde = "0.2.0" url_serde = "0.2.0"

View file

@ -7,7 +7,7 @@ use std::{
sync::Arc, sync::Arc,
}; };
use crossbeam_channel::{unbounded, select, Receiver, Sender}; use crossbeam_channel::{unbounded, select, Receiver, Sender, RecvError};
use gen_lsp_server::{ use gen_lsp_server::{
handle_shutdown, ErrorCode, RawMessage, RawNotification, RawRequest, RawResponse, handle_shutdown, ErrorCode, RawMessage, RawNotification, RawRequest, RawResponse,
}; };
@ -62,7 +62,7 @@ pub fn main_loop(
let (task_sender, task_receiver) = unbounded::<Task>(); let (task_sender, task_receiver) = unbounded::<Task>();
let (ws_worker, ws_watcher) = workspace_loader(); let (ws_worker, ws_watcher) = workspace_loader();
ws_worker.send(ws_root.clone()); ws_worker.send(ws_root.clone()).unwrap();
// FIXME: support dynamic workspace loading. // FIXME: support dynamic workspace loading.
let workspaces = match ws_worker.recv().unwrap() { let workspaces = match ws_worker.recv().unwrap() {
Ok(ws) => vec![ws], Ok(ws) => vec![ws],
@ -95,7 +95,9 @@ pub fn main_loop(
); );
log::info!("waiting for tasks to finish..."); log::info!("waiting for tasks to finish...");
task_receiver.for_each(|task| on_task(task, msg_sender, &mut pending_requests)); task_receiver
.into_iter()
.for_each(|task| on_task(task, msg_sender, &mut pending_requests));
log::info!("...tasks have finished"); log::info!("...tasks have finished");
log::info!("joining threadpool..."); log::info!("joining threadpool...");
drop(pool); drop(pool);
@ -170,16 +172,16 @@ fn main_loop_inner(
loop { loop {
log::trace!("selecting"); log::trace!("selecting");
let event = select! { let event = select! {
recv(msg_receiver, msg) => match msg { recv(msg_receiver) -> msg => match msg {
Some(msg) => Event::Msg(msg), Ok(msg) => Event::Msg(msg),
None => bail!("client exited without shutdown"), Err(RecvError) => bail!("client exited without shutdown"),
}, },
recv(task_receiver, task) => Event::Task(task.unwrap()), recv(task_receiver) -> task => Event::Task(task.unwrap()),
recv(state.vfs.read().task_receiver(), task) => match task { recv(state.vfs.read().task_receiver()) -> task => match task {
None => bail!("vfs died"), Ok(task) => Event::Vfs(task),
Some(task) => Event::Vfs(task), Err(RecvError) => bail!("vfs died"),
} },
recv(libdata_receiver, data) => Event::Lib(data.unwrap()) recv(libdata_receiver) -> data => Event::Lib(data.unwrap())
}; };
log::info!("loop_turn = {:?}", event); log::info!("loop_turn = {:?}", event);
let start = std::time::Instant::now(); let start = std::time::Instant::now();
@ -209,7 +211,7 @@ fn main_loop_inner(
ErrorCode::MethodNotFound as i32, ErrorCode::MethodNotFound as i32,
"unknown request".to_string(), "unknown request".to_string(),
); );
msg_sender.send(RawMessage::Response(resp)) msg_sender.send(RawMessage::Response(resp)).unwrap()
} }
} }
} }
@ -229,7 +231,7 @@ fn main_loop_inner(
log::info!("indexing {:?} ... ", root); log::info!("indexing {:?} ... ", root);
let data = LibraryData::prepare(root, files); let data = LibraryData::prepare(root, files);
log::info!("indexed {:?} {:?}", start.elapsed(), root); log::info!("indexed {:?} {:?}", start.elapsed(), root);
sender.send(data); sender.send(data).unwrap();
}); });
} }
if state.roots_to_scan == 0 { if state.roots_to_scan == 0 {
@ -253,10 +255,12 @@ fn on_task(task: Task, msg_sender: &Sender<RawMessage>, pending_requests: &mut F
match task { match task {
Task::Respond(response) => { Task::Respond(response) => {
if pending_requests.remove(&response.id) { if pending_requests.remove(&response.id) {
msg_sender.send(RawMessage::Response(response)) msg_sender.send(RawMessage::Response(response)).unwrap();
} }
} }
Task::Notify(n) => msg_sender.send(RawMessage::Notification(n)), Task::Notify(n) => {
msg_sender.send(RawMessage::Notification(n)).unwrap();
}
} }
} }
@ -328,7 +332,7 @@ fn on_notification(
ErrorCode::RequestCancelled as i32, ErrorCode::RequestCancelled as i32,
"canceled by client".to_string(), "canceled by client".to_string(),
); );
msg_sender.send(RawMessage::Response(response)) msg_sender.send(RawMessage::Response(response)).unwrap()
} }
return Ok(()); return Ok(());
} }
@ -381,7 +385,7 @@ fn on_notification(
diagnostics: Vec::new(), diagnostics: Vec::new(),
}; };
let not = RawNotification::new::<req::PublishDiagnostics>(&params); let not = RawNotification::new::<req::PublishDiagnostics>(&params);
msg_sender.send(RawMessage::Notification(not)); msg_sender.send(RawMessage::Notification(not)).unwrap();
return Ok(()); return Ok(());
} }
Err(not) => not, Err(not) => not,
@ -441,7 +445,7 @@ impl<'a> PoolDispatcher<'a> {
}, },
}; };
let task = Task::Respond(resp); let task = Task::Respond(resp);
sender.send(task); sender.send(task).unwrap();
}); });
self.res = Some(id); self.res = Some(id);
} }
@ -476,7 +480,7 @@ fn update_file_notifications_on_threadpool(
} }
Ok(params) => { Ok(params) => {
let not = RawNotification::new::<req::PublishDiagnostics>(&params); let not = RawNotification::new::<req::PublishDiagnostics>(&params);
sender.send(Task::Notify(not)); sender.send(Task::Notify(not)).unwrap();
} }
} }
if publish_decorations { if publish_decorations {
@ -488,7 +492,7 @@ fn update_file_notifications_on_threadpool(
} }
Ok(params) => { Ok(params) => {
let not = RawNotification::new::<req::PublishDecorations>(&params); let not = RawNotification::new::<req::PublishDecorations>(&params);
sender.send(Task::Notify(not)) sender.send(Task::Notify(not)).unwrap();
} }
} }
} }
@ -501,7 +505,7 @@ fn feedback(intrnal_mode: bool, msg: &str, sender: &Sender<RawMessage>) {
return; return;
} }
let not = RawNotification::new::<req::InternalFeedback>(&msg.to_string()); let not = RawNotification::new::<req::InternalFeedback>(&msg.to_string());
sender.send(RawMessage::Notification(not)); sender.send(RawMessage::Notification(not)).unwrap();
} }
fn is_canceled(e: &failure::Error) -> bool { fn is_canceled(e: &failure::Error) -> bool {

View file

@ -204,8 +204,10 @@ pub fn workspace_loader() -> (Worker<PathBuf, Result<CargoWorkspace>>, WorkerHan
1, 1,
|input_receiver, output_sender| { |input_receiver, output_sender| {
input_receiver input_receiver
.into_iter()
.map(|path| CargoWorkspace::from_cargo_metadata(path.as_path())) .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path()))
.for_each(|it| output_sender.send(it)) .try_for_each(|it| output_sender.send(it))
.unwrap()
}, },
) )
} }

View file

@ -118,7 +118,11 @@ impl Server {
} }
fn send_request_(&self, r: RawRequest) -> Value { fn send_request_(&self, r: RawRequest) -> Value {
let id = r.id; let id = r.id;
self.worker.as_ref().unwrap().send(RawMessage::Request(r)); self.worker
.as_ref()
.unwrap()
.send(RawMessage::Request(r))
.unwrap();
while let Some(msg) = self.recv() { while let Some(msg) = self.recv() {
match msg { match msg {
RawMessage::Request(req) => panic!("unexpected request: {:?}", req), RawMessage::Request(req) => panic!("unexpected request: {:?}", req),
@ -167,7 +171,8 @@ impl Server {
self.worker self.worker
.as_ref() .as_ref()
.unwrap() .unwrap()
.send(RawMessage::Notification(not)); .send(RawMessage::Notification(not))
.unwrap();
} }
} }
@ -185,7 +190,7 @@ impl Drop for Server {
fn recv_timeout(receiver: &Receiver<RawMessage>) -> Option<RawMessage> { fn recv_timeout(receiver: &Receiver<RawMessage>) -> Option<RawMessage> {
let timeout = Duration::from_secs(5); let timeout = Duration::from_secs(5);
select! { select! {
recv(receiver, msg) => msg, recv(receiver) -> msg => msg.ok(),
recv(after(timeout)) => panic!("timed out"), recv(after(timeout)) -> _ => panic!("timed out"),
} }
} }

View file

@ -8,7 +8,7 @@ authors = ["Aleksey Kladov <aleksey.kladov@gmail.com>"]
walkdir = "2.2.7" walkdir = "2.2.7"
relative-path = "0.4.0" relative-path = "0.4.0"
rustc-hash = "1.0" rustc-hash = "1.0"
crossbeam-channel = "0.2.4" crossbeam-channel = "0.3.5"
log = "0.4.6" log = "0.4.6"
thread_worker = { path = "../thread_worker" } thread_worker = { path = "../thread_worker" }

View file

@ -32,8 +32,10 @@ pub(crate) type Worker = thread_worker::Worker<Task, TaskResult>;
pub(crate) fn start() -> (Worker, WorkerHandle) { pub(crate) fn start() -> (Worker, WorkerHandle) {
thread_worker::spawn("vfs", 128, |input_receiver, output_sender| { thread_worker::spawn("vfs", 128, |input_receiver, output_sender| {
input_receiver input_receiver
.into_iter()
.map(handle_task) .map(handle_task)
.for_each(|it| output_sender.send(it)) .try_for_each(|it| output_sender.send(it))
.unwrap()
}) })
} }

View file

@ -148,7 +148,7 @@ impl Vfs {
path: path.clone(), path: path.clone(),
filter: Box::new(filter), filter: Box::new(filter),
}; };
res.worker.inp.send(task); res.worker.inp.send(task).unwrap();
} }
let roots = res.roots.iter().map(|(id, _)| id).collect(); let roots = res.roots.iter().map(|(id, _)| id).collect();
(res, roots) (res, roots)

View file

@ -6,6 +6,6 @@ authors = ["Aleksey Kladov <aleksey.kladov@gmail.com>"]
[dependencies] [dependencies]
drop_bomb = "0.1.0" drop_bomb = "0.1.0"
crossbeam-channel = "0.2.4" crossbeam-channel = "0.3.5"
log = "0.4.3" log = "0.4.3"

View file

@ -2,7 +2,7 @@
use std::thread; use std::thread;
use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; use crossbeam_channel::{bounded, unbounded, Receiver, Sender, RecvError, SendError};
use drop_bomb::DropBomb; use drop_bomb::DropBomb;
pub struct Worker<I, O> { pub struct Worker<I, O> {
@ -34,10 +34,10 @@ impl<I, O> Worker<I, O> {
self.out self.out
} }
pub fn send(&self, item: I) { pub fn send(&self, item: I) -> Result<(), SendError<I>> {
self.inp.send(item) self.inp.send(item)
} }
pub fn recv(&self) -> Option<O> { pub fn recv(&self) -> Result<O, RecvError> {
self.out.recv() self.out.recv()
} }
} }