Move ReqQueue to lsp-server

This commit is contained in:
Aleksey Kladov 2020-06-20 23:08:01 +02:00
parent b575b02449
commit f3cf85ab48
7 changed files with 109 additions and 192 deletions

4
Cargo.lock generated
View file

@ -640,9 +640,9 @@ dependencies = [
[[package]] [[package]]
name = "lsp-server" name = "lsp-server"
version = "0.3.2" version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dccec31bfd027ac0dd288a78e19005fd89624d9099456e284b5241316a6c3072" checksum = "53b4ace8ebe5d2aff3687ce0ed507f6020d6a47a7de2b0d3d664ea237ffb0c62"
dependencies = [ dependencies = [
"crossbeam-channel", "crossbeam-channel",
"log", "log",

View file

@ -32,7 +32,7 @@ threadpool = "1.7.1"
stdx = { path = "../stdx" } stdx = { path = "../stdx" }
lsp-server = "0.3.2" lsp-server = "0.3.3"
ra_flycheck = { path = "../ra_flycheck" } ra_flycheck = { path = "../ra_flycheck" }
ra_ide = { path = "../ra_ide" } ra_ide = { path = "../ra_ide" }
ra_prof = { path = "../ra_prof" } ra_prof = { path = "../ra_prof" }

View file

@ -20,7 +20,7 @@ use stdx::format_to;
use crate::{ use crate::{
config::{Config, FilesWatcher}, config::{Config, FilesWatcher},
diagnostics::{CheckFixes, DiagnosticCollection}, diagnostics::{CheckFixes, DiagnosticCollection},
main_loop::req_queue::{CompletedInRequest, LatestRequests}, main_loop::request_metrics::{LatestRequests, RequestMetrics},
to_proto::url_from_abs_path, to_proto::url_from_abs_path,
vfs_glob::{Glob, RustPackageFilterBuilder}, vfs_glob::{Glob, RustPackageFilterBuilder},
LspError, Result, LspError, Result,
@ -55,10 +55,10 @@ pub struct GlobalState {
pub analysis_host: AnalysisHost, pub analysis_host: AnalysisHost,
pub vfs: Arc<RwLock<Vfs>>, pub vfs: Arc<RwLock<Vfs>>,
pub task_receiver: Receiver<VfsTask>, pub task_receiver: Receiver<VfsTask>,
pub latest_requests: Arc<RwLock<LatestRequests>>,
pub flycheck: Option<Flycheck>, pub flycheck: Option<Flycheck>,
pub diagnostics: DiagnosticCollection, pub diagnostics: DiagnosticCollection,
pub proc_macro_client: ProcMacroClient, pub proc_macro_client: ProcMacroClient,
pub(crate) latest_requests: Arc<RwLock<LatestRequests>>,
} }
/// An immutable snapshot of the world's state at a point in time. /// An immutable snapshot of the world's state at a point in time.
@ -66,8 +66,8 @@ pub struct GlobalStateSnapshot {
pub config: Config, pub config: Config,
pub workspaces: Arc<Vec<ProjectWorkspace>>, pub workspaces: Arc<Vec<ProjectWorkspace>>,
pub analysis: Analysis, pub analysis: Analysis,
pub latest_requests: Arc<RwLock<LatestRequests>>,
pub check_fixes: CheckFixes, pub check_fixes: CheckFixes,
pub(crate) latest_requests: Arc<RwLock<LatestRequests>>,
vfs: Arc<RwLock<Vfs>>, vfs: Arc<RwLock<Vfs>>,
} }
@ -236,7 +236,7 @@ impl GlobalState {
self.analysis_host.collect_garbage() self.analysis_host.collect_garbage()
} }
pub(crate) fn complete_request(&mut self, request: CompletedInRequest) { pub(crate) fn complete_request(&mut self, request: RequestMetrics) {
self.latest_requests.write().record(request) self.latest_requests.write().record(request)
} }
} }

View file

