move thread worker to a separate crate

This commit is contained in:
Aleksey Kladov 2018-12-18 12:45:20 +03:00
parent 4a1ab869b7
commit 193992fd14
9 changed files with 62 additions and 46 deletions

10
Cargo.lock generated
View file

@ -700,6 +700,7 @@ dependencies = [
"tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"test_utils 0.1.0", "test_utils 0.1.0",
"text_unit 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "text_unit 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"thread_worker 0.1.0",
"threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"url_serde 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "url_serde 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"walkdir 2.2.6 (registry+https://github.com/rust-lang/crates.io-index)", "walkdir 2.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1116,6 +1117,15 @@ dependencies = [
"lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "thread_worker"
version = "0.1.0"
dependencies = [
"crossbeam-channel 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
"drop_bomb 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "threadpool" name = "threadpool"
version = "1.7.1" version = "1.7.1"

View file

@ -26,6 +26,7 @@ text_unit = { version = "0.1.2", features = ["serde"] }
smol_str = { version = "0.1.5", features = ["serde"] } smol_str = { version = "0.1.5", features = ["serde"] }
rustc-hash = "1.0" rustc-hash = "1.0"
thread_worker = { path = "../thread_worker" }
ra_syntax = { path = "../ra_syntax" } ra_syntax = { path = "../ra_syntax" }
ra_editor = { path = "../ra_editor" } ra_editor = { path = "../ra_editor" }
ra_text_edit = { path = "../ra_text_edit" } ra_text_edit = { path = "../ra_text_edit" }

View file

@ -5,7 +5,6 @@ mod path_map;
mod project_model; mod project_model;
pub mod req; pub mod req;
mod server_world; mod server_world;
pub mod thread_watcher;
mod vfs; mod vfs;
pub type Result<T> = ::std::result::Result<T, ::failure::Error>; pub type Result<T> = ::std::result::Result<T, ::failure::Error>;

View file

@ -10,6 +10,7 @@ use gen_lsp_server::{
use languageserver_types::NumberOrString; use languageserver_types::NumberOrString;
use ra_analysis::{Canceled, FileId, LibraryData}; use ra_analysis::{Canceled, FileId, LibraryData};
use rayon; use rayon;
use thread_worker::Worker;
use threadpool::ThreadPool; use threadpool::ThreadPool;
use rustc_hash::FxHashSet; use rustc_hash::FxHashSet;
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
@ -21,7 +22,6 @@ use crate::{
project_model::{workspace_loader, CargoWorkspace}, project_model::{workspace_loader, CargoWorkspace},
req, req,
server_world::{ServerWorld, ServerWorldState}, server_world::{ServerWorld, ServerWorldState},
thread_watcher::Worker,
vfs::{self, FileEvent}, vfs::{self, FileEvent},
Result, Result,
}; };
@ -92,8 +92,8 @@ pub fn main_loop(
let ws_res = ws_watcher.stop(); let ws_res = ws_watcher.stop();
main_res?; main_res?;
fs_res?; fs_res.map_err(|_| format_err!("fs watcher died"))?;
ws_res?; ws_res.map_err(|_| format_err!("ws watcher died"))?;
Ok(()) Ok(())
} }

View file

@ -4,11 +4,9 @@ use cargo_metadata::{metadata_run, CargoOpt};
use ra_syntax::SmolStr; use ra_syntax::SmolStr;
use rustc_hash::{FxHashMap, FxHashSet}; use rustc_hash::{FxHashMap, FxHashSet};
use failure::{format_err, bail}; use failure::{format_err, bail};
use thread_worker::{WorkerHandle, Worker};
use crate::{ use crate::Result;
Result,
thread_watcher::{ThreadWatcher, Worker},
};
/// `CargoWorksapce` represents the logical structure of, well, a Cargo /// `CargoWorksapce` represents the logical structure of, well, a Cargo
/// workspace. It pretty closely mirrors `cargo metadata` output. /// workspace. It pretty closely mirrors `cargo metadata` output.
@ -199,8 +197,8 @@ impl TargetKind {
} }
} }
pub fn workspace_loader() -> (Worker<PathBuf, Result<CargoWorkspace>>, ThreadWatcher) { pub fn workspace_loader() -> (Worker<PathBuf, Result<CargoWorkspace>>, WorkerHandle) {
Worker::<PathBuf, Result<CargoWorkspace>>::spawn( thread_worker::spawn::<PathBuf, Result<CargoWorkspace>, _>(
"workspace loader", "workspace loader",
1, 1,
|input_receiver, output_sender| { |input_receiver, output_sender| {

View file

@ -4,8 +4,7 @@ use std::{
}; };
use walkdir::WalkDir; use walkdir::WalkDir;
use thread_worker::{WorkerHandle, Worker};
use crate::thread_watcher::{ThreadWatcher, Worker};
#[derive(Debug)] #[derive(Debug)]
pub struct FileEvent { pub struct FileEvent {
@ -18,8 +17,8 @@ pub enum FileEventKind {
Add(String), Add(String),
} }
pub fn roots_loader() -> (Worker<PathBuf, (PathBuf, Vec<FileEvent>)>, ThreadWatcher) { pub fn roots_loader() -> (Worker<PathBuf, (PathBuf, Vec<FileEvent>)>, WorkerHandle) {
Worker::<PathBuf, (PathBuf, Vec<FileEvent>)>::spawn( thread_worker::spawn::<PathBuf, (PathBuf, Vec<FileEvent>), _>(
"roots loader", "roots loader",
128, 128,
|input_receiver, output_sender| { |input_receiver, output_sender| {

View file

@ -17,11 +17,11 @@ use languageserver_types::{
use serde::Serialize; use serde::Serialize;
use serde_json::{to_string_pretty, Value}; use serde_json::{to_string_pretty, Value};
use tempdir::TempDir; use tempdir::TempDir;
use thread_worker::{WorkerHandle, Worker};
use test_utils::{parse_fixture, find_mismatch}; use test_utils::{parse_fixture, find_mismatch};
use ra_lsp_server::{ use ra_lsp_server::{
main_loop, req, main_loop, req,
thread_watcher::{ThreadWatcher, Worker},
}; };
pub fn project(fixture: &str) -> Server { pub fn project(fixture: &str) -> Server {
@ -45,13 +45,13 @@ pub struct Server {
messages: RefCell<Vec<RawMessage>>, messages: RefCell<Vec<RawMessage>>,
dir: TempDir, dir: TempDir,
worker: Option<Worker<RawMessage, RawMessage>>, worker: Option<Worker<RawMessage, RawMessage>>,
watcher: Option<ThreadWatcher>, watcher: Option<WorkerHandle>,
} }
impl Server { impl Server {
fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server { fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server {
let path = dir.path().to_path_buf(); let path = dir.path().to_path_buf();
let (worker, watcher) = Worker::<RawMessage, RawMessage>::spawn( let (worker, watcher) = thread_worker::spawn::<RawMessage, RawMessage, _>(
"test server", "test server",
128, 128,
move |mut msg_receiver, mut msg_sender| { move |mut msg_receiver, mut msg_sender| {

View file

@ -0,0 +1,11 @@
[package]
edition = "2018"
name = "thread_worker"
version = "0.1.0"
authors = ["Aleksey Kladov <aleksey.kladov@gmail.com>"]
[dependencies]
drop_bomb = "0.1.0"
crossbeam-channel = "0.2.4"
log = "0.4.3"

View file

@ -1,28 +1,35 @@
//! Small utility to correctly spawn crossbeam-channel based worker threads.
use std::thread; use std::thread;
use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use drop_bomb::DropBomb; use drop_bomb::DropBomb;
use failure::format_err;
use crate::Result;
pub struct Worker<I, O> { pub struct Worker<I, O> {
pub inp: Sender<I>, pub inp: Sender<I>,
pub out: Receiver<O>, pub out: Receiver<O>,
} }
impl<I, O> Worker<I, O> { pub struct WorkerHandle {
pub fn spawn<F>(name: &'static str, buf: usize, f: F) -> (Self, ThreadWatcher) name: &'static str,
where thread: thread::JoinHandle<()>,
F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static, bomb: DropBomb,
I: Send + 'static, }
O: Send + 'static,
{
let (worker, inp_r, out_s) = worker_chan(buf);
let watcher = ThreadWatcher::spawn(name, move || f(inp_r, out_s));
(worker, watcher)
}
pub fn spawn<I, O, F>(name: &'static str, buf: usize, f: F) -> (Worker<I, O>, WorkerHandle)
where
F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static,
I: Send + 'static,
O: Send + 'static,
{
let (worker, inp_r, out_s) = worker_chan(buf);
let watcher = WorkerHandle::spawn(name, move || f(inp_r, out_s));
(worker, watcher)
}
impl<I, O> Worker<I, O> {
/// Stops the worker. Returns the message receiver to fetch results which
/// have become ready before the worker is stopped.
pub fn stop(self) -> Receiver<O> { pub fn stop(self) -> Receiver<O> {
self.out self.out
} }
@ -32,30 +39,21 @@ impl<I, O> Worker<I, O> {
} }
} }
pub struct ThreadWatcher { impl WorkerHandle {
name: &'static str, fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> WorkerHandle {
thread: thread::JoinHandle<()>,
bomb: DropBomb,
}
impl ThreadWatcher {
fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher {
let thread = thread::spawn(f); let thread = thread::spawn(f);
ThreadWatcher { WorkerHandle {
name, name,
thread, thread,
bomb: DropBomb::new(format!("ThreadWatcher {} was not stopped", name)), bomb: DropBomb::new(format!("WorkerHandle {} was not stopped", name)),
} }
} }
pub fn stop(mut self) -> Result<()> { pub fn stop(mut self) -> thread::Result<()> {
log::info!("waiting for {} to finish ...", self.name); log::info!("waiting for {} to finish ...", self.name);
let name = self.name; let name = self.name;
self.bomb.defuse(); self.bomb.defuse();
let res = self let res = self.thread.join();
.thread
.join()
.map_err(|_| format_err!("ThreadWatcher {} died", name));
match &res { match &res {
Ok(()) => log::info!("... {} terminated with ok", name), Ok(()) => log::info!("... {} terminated with ok", name),
Err(_) => log::error!("... {} terminated with err", name), Err(_) => log::error!("... {} terminated with err", name),