move to gen-server impl

This commit is contained in:
Aleksey Kladov 2018-09-01 17:40:45 +03:00
parent 3588d6b2da
commit 8f1ce82753
7 changed files with 181 additions and 575 deletions

View file

@ -21,7 +21,7 @@ use languageserver_types::{
pub type Result<T> = ::std::result::Result<T, failure::Error>; pub type Result<T> = ::std::result::Result<T, failure::Error>;
pub use { pub use {
msg::{RawMessage, RawRequest, RawResponse, RawResponseError, RawNotification}, msg::{RawMessage, RawRequest, RawResponse, RawResponseError, RawNotification, ErrorCode},
stdio::{stdio_transport, Threads}, stdio::{stdio_transport, Threads},
}; };

View file

@ -120,6 +120,16 @@ impl RawResponse {
} }
impl RawNotification { impl RawNotification {
pub fn new<N>(params: N::Params) -> RawNotification
where
N: Notification,
N::Params: Serialize,
{
RawNotification {
method: N::METHOD.to_string(),
params: to_value(&params).unwrap(),
}
}
pub fn cast<N>(self) -> ::std::result::Result<N::Params, RawNotification> pub fn cast<N>(self) -> ::std::result::Result<N::Params, RawNotification>
where where
N: Notification, N: Notification,

View file

@ -23,3 +23,4 @@ text_unit = { version = "0.1.2", features = ["serde"] }
libsyntax2 = { path = "../libsyntax2" } libsyntax2 = { path = "../libsyntax2" }
libeditor = { path = "../libeditor" } libeditor = { path = "../libeditor" }
libanalysis = { path = "../libanalysis" } libanalysis = { path = "../libanalysis" }
gen_lsp_server = { path = "../gen_lsp_server" }

View file

@ -1,151 +0,0 @@
use std::marker::PhantomData;
use serde::{
ser::Serialize,
de::DeserializeOwned,
};
use serde_json;
use drop_bomb::DropBomb;
use ::{
Result,
req::{ClientRequest, Notification},
io::{RawResponse, RawRequest, RawNotification},
};
pub struct Responder<R: ClientRequest> {
id: u64,
bomb: DropBomb,
ph: PhantomData<fn(R)>,
}
impl<R: ClientRequest> Responder<R> {
pub fn into_response(mut self, result: Result<R::Result>) -> Result<RawResponse> {
self.bomb.defuse();
let res = match result {
Ok(result) => {
RawResponse {
id: self.id,
result: serde_json::to_value(result)?,
error: serde_json::Value::Null,
}
}
Err(e) => {
error_response(
self.id,
ErrorCode::InternalError,
format!("internal error: {}", e),
)?
}
};
Ok(res)
}
}
fn parse_request_as<R: ClientRequest>(raw: RawRequest)
-> Result<::std::result::Result<(R::Params, Responder<R>), RawRequest>>
{
if raw.method != R::METHOD {
return Ok(Err(raw));
}
let params: R::Params = serde_json::from_value(raw.params)?;
let responder = Responder {
id: raw.id,
bomb: DropBomb::new("dropped request"),
ph: PhantomData,
};
Ok(Ok((params, responder)))
}
pub fn handle_request<R, F>(req: RawRequest, f: F) -> Result<::std::result::Result<u64, RawRequest>>
where
R: ClientRequest,
F: FnOnce(R::Params, Responder<R>) -> Result<()>
{
let id = req.id;
match parse_request_as::<R>(req)? {
Ok((params, responder)) => {
let () = f(params, responder)?;
Ok(Ok(id))
},
Err(r) => Ok(Err(r)),
}
}
fn parse_notification_as<N>(raw: RawNotification) -> Result<::std::result::Result<N::Params, RawNotification>>
where
N: Notification,
N::Params: DeserializeOwned,
{
if raw.method != N::METHOD {
return Ok(Err(raw));
}
let params: N::Params = serde_json::from_value(raw.params)?;
Ok(Ok(params))
}
pub fn handle_notification<N, F>(not: &mut Option<RawNotification>, f: F) -> Result<()>
where
N: Notification,
N::Params: DeserializeOwned,
F: FnOnce(N::Params) -> Result<()>
{
match not.take() {
None => Ok(()),
Some(n) => match parse_notification_as::<N>(n)? {
Ok(params) => f(params),
Err(n) => {
*not = Some(n);
Ok(())
}
}
}
}
pub fn send_notification<N>(params: N::Params) -> RawNotification
where
N: Notification,
N::Params: Serialize
{
RawNotification {
method: N::METHOD.to_string(),
params: serde_json::to_value(params)
.unwrap(),
}
}
pub fn unknown_method(id: u64) -> Result<RawResponse> {
error_response(id, ErrorCode::MethodNotFound, "unknown method")
}
fn error_response(id: u64, code: ErrorCode, message: impl Into<String>) -> Result<RawResponse> {
#[derive(Serialize)]
struct Error {
code: i32,
message: String,
}
let resp = RawResponse {
id,
result: serde_json::Value::Null,
error: serde_json::to_value(Error {
code: code as i32,
message: message.into(),
})?,
};
Ok(resp)
}
#[allow(unused)]
enum ErrorCode {
ParseError = -32700,
InvalidRequest = -32600,
MethodNotFound = -32601,
InvalidParams = -32602,
InternalError = -32603,
ServerErrorStart = -32099,
ServerErrorEnd = -32000,
ServerNotInitialized = -32002,
UnknownErrorCode = -32001,
RequestCancelled = -32800,
}

View file

@ -1,207 +0,0 @@
use std::{
thread,
io::{
stdout, stdin,
BufRead, Write,
},
};
use serde_json::{Value, from_str, to_string};
use crossbeam_channel::{Receiver, Sender, bounded};
use Result;
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum RawMsg {
Request(RawRequest),
Notification(RawNotification),
Response(RawResponse),
}
#[derive(Debug, Serialize, Deserialize)]
pub struct RawRequest {
pub id: u64,
pub method: String,
pub params: Value,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct RawNotification {
pub method: String,
pub params: Value,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct RawResponse {
// JSON RPC allows this to be null if it was impossible
// to decode the request's id. Ignore this special case
// and just die horribly.
pub id: u64,
#[serde(default)]
pub result: Value,
#[serde(default)]
pub error: Value,
}
struct MsgReceiver {
chan: Receiver<RawMsg>,
thread: Option<thread::JoinHandle<Result<()>>>,
}
impl MsgReceiver {
fn recv(&mut self) -> Result<RawMsg> {
match self.chan.recv() {
Some(msg) => Ok(msg),
None => {
self.cleanup()?;
unreachable!()
}
}
}
fn cleanup(&mut self) -> Result<()> {
self.thread
.take()
.ok_or_else(|| format_err!("MsgReceiver thread panicked"))?
.join()
.map_err(|_| format_err!("MsgReceiver thread panicked"))??;
bail!("client disconnected")
}
fn stop(self) -> Result<()> {
// Can't really self.thread.join() here, b/c it might be
// blocking on read
Ok(())
}
}
struct MsgSender {
chan: Sender<RawMsg>,
thread: thread::JoinHandle<Result<()>>,
}
impl MsgSender {
fn send(&mut self, msg: RawMsg) {
self.chan.send(msg)
}
fn stop(self) -> Result<()> {
drop(self.chan);
self.thread.join()
.map_err(|_| format_err!("MsgSender thread panicked"))??;
Ok(())
}
}
pub struct Io {
receiver: MsgReceiver,
sender: MsgSender,
}
impl Io {
pub fn from_stdio() -> Io {
let sender = {
let (tx, rx) = bounded(16);
MsgSender {
chan: tx,
thread: thread::spawn(move || {
let stdout = stdout();
let mut stdout = stdout.lock();
for msg in rx {
#[derive(Serialize)]
struct JsonRpc {
jsonrpc: &'static str,
#[serde(flatten)]
msg: RawMsg,
}
let text = to_string(&JsonRpc {
jsonrpc: "2.0",
msg,
})?;
write_msg_text(&mut stdout, &text)?;
}
Ok(())
}),
}
};
let receiver = {
let (tx, rx) = bounded(16);
MsgReceiver {
chan: rx,
thread: Some(thread::spawn(move || {
let stdin = stdin();
let mut stdin = stdin.lock();
while let Some(text) = read_msg_text(&mut stdin)? {
let msg: RawMsg = from_str(&text)?;
tx.send(msg);
}
Ok(())
})),
}
};
Io { receiver, sender }
}
pub fn send(&mut self, msg: RawMsg) {
self.sender.send(msg)
}
pub fn recv(&mut self) -> Result<RawMsg> {
self.receiver.recv()
}
pub fn receiver(&mut self) -> &mut Receiver<RawMsg> {
&mut self.receiver.chan
}
pub fn cleanup_receiver(&mut self) -> Result<()> {
self.receiver.cleanup()
}
pub fn stop(self) -> Result<()> {
self.receiver.stop()?;
self.sender.stop()?;
Ok(())
}
}
fn read_msg_text(inp: &mut impl BufRead) -> Result<Option<String>> {
let mut size = None;
let mut buf = String::new();
loop {
buf.clear();
if inp.read_line(&mut buf)? == 0 {
return Ok(None);
}
if !buf.ends_with("\r\n") {
bail!("malformed header: {:?}", buf);
}
let buf = &buf[..buf.len() - 2];
if buf.is_empty() {
break;
}
let mut parts = buf.splitn(2, ": ");
let header_name = parts.next().unwrap();
let header_value = parts.next().ok_or_else(|| format_err!("malformed header: {:?}", buf))?;
if header_name == "Content-Length" {
size = Some(header_value.parse::<usize>()?);
}
}
let size = size.ok_or_else(|| format_err!("no Content-Length"))?;
let mut buf = buf.into_bytes();
buf.resize(size, 0);
inp.read_exact(&mut buf)?;
let buf = String::from_utf8(buf)?;
debug!("< {}", buf);
Ok(Some(buf))
}
fn write_msg_text(out: &mut impl Write, msg: &str) -> Result<()> {
debug!("> {}", msg);
write!(out, "Content-Length: {}\r\n\r\n", msg.len())?;
out.write_all(msg.as_bytes())?;
out.flush()?;
Ok(())
}

View file

@ -17,26 +17,20 @@ extern crate walkdir;
extern crate libeditor; extern crate libeditor;
extern crate libanalysis; extern crate libanalysis;
extern crate libsyntax2; extern crate libsyntax2;
extern crate gen_lsp_server;
extern crate im; extern crate im;
extern crate relative_path; extern crate relative_path;
mod io;
mod caps; mod caps;
mod req; mod req;
mod dispatch;
mod conv; mod conv;
mod main_loop; mod main_loop;
mod vfs; mod vfs;
mod path_map; mod path_map;
mod server_world; mod server_world;
use threadpool::ThreadPool;
use crossbeam_channel::bounded;
use flexi_logger::{Logger, Duplicate}; use flexi_logger::{Logger, Duplicate};
use gen_lsp_server::{run_server, stdio_transport};
use ::{
io::{Io, RawMsg, RawResponse, RawNotification},
};
pub type Result<T> = ::std::result::Result<T, ::failure::Error>; pub type Result<T> = ::std::result::Result<T, ::failure::Error>;
@ -60,96 +54,10 @@ fn main() -> Result<()> {
} }
fn main_inner() -> Result<()> { fn main_inner() -> Result<()> {
let mut io = Io::from_stdio(); let (receiver, sender, threads) = stdio_transport();
let res = initialize(&mut io); run_server(caps::server_capabilities(), main_loop::main_loop, receiver, sender)?;
info!("shutting down IO..."); info!("shutting down IO...");
let io_res = io.stop(); threads.join()?;
info!("... IO is down"); info!("... IO is down");
match (res, io_res) { Ok(())
(Ok(()), Ok(())) => Ok(()),
(res, Ok(())) => res,
(Ok(()), io_res) => io_res,
(res, Err(io_err)) => {
error!("shutdown error: {:?}", io_err);
res
}
}
}
fn initialize(io: &mut Io) -> Result<()> {
match io.recv()? {
RawMsg::Notification(n) =>
bail!("expected initialize request, got {:?}", n),
RawMsg::Response(res) =>
bail!("expected initialize request, got {:?}", res),
RawMsg::Request(req) => {
let req = dispatch::handle_request::<req::Initialize, _>(req, |_params, resp| {
let res = req::InitializeResult { capabilities: caps::server_capabilities() };
let resp = resp.into_response(Ok(res))?;
io.send(RawMsg::Response(resp));
Ok(())
})?;
if let Err(req) = req {
bail!("expected initialize request, got {:?}", req)
}
match io.recv()? {
RawMsg::Notification(n) => {
if n.method != "initialized" {
bail!("expected initialized notification");
}
}
_ => bail!("expected initialized notification"),
}
}
}
initialized(io)
}
enum Task {
Respond(RawResponse),
Notify(RawNotification),
Die(::failure::Error),
}
fn initialized(io: &mut Io) -> Result<()> {
{
let mut pool = ThreadPool::new(4);
let (task_sender, task_receiver) = bounded::<Task>(16);
let (fs_events_receiver, watcher) = vfs::watch(vec![
::std::env::current_dir()?,
]);
info!("lifecycle: handshake finished, server ready to serve requests");
let res = main_loop::main_loop(
io,
&mut pool,
task_sender,
task_receiver.clone(),
fs_events_receiver,
);
info!("waiting for background jobs to finish...");
task_receiver.for_each(drop);
pool.join();
info!("...background jobs have finished");
info!("waiting for file watcher to finish...");
watcher.stop()?;
info!("...file watcher has finished");
res
}?;
match io.recv()? {
RawMsg::Notification(n) => {
if n.method == "exit" {
info!("lifecycle: shutdown complete");
return Ok(());
}
bail!("unexpected notification during shutdown: {:?}", n)
}
m => {
bail!("unexpected message during shutdown: {:?}", m)
}
}
} }

View file

@ -6,59 +6,97 @@ use std::{
}; };
use threadpool::ThreadPool; use threadpool::ThreadPool;
use crossbeam_channel::{Sender, Receiver}; use serde::{Serialize, de::DeserializeOwned};
use crossbeam_channel::{bounded, Sender, Receiver};
use languageserver_types::{NumberOrString}; use languageserver_types::{NumberOrString};
use libanalysis::{FileId, JobHandle, JobToken}; use libanalysis::{FileId, JobHandle, JobToken};
use gen_lsp_server::{RawRequest, RawNotification, RawMessage, RawResponse, ErrorCode};
use { use {
req, dispatch, req,
Task, Result, Result,
io::{Io, RawMsg, RawRequest, RawNotification}, vfs::{self, FileEvent},
vfs::FileEvent,
server_world::{ServerWorldState, ServerWorld}, server_world::{ServerWorldState, ServerWorld},
main_loop::subscriptions::{Subscriptions}, main_loop::subscriptions::{Subscriptions},
}; };
enum Task {
Respond(RawResponse),
Notify(RawNotification),
}
pub(super) fn main_loop( pub(super) fn main_loop(
io: &mut Io, receriver: &mut Receiver<RawMessage>,
pool: &mut ThreadPool, sender: &mut Sender<RawMessage>,
task_sender: Sender<Task>,
task_receiver: Receiver<Task>,
fs_events_receiver: Receiver<Vec<FileEvent>>,
) -> Result<()> { ) -> Result<()> {
let pool = ThreadPool::new(4);
let (task_sender, task_receiver) = bounded::<Task>(16);
let (fs_events_receiver, watcher) = vfs::watch(vec![
::std::env::current_dir()?,
]);
info!("server initialized, serving requests"); info!("server initialized, serving requests");
let mut state = ServerWorldState::new(); let mut state = ServerWorldState::new();
let mut pending_requests: HashMap<u64, JobHandle> = HashMap::new(); let mut pending_requests = HashMap::new();
let mut fs_events_receiver = Some(&fs_events_receiver);
let mut subs = Subscriptions::new(); let mut subs = Subscriptions::new();
main_loop_inner(
&pool,
receriver,
sender,
task_receiver.clone(),
task_sender,
fs_events_receiver,
&mut state,
&mut pending_requests,
&mut subs,
)?;
info!("waiting for background jobs to finish...");
task_receiver.for_each(drop);
pool.join();
info!("...background jobs have finished");
info!("waiting for file watcher to finish...");
watcher.stop()?;
info!("...file watcher has finished");
Ok(())
}
fn main_loop_inner(
pool: &ThreadPool,
msg_receiver: &mut Receiver<RawMessage>,
msg_sender: &mut Sender<RawMessage>,
task_receiver: Receiver<Task>,
task_sender: Sender<Task>,
fs_receiver: Receiver<Vec<FileEvent>>,
state: &mut ServerWorldState,
pending_requests: &mut HashMap<u64, JobHandle>,
subs: &mut Subscriptions,
) -> Result<u64> {
let mut fs_receiver = Some(fs_receiver);
loop { loop {
enum Event { enum Event {
Msg(RawMsg), Msg(RawMessage),
Task(Task), Task(Task),
Fs(Vec<FileEvent>), Fs(Vec<FileEvent>),
ReceiverDead,
FsWatcherDead, FsWatcherDead,
} }
let event = select! { let event = select! {
recv(io.receiver(), msg) => match msg { recv(msg_receiver, msg) => match msg {
Some(msg) => Event::Msg(msg), Some(msg) => Event::Msg(msg),
None => Event::ReceiverDead, None => bail!("client exited without shutdown"),
}, },
recv(task_receiver, task) => Event::Task(task.unwrap()), recv(task_receiver, task) => Event::Task(task.unwrap()),
recv(fs_events_receiver, events) => match events { recv(fs_receiver, events) => match events {
Some(events) => Event::Fs(events), Some(events) => Event::Fs(events),
None => Event::FsWatcherDead, None => Event::FsWatcherDead,
} }
}; };
let mut state_changed = false; let mut state_changed = false;
match event { match event {
Event::ReceiverDead => {
io.cleanup_receiver()?;
unreachable!();
}
Event::FsWatcherDead => { Event::FsWatcherDead => {
fs_events_receiver = None; fs_receiver = None;
} }
Event::Task(task) => { Event::Task(task) => {
match task { match task {
@ -66,12 +104,10 @@ pub(super) fn main_loop(
if let Some(handle) = pending_requests.remove(&response.id) { if let Some(handle) = pending_requests.remove(&response.id) {
assert!(handle.has_completed()); assert!(handle.has_completed());
} }
io.send(RawMsg::Response(response)) msg_sender.send(RawMessage::Response(response))
} }
Task::Notify(n) => Task::Notify(n) =>
io.send(RawMsg::Notification(n)), msg_sender.send(RawMessage::Notification(n)),
Task::Die(error) =>
return Err(error),
} }
continue; continue;
} }
@ -82,16 +118,29 @@ pub(super) fn main_loop(
} }
Event::Msg(msg) => { Event::Msg(msg) => {
match msg { match msg {
RawMsg::Request(req) => { RawMessage::Request(req) => {
if !on_request(io, &mut state, &mut pending_requests, pool, &task_sender, req)? { let req = match req.cast::<req::Shutdown>() {
return Ok(()); Ok((id, _params)) => return Ok(id),
Err(req) => req,
};
match on_request(state, pending_requests, pool, &task_sender, req)? {
None => (),
Some(req) => {
error!("unknown request: {:?}", req);
let resp = RawResponse::err(
req.id,
ErrorCode::MethodNotFound as i32,
"unknown request".to_string(),
);
msg_sender.send(RawMessage::Response(resp))
}
} }
} }
RawMsg::Notification(not) => { RawMessage::Notification(not) => {
on_notification(io, &mut state, &mut pending_requests, &mut subs, not)?; on_notification(msg_sender, state, pending_requests, subs, not)?;
state_changed = true; state_changed = true;
} }
RawMsg::Response(resp) => { RawMessage::Response(resp) => {
error!("unexpected response: {:?}", resp) error!("unexpected response: {:?}", resp)
} }
} }
@ -110,13 +159,12 @@ pub(super) fn main_loop(
} }
fn on_request( fn on_request(
io: &mut Io,
world: &mut ServerWorldState, world: &mut ServerWorldState,
pending_requests: &mut HashMap<u64, JobHandle>, pending_requests: &mut HashMap<u64, JobHandle>,
pool: &ThreadPool, pool: &ThreadPool,
sender: &Sender<Task>, sender: &Sender<Task>,
req: RawRequest, req: RawRequest,
) -> Result<bool> { ) -> Result<Option<RawRequest>> {
let mut pool_dispatcher = PoolDispatcher { let mut pool_dispatcher = PoolDispatcher {
req: Some(req), req: Some(req),
res: None, res: None,
@ -141,81 +189,73 @@ fn on_request(
Ok((id, handle)) => { Ok((id, handle)) => {
let inserted = pending_requests.insert(id, handle).is_none(); let inserted = pending_requests.insert(id, handle).is_none();
assert!(inserted, "duplicate request: {}", id); assert!(inserted, "duplicate request: {}", id);
Ok(None)
}, },
Err(req) => { Err(req) => Ok(Some(req)),
let req = dispatch::handle_request::<req::Shutdown, _>(req, |(), resp| {
let resp = resp.into_response(Ok(()))?;
io.send(RawMsg::Response(resp));
Ok(())
})?;
match req {
Ok(_id) => {
info!("lifecycle: initiating shutdown");
return Ok(false);
}
Err(req) => {
error!("unknown method: {:?}", req);
io.send(RawMsg::Response(dispatch::unknown_method(req.id)?));
}
}
}
} }
Ok(true)
} }
fn on_notification( fn on_notification(
io: &mut Io, msg_sender: &mut Sender<RawMessage>,
state: &mut ServerWorldState, state: &mut ServerWorldState,
pending_requests: &mut HashMap<u64, JobHandle>, pending_requests: &mut HashMap<u64, JobHandle>,
subs: &mut Subscriptions, subs: &mut Subscriptions,
not: RawNotification, not: RawNotification,
) -> Result<()> { ) -> Result<()> {
let mut not = Some(not); let not = match not.cast::<req::Cancel>() {
dispatch::handle_notification::<req::Cancel, _>(&mut not, |params| { Ok(params) => {
let id = match params.id { let id = match params.id {
NumberOrString::Number(id) => id, NumberOrString::Number(id) => id,
NumberOrString::String(id) => { NumberOrString::String(id) => {
panic!("string id's not supported: {:?}", id); panic!("string id's not supported: {:?}", id);
}
};
if let Some(handle) = pending_requests.remove(&id) {
handle.cancel();
} }
}; return Ok(())
if let Some(handle) = pending_requests.remove(&id) {
handle.cancel();
} }
Ok(()) Err(not) => not,
})?; };
dispatch::handle_notification::<req::DidOpenTextDocument, _>(&mut not, |params| { let not = match not.cast::<req::DidOpenTextDocument>() {
let uri = params.text_document.uri; Ok(params) => {
let path = uri.to_file_path() let uri = params.text_document.uri;
.map_err(|()| format_err!("invalid uri: {}", uri))?; let path = uri.to_file_path()
let file_id = state.add_mem_file(path, params.text_document.text); .map_err(|()| format_err!("invalid uri: {}", uri))?;
subs.add_sub(file_id); let file_id = state.add_mem_file(path, params.text_document.text);
Ok(()) subs.add_sub(file_id);
})?; return Ok(())
dispatch::handle_notification::<req::DidChangeTextDocument, _>(&mut not, |mut params| { }
let uri = params.text_document.uri; Err(not) => not,
let path = uri.to_file_path() };
.map_err(|()| format_err!("invalid uri: {}", uri))?; let not = match not.cast::<req::DidChangeTextDocument>() {
let text = params.content_changes.pop() Ok(mut params) => {
.ok_or_else(|| format_err!("empty changes"))? let uri = params.text_document.uri;
.text; let path = uri.to_file_path()
state.change_mem_file(path.as_path(), text)?; .map_err(|()| format_err!("invalid uri: {}", uri))?;
Ok(()) let text = params.content_changes.pop()
})?; .ok_or_else(|| format_err!("empty changes"))?
dispatch::handle_notification::<req::DidCloseTextDocument, _>(&mut not, |params| { .text;
let uri = params.text_document.uri; state.change_mem_file(path.as_path(), text)?;
let path = uri.to_file_path() return Ok(())
.map_err(|()| format_err!("invalid uri: {}", uri))?; }
let file_id = state.remove_mem_file(path.as_path())?; Err(not) => not,
subs.remove_sub(file_id); };
let not = req::PublishDiagnosticsParams { uri, diagnostics: Vec::new() }; let not = match not.cast::<req::DidCloseTextDocument>() {
let not = dispatch::send_notification::<req::PublishDiagnostics>(not); Ok(params) => {
io.send(RawMsg::Notification(not)); let uri = params.text_document.uri;
Ok(()) let path = uri.to_file_path()
})?; .map_err(|()| format_err!("invalid uri: {}", uri))?;
let file_id = state.remove_mem_file(path.as_path())?;
if let Some(not) = not { subs.remove_sub(file_id);
error!("unhandled notification: {:?}", not); let params = req::PublishDiagnosticsParams { uri, diagnostics: Vec::new() };
} let not = RawNotification::new::<req::PublishDiagnostics>(params);
msg_sender.send(RawMessage::Notification(not));
return Ok(())
}
Err(not) => not,
};
error!("unhandled notification: {:?}", not);
Ok(()) Ok(())
} }
@ -228,10 +268,14 @@ struct PoolDispatcher<'a> {
} }
impl<'a> PoolDispatcher<'a> { impl<'a> PoolDispatcher<'a> {
fn on<'b, R: req::ClientRequest>( fn on<'b, R>(
&'b mut self, &'b mut self,
f: fn(ServerWorld, R::Params, JobToken) -> Result<R::Result> f: fn(ServerWorld, R::Params, JobToken) -> Result<R::Result>
) -> Result<&'b mut Self> { ) -> Result<&'b mut Self>
where R: req::Request,
R::Params: DeserializeOwned + Send + 'static,
R::Result: Serialize + 'static,
{
let req = match self.req.take() { let req = match self.req.take() {
None => return Ok(self), None => return Ok(self),
Some(req) => req, Some(req) => req,
@ -239,23 +283,24 @@ impl<'a> PoolDispatcher<'a> {
let world = self.world; let world = self.world;
let sender = self.sender; let sender = self.sender;
let pool = self.pool; let pool = self.pool;
let (handle, token) = JobHandle::new(); match req.cast::<R>() {
let req = dispatch::handle_request::<R, _>(req, |params, resp| { Ok((id, params)) => {
let world = world.snapshot(); let (handle, token) = JobHandle::new();
let sender = sender.clone(); let world = world.snapshot();
pool.execute(move || { let sender = sender.clone();
let res = f(world, params, token); pool.execute(move || {
let task = match resp.into_response(res) { let resp = match f(world, params, token) {
Ok(resp) => Task::Respond(resp), Ok(resp) => RawResponse::ok(id, resp),
Err(e) => Task::Die(e), Err(e) => RawResponse::err(id, ErrorCode::InternalError as i32, e.to_string()),
}; };
sender.send(task); let task = Task::Respond(resp);
}); sender.send(task);
Ok(()) });
})?; self.res = Some((id, handle));
match req { }
Ok(id) => self.res = Some((id, handle)), Err(req) => {
Err(req) => self.req = Some(req), self.req = Some(req)
}
} }
Ok(self) Ok(self)
} }
@ -282,7 +327,7 @@ fn update_file_notifications_on_threadpool(
error!("failed to compute diagnostics: {:?}", e) error!("failed to compute diagnostics: {:?}", e)
} }
Ok(params) => { Ok(params) => {
let not = dispatch::send_notification::<req::PublishDiagnostics>(params); let not = RawNotification::new::<req::PublishDiagnostics>(params);
sender.send(Task::Notify(not)); sender.send(Task::Notify(not));
} }
} }
@ -291,7 +336,7 @@ fn update_file_notifications_on_threadpool(
error!("failed to compute decorations: {:?}", e) error!("failed to compute decorations: {:?}", e)
} }
Ok(params) => { Ok(params) => {
let not = dispatch::send_notification::<req::PublishDecorations>(params); let not = RawNotification::new::<req::PublishDecorations>(params);
sender.send(Task::Notify(not)) sender.send(Task::Notify(not))
} }
} }