@ -32,7 +32,7 @@ mod semantic_tokens;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>; pub type Result<T, E = Box<dyn std::error::Error + Send + Sync>> = std::result::Result<T, E>;
pub use crate::{ pub use crate::{
caps::server_capabilities, caps::server_capabilities,
main_loop::LspError, main_loop::LspError,

View file

@ -3,7 +3,7 @@
mod handlers; mod handlers;
mod subscriptions; mod subscriptions;
pub(crate) mod req_queue; pub(crate) mod request_metrics;
use std::{ use std::{
borrow::Cow, borrow::Cow,
@ -17,11 +17,13 @@ use std::{
}; };
use crossbeam_channel::{never, select, unbounded, RecvError, Sender}; use crossbeam_channel::{never, select, unbounded, RecvError, Sender};
use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response}; use lsp_server::{
Connection, ErrorCode, Message, Notification, ReqQueue, Request, RequestId, Response,
};
use lsp_types::{ use lsp_types::{
DidChangeTextDocumentParams, NumberOrString, TextDocumentContentChangeEvent, WorkDoneProgress, request::Request as _, DidChangeTextDocumentParams, NumberOrString,
WorkDoneProgressBegin, WorkDoneProgressCreateParams, WorkDoneProgressEnd, TextDocumentContentChangeEvent, WorkDoneProgress, WorkDoneProgressBegin,
WorkDoneProgressReport, WorkDoneProgressCreateParams, WorkDoneProgressEnd, WorkDoneProgressReport,
}; };
use ra_flycheck::{CheckTask, Status}; use ra_flycheck::{CheckTask, Status};
use ra_ide::{Canceled, FileId, LineIndex}; use ra_ide::{Canceled, FileId, LineIndex};
@ -37,10 +39,9 @@ use crate::{
from_proto, from_proto,
global_state::{file_id_to_url, GlobalState, GlobalStateSnapshot}, global_state::{file_id_to_url, GlobalState, GlobalStateSnapshot},
lsp_ext, lsp_ext,
main_loop::subscriptions::Subscriptions, main_loop::{request_metrics::RequestMetrics, subscriptions::Subscriptions},
Result, Result,
}; };
use req_queue::ReqQueue;
#[derive(Debug)] #[derive(Debug)]
pub struct LspError { pub struct LspError {
@ -150,10 +151,11 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
register_options: Some(serde_json::to_value(registration_options).unwrap()), register_options: Some(serde_json::to_value(registration_options).unwrap()),
}; };
let params = lsp_types::RegistrationParams { registrations: vec![registration] }; let params = lsp_types::RegistrationParams { registrations: vec![registration] };
let request = loop_state let request = loop_state.req_queue.outgoing.register(
.req_queue lsp_types::request::RegisterCapability::METHOD.to_string(),
.outgoing params,
.register::<lsp_types::request::RegisterCapability>(params, |_, _| ()); DO_NOTHING,
);
connection.sender.send(request.into()).unwrap(); connection.sender.send(request.into()).unwrap();
} }
@ -261,9 +263,13 @@ impl fmt::Debug for Event {
} }
} }
type ReqHandler = fn(&mut GlobalState, Response);
const DO_NOTHING: ReqHandler = |_, _| ();
type Incoming = lsp_server::Incoming<(&'static str, Instant)>;
#[derive(Default)] #[derive(Default)]
struct LoopState { struct LoopState {
req_queue: ReqQueue<fn(&mut GlobalState, lsp_server::Response)>, req_queue: ReqQueue<(&'static str, Instant), ReqHandler>,
subscriptions: Subscriptions, subscriptions: Subscriptions,
workspace_loaded: bool, workspace_loaded: bool,
roots_progress_reported: Option<usize>, roots_progress_reported: Option<usize>,
@ -367,14 +373,19 @@ fn loop_turn(
fn on_task( fn on_task(
task: Task, task: Task,
msg_sender: &Sender<Message>, msg_sender: &Sender<Message>,
incoming_requests: &mut req_queue::Incoming, incoming_requests: &mut Incoming,
state: &mut GlobalState, state: &mut GlobalState,
) { ) {
match task { match task {
Task::Respond(response) => { Task::Respond(response) => {
if let Some(completed) = incoming_requests.complete(response.id.clone()) { if let Some((method, start)) = incoming_requests.complete(response.id.clone()) {
log::info!("handled req#{} in {:?}", completed.id, completed.duration); let duration = start.elapsed();
state.complete_request(completed); log::info!("handled req#{} in {:?}", response.id, duration);
state.complete_request(RequestMetrics {
id: response.id.clone(),
method: method.to_string(),
duration,
});
msg_sender.send(response.into()).unwrap(); msg_sender.send(response.into()).unwrap();
} }
} }
@ -387,7 +398,7 @@ fn on_task(
fn on_request( fn on_request(
global_state: &mut GlobalState, global_state: &mut GlobalState,
incoming_requests: &mut req_queue::Incoming, incoming_requests: &mut Incoming,
pool: &ThreadPool, pool: &ThreadPool,
task_sender: &Sender<Task>, task_sender: &Sender<Task>,
msg_sender: &Sender<Message>, msg_sender: &Sender<Message>,
@ -527,10 +538,8 @@ fn on_notification(
Ok(_) => { Ok(_) => {
// As stated in https://github.com/microsoft/language-server-protocol/issues/676, // As stated in https://github.com/microsoft/language-server-protocol/issues/676,
// this notification's parameters should be ignored and the actual config queried separately. // this notification's parameters should be ignored and the actual config queried separately.
let request = loop_state let request = loop_state.req_queue.outgoing.register(
.req_queue lsp_types::request::WorkspaceConfiguration::METHOD.to_string(),
.outgoing
.register::<lsp_types::request::WorkspaceConfiguration>(
lsp_types::ConfigurationParams { lsp_types::ConfigurationParams {
items: vec![lsp_types::ConfigurationItem { items: vec![lsp_types::ConfigurationItem {
scope_uri: None, scope_uri: None,
@ -552,9 +561,9 @@ fn on_notification(
global_state.update_configuration(config); global_state.update_configuration(config);
} }
} }
(None, None) => log::error!( (None, None) => {
"received empty server settings response from the client" log::error!("received empty server settings response from the client")
), }
} }
}, },
); );
@ -727,14 +736,12 @@ fn send_startup_progress(sender: &Sender<Message>, loop_state: &mut LoopState) {
match (prev, loop_state.workspace_loaded) { match (prev, loop_state.workspace_loaded) {
(None, false) => { (None, false) => {
let request = loop_state let request = loop_state.req_queue.outgoing.register(
.req_queue lsp_types::request::WorkDoneProgressCreate::METHOD.to_string(),
.outgoing
.register::<lsp_types::request::WorkDoneProgressCreate>(
WorkDoneProgressCreateParams { WorkDoneProgressCreateParams {
token: lsp_types::ProgressToken::String("rustAnalyzer/startup".into()), token: lsp_types::ProgressToken::String("rustAnalyzer/startup".into()),
}, },
|_, _| (), DO_NOTHING,
); );
sender.send(request.into()).unwrap(); sender.send(request.into()).unwrap();
send_startup_progress_notif( send_startup_progress_notif(
@ -778,7 +785,7 @@ struct PoolDispatcher<'a> {
req: Option<Request>, req: Option<Request>,
pool: &'a ThreadPool, pool: &'a ThreadPool,
global_state: &'a mut GlobalState, global_state: &'a mut GlobalState,
incoming_requests: &'a mut req_queue::Incoming, incoming_requests: &'a mut Incoming,
msg_sender: &'a Sender<Message>, msg_sender: &'a Sender<Message>,
task_sender: &'a Sender<Task>, task_sender: &'a Sender<Task>,
request_received: Instant, request_received: Instant,
@ -854,11 +861,7 @@ impl<'a> PoolDispatcher<'a> {
return None; return None;
} }
}; };
self.incoming_requests.register(req_queue::PendingInRequest { self.incoming_requests.register(id.clone(), (R::METHOD, self.request_received));
id: id.clone(),
method: R::METHOD.to_string(),
received: self.request_received,
});
Some((id, params)) Some((id, params))
} }

View file

@ -1,123 +0,0 @@
//! Manages the set of in-flight requests in both directions.
use std::time::{Duration, Instant};
use lsp_server::RequestId;
use rustc_hash::FxHashMap;
use serde::Serialize;
#[derive(Debug)]
pub(crate) struct ReqQueue<H> {
pub(crate) incoming: Incoming,
pub(crate) outgoing: Outgoing<H>,
}
impl<H> Default for ReqQueue<H> {
fn default() -> Self {
ReqQueue { incoming: Incoming::default(), outgoing: Outgoing::default() }
}
}
#[derive(Debug)]
pub(crate) struct Outgoing<H> {
next: u64,
pending: FxHashMap<RequestId, H>,
}
impl<H> Default for Outgoing<H> {
fn default() -> Self {
Outgoing { next: 0, pending: FxHashMap::default() }
}
}
impl<H> Outgoing<H> {
pub(crate) fn register<R>(&mut self, params: R::Params, handler: H) -> lsp_server::Request
where
R: lsp_types::request::Request,
R::Params: Serialize,
{
let id = RequestId::from(self.next);
self.next += 1;
self.pending.insert(id.clone(), handler);
lsp_server::Request::new(id, R::METHOD.to_string(), params)
}
pub(crate) fn complete(&mut self, id: RequestId) -> H {
self.pending.remove(&id).unwrap()
}
}
#[derive(Debug)]
pub(crate) struct CompletedInRequest {
pub(crate) id: RequestId,
pub(crate) method: String,
pub(crate) duration: Duration,
}
#[derive(Debug)]
pub(crate) struct PendingInRequest {
pub(crate) id: RequestId,
pub(crate) method: String,
pub(crate) received: Instant,
}
impl From<PendingInRequest> for CompletedInRequest {
fn from(pending: PendingInRequest) -> CompletedInRequest {
CompletedInRequest {
id: pending.id,
method: pending.method,
duration: pending.received.elapsed(),
}
}
}
#[derive(Debug, Default)]
pub(crate) struct Incoming {
pending: FxHashMap<RequestId, PendingInRequest>,
}
impl Incoming {
pub(crate) fn register(&mut self, request: PendingInRequest) {
let id = request.id.clone();
let prev = self.pending.insert(id.clone(), request);
assert!(prev.is_none(), "duplicate request with id {}", id);
}
pub(crate) fn cancel(&mut self, id: RequestId) -> Option<lsp_server::Response> {
if self.pending.remove(&id).is_some() {
Some(lsp_server::Response::new_err(
id,
lsp_server::ErrorCode::RequestCanceled as i32,
"canceled by client".to_string(),
))
} else {
None
}
}
pub(crate) fn complete(&mut self, id: RequestId) -> Option<CompletedInRequest> {
self.pending.remove(&id).map(CompletedInRequest::from)
}
}
const N_COMPLETED_REQUESTS: usize = 10;
#[derive(Debug, Default)]
pub struct LatestRequests {
// hand-rolling VecDeque here to print things in a nicer way
buf: [Option<CompletedInRequest>; N_COMPLETED_REQUESTS],
idx: usize,
}
impl LatestRequests {
pub(crate) fn record(&mut self, request: CompletedInRequest) {
// special case: don't track status request itself
if request.method == "rust-analyzer/analyzerStatus" {
return;
}
let idx = self.idx;
self.buf[idx] = Some(request);
self.idx = (idx + 1) % N_COMPLETED_REQUESTS;
}
pub(crate) fn iter(&self) -> impl Iterator<Item = (bool, &CompletedInRequest)> {
let idx = self.idx;
self.buf.iter().enumerate().filter_map(move |(i, req)| Some((i == idx, req.as_ref()?)))
}
}

View file

@ -0,0 +1,37 @@
//! Records stats about requests
use std::time::Duration;
use lsp_server::RequestId;
#[derive(Debug)]
pub(crate) struct RequestMetrics {
pub(crate) id: RequestId,
pub(crate) method: String,
pub(crate) duration: Duration,
}
const N_COMPLETED_REQUESTS: usize = 10;
#[derive(Debug, Default)]
pub(crate) struct LatestRequests {
// hand-rolling VecDeque here to print things in a nicer way
buf: [Option<RequestMetrics>; N_COMPLETED_REQUESTS],
idx: usize,
}
impl LatestRequests {
pub(crate) fn record(&mut self, request: RequestMetrics) {
// special case: don't track status request itself
if request.method == "rust-analyzer/analyzerStatus" {
return;
}
let idx = self.idx;
self.buf[idx] = Some(request);
self.idx = (idx + 1) % N_COMPLETED_REQUESTS;
}
pub(crate) fn iter(&self) -> impl Iterator<Item = (bool, &RequestMetrics)> {
let idx = self.idx;
self.buf.iter().enumerate().filter_map(move |(i, req)| Some((i == idx, req.as_ref()?)))
}
}