rust-analyzer/lib/lsp-server/src/lib.rs
2024-05-30 16:18:49 -07:00

440 lines
16 KiB
Rust

//! A language server scaffold, exposing a synchronous crossbeam-channel based API.
//! This crate handles protocol handshaking and parsing messages, while you
//! control the message dispatch loop yourself.
//!
//! Run with `RUST_LOG=lsp_server=debug` to see all the messages.
#![warn(rust_2018_idioms, unused_lifetimes)]
#![allow(clippy::print_stdout, clippy::disallowed_types)]
mod error;
mod msg;
mod req_queue;
mod socket;
mod stdio;
use std::{
io,
net::{TcpListener, TcpStream, ToSocketAddrs},
};
use crossbeam_channel::{Receiver, RecvError, RecvTimeoutError, Sender};
pub use crate::{
error::{ExtractError, ProtocolError},
msg::{ErrorCode, Message, Notification, Request, RequestId, Response, ResponseError},
req_queue::{Incoming, Outgoing, ReqQueue},
stdio::IoThreads,
};
/// Connection is just a pair of channels of LSP messages.
pub struct Connection {
pub sender: Sender<Message>,
pub receiver: Receiver<Message>,
}
impl Connection {
/// Create connection over standard in/standard out.
///
/// Use this to create a real language server.
pub fn stdio() -> (Connection, IoThreads) {
let (sender, receiver, io_threads) = stdio::stdio_transport();
(Connection { sender, receiver }, io_threads)
}
/// Open a connection over tcp.
/// This call blocks until a connection is established.
///
/// Use this to create a real language server.
pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<(Connection, IoThreads)> {
let stream = TcpStream::connect(addr)?;
let (sender, receiver, io_threads) = socket::socket_transport(stream);
Ok((Connection { sender, receiver }, io_threads))
}
/// Listen for a connection over tcp.
/// This call blocks until a connection is established.
///
/// Use this to create a real language server.
pub fn listen<A: ToSocketAddrs>(addr: A) -> io::Result<(Connection, IoThreads)> {
let listener = TcpListener::bind(addr)?;
let (stream, _) = listener.accept()?;
let (sender, receiver, io_threads) = socket::socket_transport(stream);
Ok((Connection { sender, receiver }, io_threads))
}
/// Creates a pair of connected connections.
///
/// Use this for testing.
pub fn memory() -> (Connection, Connection) {
let (s1, r1) = crossbeam_channel::unbounded();
let (s2, r2) = crossbeam_channel::unbounded();
(Connection { sender: s1, receiver: r2 }, Connection { sender: s2, receiver: r1 })
}
/// Starts the initialization process by waiting for an initialize
/// request from the client. Use this for more advanced customization than
/// `initialize` can provide.
///
/// Returns the request id and serialized `InitializeParams` from the client.
///
/// # Example
///
/// ```no_run
/// use std::error::Error;
/// use lsp_types::{ClientCapabilities, InitializeParams, ServerCapabilities};
///
/// use lsp_server::{Connection, Message, Request, RequestId, Response};
///
/// fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
/// // Create the transport. Includes the stdio (stdin and stdout) versions but this could
/// // also be implemented to use sockets or HTTP.
/// let (connection, io_threads) = Connection::stdio();
///
/// // Run the server
/// let (id, params) = connection.initialize_start()?;
///
/// let init_params: InitializeParams = serde_json::from_value(params).unwrap();
/// let client_capabilities: ClientCapabilities = init_params.capabilities;
/// let server_capabilities = ServerCapabilities::default();
///
/// let initialize_data = serde_json::json!({
/// "capabilities": server_capabilities,
/// "serverInfo": {
/// "name": "lsp-server-test",
/// "version": "0.1"
/// }
/// });
///
/// connection.initialize_finish(id, initialize_data)?;
///
/// // ... Run main loop ...
///
/// Ok(())
/// }
/// ```
pub fn initialize_start(&self) -> Result<(RequestId, serde_json::Value), ProtocolError> {
self.initialize_start_while(|| true)
}
/// Starts the initialization process by waiting for an initialize as described in
/// [`Self::initialize_start`] as long as `running` returns
/// `true` while the return value can be changed through a sig handler such as `CTRL + C`.
///
/// # Example
///
/// ```rust
/// use std::sync::atomic::{AtomicBool, Ordering};
/// use std::sync::Arc;
/// # use std::error::Error;
/// # use lsp_types::{ClientCapabilities, InitializeParams, ServerCapabilities};
/// # use lsp_server::{Connection, Message, Request, RequestId, Response};
/// # fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
/// let running = Arc::new(AtomicBool::new(true));
/// # running.store(true, Ordering::SeqCst);
/// let r = running.clone();
///
/// ctrlc::set_handler(move || {
/// r.store(false, Ordering::SeqCst);
/// }).expect("Error setting Ctrl-C handler");
///
/// let (connection, io_threads) = Connection::stdio();
///
/// let res = connection.initialize_start_while(|| running.load(Ordering::SeqCst));
/// # assert!(res.is_err());
///
/// # Ok(())
/// # }
/// ```
pub fn initialize_start_while<C>(
&self,
running: C,
) -> Result<(RequestId, serde_json::Value), ProtocolError>
where
C: Fn() -> bool,
{
while running() {
let msg = match self.receiver.recv_timeout(std::time::Duration::from_secs(1)) {
Ok(msg) => msg,
Err(RecvTimeoutError::Timeout) => {
continue;
}
Err(RecvTimeoutError::Disconnected) => return Err(ProtocolError::disconnected()),
};
match msg {
Message::Request(req) if req.is_initialize() => return Ok((req.id, req.params)),
// Respond to non-initialize requests with ServerNotInitialized
Message::Request(req) => {
let resp = Response::new_err(
req.id.clone(),
ErrorCode::ServerNotInitialized as i32,
format!("expected initialize request, got {req:?}"),
);
self.sender.send(resp.into()).unwrap();
continue;
}
Message::Notification(n) if !n.is_exit() => {
continue;
}
msg => {
return Err(ProtocolError::new(format!(
"expected initialize request, got {msg:?}"
)));
}
};
}
Err(ProtocolError::new(String::from(
"Initialization has been aborted during initialization",
)))
}
/// Finishes the initialization process by sending an `InitializeResult` to the client
pub fn initialize_finish(
&self,
initialize_id: RequestId,
initialize_result: serde_json::Value,
) -> Result<(), ProtocolError> {
let resp = Response::new_ok(initialize_id, initialize_result);
self.sender.send(resp.into()).unwrap();
match &self.receiver.recv() {
Ok(Message::Notification(n)) if n.is_initialized() => Ok(()),
Ok(msg) => Err(ProtocolError::new(format!(
r#"expected initialized notification, got: {msg:?}"#
))),
Err(RecvError) => Err(ProtocolError::disconnected()),
}
}
/// Finishes the initialization process as described in [`Self::initialize_finish`] as
/// long as `running` returns `true` while the return value can be changed through a sig
/// handler such as `CTRL + C`.
pub fn initialize_finish_while<C>(
&self,
initialize_id: RequestId,
initialize_result: serde_json::Value,
running: C,
) -> Result<(), ProtocolError>
where
C: Fn() -> bool,
{
let resp = Response::new_ok(initialize_id, initialize_result);
self.sender.send(resp.into()).unwrap();
while running() {
let msg = match self.receiver.recv_timeout(std::time::Duration::from_secs(1)) {
Ok(msg) => msg,
Err(RecvTimeoutError::Timeout) => {
continue;
}
Err(RecvTimeoutError::Disconnected) => {
return Err(ProtocolError::disconnected());
}
};
match msg {
Message::Notification(n) if n.is_initialized() => {
return Ok(());
}
msg => {
return Err(ProtocolError::new(format!(
r#"expected initialized notification, got: {msg:?}"#
)));
}
}
}
Err(ProtocolError::new(String::from(
"Initialization has been aborted during initialization",
)))
}
/// Initialize the connection. Sends the server capabilities
/// to the client and returns the serialized client capabilities
/// on success. If more fine-grained initialization is required use
/// `initialize_start`/`initialize_finish`.
///
/// # Example
///
/// ```no_run
/// use std::error::Error;
/// use lsp_types::ServerCapabilities;
///
/// use lsp_server::{Connection, Message, Request, RequestId, Response};
///
/// fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
/// // Create the transport. Includes the stdio (stdin and stdout) versions but this could
/// // also be implemented to use sockets or HTTP.
/// let (connection, io_threads) = Connection::stdio();
///
/// // Run the server
/// let server_capabilities = serde_json::to_value(&ServerCapabilities::default()).unwrap();
/// let initialization_params = connection.initialize(server_capabilities)?;
///
/// // ... Run main loop ...
///
/// Ok(())
/// }
/// ```
pub fn initialize(
&self,
server_capabilities: serde_json::Value,
) -> Result<serde_json::Value, ProtocolError> {
let (id, params) = self.initialize_start()?;
let initialize_data = serde_json::json!({
"capabilities": server_capabilities,
});
self.initialize_finish(id, initialize_data)?;
Ok(params)
}
/// Initialize the connection as described in [`Self::initialize`] as long as `running` returns
/// `true` while the return value can be changed through a sig handler such as `CTRL + C`.
///
/// # Example
///
/// ```rust
/// use std::sync::atomic::{AtomicBool, Ordering};
/// use std::sync::Arc;
/// # use std::error::Error;
/// # use lsp_types::ServerCapabilities;
/// # use lsp_server::{Connection, Message, Request, RequestId, Response};
///
/// # fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
/// let running = Arc::new(AtomicBool::new(true));
/// # running.store(true, Ordering::SeqCst);
/// let r = running.clone();
///
/// ctrlc::set_handler(move || {
/// r.store(false, Ordering::SeqCst);
/// }).expect("Error setting Ctrl-C handler");
///
/// let (connection, io_threads) = Connection::stdio();
///
/// let server_capabilities = serde_json::to_value(&ServerCapabilities::default()).unwrap();
/// let initialization_params = connection.initialize_while(
/// server_capabilities,
/// || running.load(Ordering::SeqCst)
/// );
///
/// # assert!(initialization_params.is_err());
/// # Ok(())
/// # }
/// ```
pub fn initialize_while<C>(
&self,
server_capabilities: serde_json::Value,
running: C,
) -> Result<serde_json::Value, ProtocolError>
where
C: Fn() -> bool,
{
let (id, params) = self.initialize_start_while(&running)?;
let initialize_data = serde_json::json!({
"capabilities": server_capabilities,
});
self.initialize_finish_while(id, initialize_data, running)?;
Ok(params)
}
/// If `req` is `Shutdown`, respond to it and return `true`, otherwise return `false`
pub fn handle_shutdown(&self, req: &Request) -> Result<bool, ProtocolError> {
if !req.is_shutdown() {
return Ok(false);
}
let resp = Response::new_ok(req.id.clone(), ());
let _ = self.sender.send(resp.into());
match &self.receiver.recv_timeout(std::time::Duration::from_secs(30)) {
Ok(Message::Notification(n)) if n.is_exit() => (),
Ok(msg) => {
return Err(ProtocolError::new(format!(
"unexpected message during shutdown: {msg:?}"
)))
}
Err(RecvTimeoutError::Timeout) => {
return Err(ProtocolError::new(
"timed out waiting for exit notification".to_owned(),
))
}
Err(RecvTimeoutError::Disconnected) => {
return Err(ProtocolError::new(
"channel disconnected waiting for exit notification".to_owned(),
))
}
}
Ok(true)
}
}
#[cfg(test)]
mod tests {
use crossbeam_channel::unbounded;
use lsp_types::notification::{Exit, Initialized, Notification};
use lsp_types::request::{Initialize, Request};
use lsp_types::{InitializeParams, InitializedParams};
use serde_json::to_value;
use crate::{Connection, Message, ProtocolError, RequestId};
struct TestCase {
test_messages: Vec<Message>,
expected_resp: Result<(RequestId, serde_json::Value), ProtocolError>,
}
fn initialize_start_test(test_case: TestCase) {
let (reader_sender, reader_receiver) = unbounded::<Message>();
let (writer_sender, writer_receiver) = unbounded::<Message>();
let conn = Connection { sender: writer_sender, receiver: reader_receiver };
for msg in test_case.test_messages {
assert!(reader_sender.send(msg).is_ok());
}
let resp = conn.initialize_start();
assert_eq!(test_case.expected_resp, resp);
assert!(writer_receiver.recv_timeout(std::time::Duration::from_secs(1)).is_err());
}
#[test]
fn not_exit_notification() {
let notification = crate::Notification {
method: Initialized::METHOD.to_owned(),
params: to_value(InitializedParams {}).unwrap(),
};
let params_as_value = to_value(InitializeParams::default()).unwrap();
let req_id = RequestId::from(234);
let request = crate::Request {
id: req_id.clone(),
method: Initialize::METHOD.to_owned(),
params: params_as_value.clone(),
};
initialize_start_test(TestCase {
test_messages: vec![notification.into(), request.into()],
expected_resp: Ok((req_id, params_as_value)),
});
}
#[test]
fn exit_notification() {
let notification =
crate::Notification { method: Exit::METHOD.to_owned(), params: to_value(()).unwrap() };
let notification_msg = Message::from(notification);
initialize_start_test(TestCase {
test_messages: vec![notification_msg.clone()],
expected_resp: Err(ProtocolError::new(format!(
"expected initialize request, got {notification_msg:?}"
))),
});
}
}