Some abstraction around workers

This commit is contained in:
Aleksey Kladov 2018-09-08 13:15:01 +03:00
parent 326ffcefe0
commit 7daaddb2ac
5 changed files with 92 additions and 74 deletions

View file

@ -23,6 +23,7 @@ use {
server_world::{ServerWorldState, ServerWorld}, server_world::{ServerWorldState, ServerWorld},
main_loop::subscriptions::{Subscriptions}, main_loop::subscriptions::{Subscriptions},
project_model::{CargoWorkspace, workspace_loader}, project_model::{CargoWorkspace, workspace_loader},
thread_watcher::Worker,
}; };
#[derive(Debug)] #[derive(Debug)]
@ -43,8 +44,8 @@ pub fn main_loop(
.build() .build()
.unwrap(); .unwrap();
let (task_sender, task_receiver) = unbounded::<Task>(); let (task_sender, task_receiver) = unbounded::<Task>();
let ((fs_sender, fs_receiver), fs_watcher) = vfs::roots_loader(); let (fs_worker, fs_watcher) = vfs::roots_loader();
let ((ws_sender, ws_receiver), ws_watcher) = workspace_loader(); let (ws_worker, ws_watcher) = workspace_loader();
info!("server initialized, serving requests"); info!("server initialized, serving requests");
let mut state = ServerWorldState::new(); let mut state = ServerWorldState::new();
@ -59,10 +60,8 @@ pub fn main_loop(
msg_receriver, msg_receriver,
task_sender, task_sender,
task_receiver.clone(), task_receiver.clone(),
fs_sender, fs_worker,
fs_receiver, ws_worker,
ws_sender,
ws_receiver,
&mut state, &mut state,
&mut pending_requests, &mut pending_requests,
&mut subs, &mut subs,
@ -93,17 +92,15 @@ fn main_loop_inner(
msg_receiver: &mut Receiver<RawMessage>, msg_receiver: &mut Receiver<RawMessage>,
task_sender: Sender<Task>, task_sender: Sender<Task>,
task_receiver: Receiver<Task>, task_receiver: Receiver<Task>,
fs_sender: Sender<PathBuf>, fs_worker: Worker<PathBuf, (PathBuf, Vec<FileEvent>)>,
fs_receiver: Receiver<(PathBuf, Vec<FileEvent>)>, ws_worker: Worker<PathBuf, Result<CargoWorkspace>>,
ws_sender: Sender<PathBuf>,
ws_receiver: Receiver<Result<CargoWorkspace>>,
state: &mut ServerWorldState, state: &mut ServerWorldState,
pending_requests: &mut HashMap<u64, JobHandle>, pending_requests: &mut HashMap<u64, JobHandle>,
subs: &mut Subscriptions, subs: &mut Subscriptions,
) -> Result<()> { ) -> Result<()> {
let (libdata_sender, libdata_receiver) = unbounded(); let (libdata_sender, libdata_receiver) = unbounded();
ws_sender.send(ws_root.clone()); ws_worker.send(ws_root.clone());
fs_sender.send(ws_root.clone()); fs_worker.send(ws_root.clone());
loop { loop {
#[derive(Debug)] #[derive(Debug)]
enum Event { enum Event {
@ -120,11 +117,11 @@ fn main_loop_inner(
None => bail!("client exited without shutdown"), None => bail!("client exited without shutdown"),
}, },
recv(task_receiver, task) => Event::Task(task.unwrap()), recv(task_receiver, task) => Event::Task(task.unwrap()),
recv(fs_receiver, events) => match events { recv(fs_worker.out, events) => match events {
None => bail!("roots watcher died"), None => bail!("roots watcher died"),
Some((pb, events)) => Event::Fs(pb, events), Some((pb, events)) => Event::Fs(pb, events),
} }
recv(ws_receiver, ws) => match ws { recv(ws_worker.out, ws) => match ws {
None => bail!("workspace watcher died"), None => bail!("workspace watcher died"),
Some(ws) => Event::Ws(ws), Some(ws) => Event::Ws(ws),
} }
@ -158,8 +155,7 @@ fn main_loop_inner(
for ws in workspaces.iter() { for ws in workspaces.iter() {
for pkg in ws.packages().filter(|pkg| !pkg.is_member(ws)) { for pkg in ws.packages().filter(|pkg| !pkg.is_member(ws)) {
debug!("sending root, {}", pkg.root(ws).to_path_buf().display()); debug!("sending root, {}", pkg.root(ws).to_path_buf().display());
// deadlocky :-( fs_worker.send(pkg.root(ws).to_path_buf());
fs_sender.send(pkg.root(ws).to_path_buf());
} }
} }
state.set_workspaces(workspaces); state.set_workspaces(workspaces);

View file

@ -3,12 +3,11 @@ use std::{
path::{Path, PathBuf}, path::{Path, PathBuf},
}; };
use cargo_metadata::{metadata_run, CargoOpt}; use cargo_metadata::{metadata_run, CargoOpt};
use crossbeam_channel::{Sender, Receiver};
use libsyntax2::SmolStr; use libsyntax2::SmolStr;
use { use {
Result, Result,
thread_watcher::{ThreadWatcher, worker_chan}, thread_watcher::{Worker, ThreadWatcher},
}; };
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -162,14 +161,15 @@ impl TargetKind {
} }
} }
pub fn workspace_loader() -> ((Sender<PathBuf>, Receiver<Result<CargoWorkspace>>), ThreadWatcher) { pub fn workspace_loader() -> (Worker<PathBuf, Result<CargoWorkspace>>, ThreadWatcher) {
let (interface, input_receiver, output_sender) = worker_chan::<PathBuf, Result<CargoWorkspace>>(1); Worker::<PathBuf, Result<CargoWorkspace>>::spawn(
let thread = ThreadWatcher::spawn("workspace loader", move || { "workspace loader",
1,
|input_receiver, output_sender| {
input_receiver input_receiver
.into_iter() .into_iter()
.map(|path| CargoWorkspace::from_cargo_metadata(path.as_path())) .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path()))
.for_each(|it| output_sender.send(it)) .for_each(|it| output_sender.send(it))
}); }
)
(interface, thread)
} }

View file

@ -3,6 +3,33 @@ use crossbeam_channel::{bounded, unbounded, Sender, Receiver};
use drop_bomb::DropBomb; use drop_bomb::DropBomb;
use Result; use Result;
pub struct Worker<I, O> {
pub inp: Sender<I>,
pub out: Receiver<O>,
}
impl<I, O> Worker<I, O> {
pub fn spawn<F>(name: &'static str, buf: usize, f: F) -> (Self, ThreadWatcher)
where
F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static,
I: Send + 'static,
O: Send + 'static,
{
let ((inp, out), inp_r, out_s) = worker_chan(buf);
let worker = Worker { inp, out };
let watcher = ThreadWatcher::spawn(name, move || f(inp_r, out_s));
(worker, watcher)
}
pub fn stop(self) -> Receiver<O> {
self.out
}
pub fn send(&self, item: I) {
self.inp.send(item)
}
}
pub struct ThreadWatcher { pub struct ThreadWatcher {
name: &'static str, name: &'static str,
thread: thread::JoinHandle<()>, thread: thread::JoinHandle<()>,
@ -10,7 +37,7 @@ pub struct ThreadWatcher {
} }
impl ThreadWatcher { impl ThreadWatcher {
pub fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher { fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher {
let thread = thread::spawn(f); let thread = thread::spawn(f);
ThreadWatcher { ThreadWatcher {
name, name,
@ -36,7 +63,7 @@ impl ThreadWatcher {
/// Sets up worker channels in a deadlock-avoind way. /// Sets up worker channels in a deadlock-avoind way.
/// If one sets both input and output buffers to a fixed size, /// If one sets both input and output buffers to a fixed size,
/// a worker might get stuck. /// a worker might get stuck.
pub fn worker_chan<I, O>(buf: usize) -> ((Sender<I>, Receiver<O>), Receiver<I>, Sender<O>) { fn worker_chan<I, O>(buf: usize) -> ((Sender<I>, Receiver<O>), Receiver<I>, Sender<O>) {
let (input_sender, input_receiver) = bounded::<I>(buf); let (input_sender, input_receiver) = bounded::<I>(buf);
let (output_sender, output_receiver) = unbounded::<O>(); let (output_sender, output_receiver) = unbounded::<O>();
((input_sender, output_receiver), input_receiver, output_sender) ((input_sender, output_receiver), input_receiver, output_sender)

View file

@ -3,11 +3,10 @@ use std::{
fs, fs,
}; };
use crossbeam_channel::{Sender, Receiver};
use walkdir::WalkDir; use walkdir::WalkDir;
use { use {
thread_watcher::{ThreadWatcher, worker_chan}, thread_watcher::{Worker, ThreadWatcher},
}; };
@ -22,10 +21,10 @@ pub enum FileEventKind {
Add(String), Add(String),
} }
pub fn roots_loader() -> ((Sender<PathBuf>, Receiver<(PathBuf, Vec<FileEvent>)>), ThreadWatcher) { pub fn roots_loader() -> (Worker<PathBuf, (PathBuf, Vec<FileEvent>)>, ThreadWatcher) {
let (interface, input_receiver, output_sender) = Worker::<PathBuf, (PathBuf, Vec<FileEvent>)>::spawn(
worker_chan::<PathBuf, (PathBuf, Vec<FileEvent>)>(128); "roots loader",
let thread = ThreadWatcher::spawn("roots loader", move || { 128, |input_receiver, output_sender| {
input_receiver input_receiver
.into_iter() .into_iter()
.map(|path| { .map(|path| {
@ -35,9 +34,8 @@ pub fn roots_loader() -> ((Sender<PathBuf>, Receiver<(PathBuf, Vec<FileEvent>)>)
(path, events) (path, events)
}) })
.for_each(|it| output_sender.send(it)) .for_each(|it| output_sender.send(it))
}); }
)
(interface, thread)
} }
fn load_root(path: &Path) -> Vec<FileEvent> { fn load_root(path: &Path) -> Vec<FileEvent> {

View file

@ -1,6 +1,5 @@
use std::{ use std::{
fs, fs,
thread,
cell::{Cell, RefCell}, cell::{Cell, RefCell},
path::PathBuf, path::PathBuf,
time::Duration, time::Duration,
@ -8,7 +7,7 @@ use std::{
}; };
use tempdir::TempDir; use tempdir::TempDir;
use crossbeam_channel::{after, Sender, Receiver}; use crossbeam_channel::{after, Receiver};
use flexi_logger::Logger; use flexi_logger::Logger;
use languageserver_types::{ use languageserver_types::{
Url, Url,
@ -22,7 +21,7 @@ use serde::Serialize;
use serde_json::{Value, from_str, to_string_pretty}; use serde_json::{Value, from_str, to_string_pretty};
use gen_lsp_server::{RawMessage, RawRequest, RawNotification}; use gen_lsp_server::{RawMessage, RawRequest, RawNotification};
use m::{Result, main_loop, req, thread_watcher::worker_chan}; use m::{main_loop, req, thread_watcher::{ThreadWatcher, Worker}};
pub fn project(fixture: &str) -> Server { pub fn project(fixture: &str) -> Server {
static INIT: Once = Once::new(); static INIT: Once = Once::new();
@ -61,28 +60,27 @@ pub struct Server {
req_id: Cell<u64>, req_id: Cell<u64>,
messages: RefCell<Vec<RawMessage>>, messages: RefCell<Vec<RawMessage>>,
dir: TempDir, dir: TempDir,
sender: Option<Sender<RawMessage>>, worker: Option<Worker<RawMessage, RawMessage>>,
receiver: Receiver<RawMessage>, watcher: Option<ThreadWatcher>,
server: Option<thread::JoinHandle<Result<()>>>,
} }
impl Server { impl Server {
fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server { fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server {
let path = dir.path().to_path_buf(); let path = dir.path().to_path_buf();
let ((msg_sender, msg_receiver), server) = { let (worker, watcher) = Worker::<RawMessage, RawMessage>::spawn(
let (api, mut msg_receiver, mut msg_sender) = worker_chan::<RawMessage, RawMessage>(128); "test server",
let server = thread::spawn(move || { 128,
move |mut msg_receiver, mut msg_sender| {
main_loop(true, path, &mut msg_receiver, &mut msg_sender) main_loop(true, path, &mut msg_receiver, &mut msg_sender)
}); .unwrap()
(api, server) }
}; );
let res = Server { let res = Server {
req_id: Cell::new(1), req_id: Cell::new(1),
dir, dir,
messages: Default::default(), messages: Default::default(),
sender: Some(msg_sender), worker: Some(worker),
receiver: msg_receiver, watcher: Some(watcher),
server: Some(server),
}; };
for (path, text) in files { for (path, text) in files {
@ -140,7 +138,7 @@ impl Server {
fn send_request_(&self, r: RawRequest) -> Value fn send_request_(&self, r: RawRequest) -> Value
{ {
let id = r.id; let id = r.id;
self.sender.as_ref() self.worker.as_ref()
.unwrap() .unwrap()
.send(RawMessage::Request(r)); .send(RawMessage::Request(r));
while let Some(msg) = self.recv() { while let Some(msg) = self.recv() {
@ -183,14 +181,14 @@ impl Server {
} }
} }
fn recv(&self) -> Option<RawMessage> { fn recv(&self) -> Option<RawMessage> {
recv_timeout(&self.receiver) recv_timeout(&self.worker.as_ref().unwrap().out)
.map(|msg| { .map(|msg| {
self.messages.borrow_mut().push(msg.clone()); self.messages.borrow_mut().push(msg.clone());
msg msg
}) })
} }
fn send_notification(&self, not: RawNotification) { fn send_notification(&self, not: RawNotification) {
self.sender.as_ref() self.worker.as_ref()
.unwrap() .unwrap()
.send(RawMessage::Notification(not)); .send(RawMessage::Notification(not));
} }
@ -198,16 +196,15 @@ impl Server {
impl Drop for Server { impl Drop for Server {
fn drop(&mut self) { fn drop(&mut self) {
{
self.send_request::<Shutdown>(666, ()); self.send_request::<Shutdown>(666, ());
drop(self.sender.take().unwrap()); let receiver = self.worker.take().unwrap().stop();
while let Some(msg) = recv_timeout(&self.receiver) { while let Some(msg) = recv_timeout(&receiver) {
drop(msg); drop(msg);
} }
} self.watcher.take()
self.server.take()
.unwrap() .unwrap()
.join().unwrap().unwrap(); .stop()
.unwrap();
} }
} }