//! A language server scaffold, exposing a synchronous crossbeam-channel based API. //! This crate handles protocol handshaking and parsing messages, while you //! control the message dispatch loop yourself. //! //! Run with `RUST_LOG=lsp_server=debug` to see all the messages. #![warn(rust_2018_idioms, unused_lifetimes)] mod msg; mod stdio; mod error; mod socket; mod req_queue; use std::{ io, net::{TcpListener, TcpStream, ToSocketAddrs}, }; use crossbeam_channel::{Receiver, RecvError, RecvTimeoutError, Sender}; pub use crate::{ error::{ExtractError, ProtocolError}, msg::{ErrorCode, Message, Notification, Request, RequestId, Response, ResponseError}, req_queue::{Incoming, Outgoing, ReqQueue}, stdio::IoThreads, }; /// Connection is just a pair of channels of LSP messages. pub struct Connection { pub sender: Sender, pub receiver: Receiver, } impl Connection { /// Create connection over standard in/standard out. /// /// Use this to create a real language server. pub fn stdio() -> (Connection, IoThreads) { let (sender, receiver, io_threads) = stdio::stdio_transport(); (Connection { sender, receiver }, io_threads) } /// Open a connection over tcp. /// This call blocks until a connection is established. /// /// Use this to create a real language server. pub fn connect(addr: A) -> io::Result<(Connection, IoThreads)> { let stream = TcpStream::connect(addr)?; let (sender, receiver, io_threads) = socket::socket_transport(stream); Ok((Connection { sender, receiver }, io_threads)) } /// Listen for a connection over tcp. /// This call blocks until a connection is established. /// /// Use this to create a real language server. pub fn listen(addr: A) -> io::Result<(Connection, IoThreads)> { let listener = TcpListener::bind(addr)?; let (stream, _) = listener.accept()?; let (sender, receiver, io_threads) = socket::socket_transport(stream); Ok((Connection { sender, receiver }, io_threads)) } /// Creates a pair of connected connections. /// /// Use this for testing. pub fn memory() -> (Connection, Connection) { let (s1, r1) = crossbeam_channel::unbounded(); let (s2, r2) = crossbeam_channel::unbounded(); (Connection { sender: s1, receiver: r2 }, Connection { sender: s2, receiver: r1 }) } /// Starts the initialization process by waiting for an initialize /// request from the client. Use this for more advanced customization than /// `initialize` can provide. /// /// Returns the request id and serialized `InitializeParams` from the client. /// /// # Example /// /// ```no_run /// use std::error::Error; /// use lsp_types::{ClientCapabilities, InitializeParams, ServerCapabilities}; /// /// use lsp_server::{Connection, Message, Request, RequestId, Response}; /// /// fn main() -> Result<(), Box> { /// // Create the transport. Includes the stdio (stdin and stdout) versions but this could /// // also be implemented to use sockets or HTTP. /// let (connection, io_threads) = Connection::stdio(); /// /// // Run the server /// let (id, params) = connection.initialize_start()?; /// /// let init_params: InitializeParams = serde_json::from_value(params).unwrap(); /// let client_capabilities: ClientCapabilities = init_params.capabilities; /// let server_capabilities = ServerCapabilities::default(); /// /// let initialize_data = serde_json::json!({ /// "capabilities": server_capabilities, /// "serverInfo": { /// "name": "lsp-server-test", /// "version": "0.1" /// } /// }); /// /// connection.initialize_finish(id, initialize_data)?; /// /// // ... Run main loop ... /// /// Ok(()) /// } /// ``` pub fn initialize_start(&self) -> Result<(RequestId, serde_json::Value), ProtocolError> { self.initialize_start_while(|| true) } /// Starts the initialization process by waiting for an initialize as described in /// [`Self::initialize_start`] as long as `running` returns /// `true` while the return value can be changed through a sig handler such as `CTRL + C`. /// /// # Example /// /// ```rust /// use std::sync::atomic::{AtomicBool, Ordering}; /// use std::sync::Arc; /// # use std::error::Error; /// # use lsp_types::{ClientCapabilities, InitializeParams, ServerCapabilities}; /// # use lsp_server::{Connection, Message, Request, RequestId, Response}; /// # fn main() -> Result<(), Box> { /// let running = Arc::new(AtomicBool::new(true)); /// # running.store(true, Ordering::SeqCst); /// let r = running.clone(); /// /// ctrlc::set_handler(move || { /// r.store(false, Ordering::SeqCst); /// }).expect("Error setting Ctrl-C handler"); /// /// let (connection, io_threads) = Connection::stdio(); /// /// let res = connection.initialize_start_while(|| running.load(Ordering::SeqCst)); /// # assert!(res.is_err()); /// /// # Ok(()) /// # } /// ``` pub fn initialize_start_while( &self, running: C, ) -> Result<(RequestId, serde_json::Value), ProtocolError> where C: Fn() -> bool, { while running() { let msg = match self.receiver.recv_timeout(std::time::Duration::from_secs(1)) { Ok(msg) => msg, Err(RecvTimeoutError::Timeout) => { continue; } Err(RecvTimeoutError::Disconnected) => return Err(ProtocolError::disconnected()), }; match msg { Message::Request(req) if req.is_initialize() => return Ok((req.id, req.params)), // Respond to non-initialize requests with ServerNotInitialized Message::Request(req) => { let resp = Response::new_err( req.id.clone(), ErrorCode::ServerNotInitialized as i32, format!("expected initialize request, got {req:?}"), ); self.sender.send(resp.into()).unwrap(); continue; } Message::Notification(n) if !n.is_exit() => { continue; } msg => { return Err(ProtocolError::new(format!( "expected initialize request, got {msg:?}" ))); } }; } Err(ProtocolError::new(String::from( "Initialization has been aborted during initialization", ))) } /// Finishes the initialization process by sending an `InitializeResult` to the client pub fn initialize_finish( &self, initialize_id: RequestId, initialize_result: serde_json::Value, ) -> Result<(), ProtocolError> { let resp = Response::new_ok(initialize_id, initialize_result); self.sender.send(resp.into()).unwrap(); match &self.receiver.recv() { Ok(Message::Notification(n)) if n.is_initialized() => Ok(()), Ok(msg) => Err(ProtocolError::new(format!( r#"expected initialized notification, got: {msg:?}"# ))), Err(RecvError) => Err(ProtocolError::disconnected()), } } /// Finishes the initialization process as described in [`Self::initialize_finish`] as /// long as `running` returns `true` while the return value can be changed through a sig /// handler such as `CTRL + C`. pub fn initialize_finish_while( &self, initialize_id: RequestId, initialize_result: serde_json::Value, running: C, ) -> Result<(), ProtocolError> where C: Fn() -> bool, { let resp = Response::new_ok(initialize_id, initialize_result); self.sender.send(resp.into()).unwrap(); while running() { let msg = match self.receiver.recv_timeout(std::time::Duration::from_secs(1)) { Ok(msg) => msg, Err(RecvTimeoutError::Timeout) => { continue; } Err(RecvTimeoutError::Disconnected) => { return Err(ProtocolError::disconnected()); } }; match msg { Message::Notification(n) if n.is_initialized() => { return Ok(()); } msg => { return Err(ProtocolError::new(format!( r#"expected initialized notification, got: {msg:?}"# ))); } } } Err(ProtocolError::new(String::from( "Initialization has been aborted during initialization", ))) } /// Initialize the connection. Sends the server capabilities /// to the client and returns the serialized client capabilities /// on success. If more fine-grained initialization is required use /// `initialize_start`/`initialize_finish`. /// /// # Example /// /// ```no_run /// use std::error::Error; /// use lsp_types::ServerCapabilities; /// /// use lsp_server::{Connection, Message, Request, RequestId, Response}; /// /// fn main() -> Result<(), Box> { /// // Create the transport. Includes the stdio (stdin and stdout) versions but this could /// // also be implemented to use sockets or HTTP. /// let (connection, io_threads) = Connection::stdio(); /// /// // Run the server /// let server_capabilities = serde_json::to_value(&ServerCapabilities::default()).unwrap(); /// let initialization_params = connection.initialize(server_capabilities)?; /// /// // ... Run main loop ... /// /// Ok(()) /// } /// ``` pub fn initialize( &self, server_capabilities: serde_json::Value, ) -> Result { let (id, params) = self.initialize_start()?; let initialize_data = serde_json::json!({ "capabilities": server_capabilities, }); self.initialize_finish(id, initialize_data)?; Ok(params) } /// Initialize the connection as described in [`Self::initialize`] as long as `running` returns /// `true` while the return value can be changed through a sig handler such as `CTRL + C`. /// /// # Example /// /// ```rust /// use std::sync::atomic::{AtomicBool, Ordering}; /// use std::sync::Arc; /// # use std::error::Error; /// # use lsp_types::ServerCapabilities; /// # use lsp_server::{Connection, Message, Request, RequestId, Response}; /// /// # fn main() -> Result<(), Box> { /// let running = Arc::new(AtomicBool::new(true)); /// # running.store(true, Ordering::SeqCst); /// let r = running.clone(); /// /// ctrlc::set_handler(move || { /// r.store(false, Ordering::SeqCst); /// }).expect("Error setting Ctrl-C handler"); /// /// let (connection, io_threads) = Connection::stdio(); /// /// let server_capabilities = serde_json::to_value(&ServerCapabilities::default()).unwrap(); /// let initialization_params = connection.initialize_while( /// server_capabilities, /// || running.load(Ordering::SeqCst) /// ); /// /// # assert!(initialization_params.is_err()); /// # Ok(()) /// # } /// ``` pub fn initialize_while( &self, server_capabilities: serde_json::Value, running: C, ) -> Result where C: Fn() -> bool, { let (id, params) = self.initialize_start_while(&running)?; let initialize_data = serde_json::json!({ "capabilities": server_capabilities, }); self.initialize_finish_while(id, initialize_data, running)?; Ok(params) } /// If `req` is `Shutdown`, respond to it and return `true`, otherwise return `false` pub fn handle_shutdown(&self, req: &Request) -> Result { if !req.is_shutdown() { return Ok(false); } let resp = Response::new_ok(req.id.clone(), ()); let _ = self.sender.send(resp.into()); match &self.receiver.recv_timeout(std::time::Duration::from_secs(30)) { Ok(Message::Notification(n)) if n.is_exit() => (), Ok(msg) => { return Err(ProtocolError::new(format!( "unexpected message during shutdown: {msg:?}" ))) } Err(RecvTimeoutError::Timeout) => { return Err(ProtocolError::new( "timed out waiting for exit notification".to_string(), )) } Err(RecvTimeoutError::Disconnected) => { return Err(ProtocolError::new( "channel disconnected waiting for exit notification".to_string(), )) } } Ok(true) } } #[cfg(test)] mod tests { use crossbeam_channel::unbounded; use lsp_types::notification::{Exit, Initialized, Notification}; use lsp_types::request::{Initialize, Request}; use lsp_types::{InitializeParams, InitializedParams}; use serde_json::to_value; use crate::{Connection, Message, ProtocolError, RequestId}; struct TestCase { test_messages: Vec, expected_resp: Result<(RequestId, serde_json::Value), ProtocolError>, } fn initialize_start_test(test_case: TestCase) { let (reader_sender, reader_receiver) = unbounded::(); let (writer_sender, writer_receiver) = unbounded::(); let conn = Connection { sender: writer_sender, receiver: reader_receiver }; for msg in test_case.test_messages { assert!(reader_sender.send(msg).is_ok()); } let resp = conn.initialize_start(); assert_eq!(test_case.expected_resp, resp); assert!(writer_receiver.recv_timeout(std::time::Duration::from_secs(1)).is_err()); } #[test] fn not_exit_notification() { let notification = crate::Notification { method: Initialized::METHOD.to_string(), params: to_value(InitializedParams {}).unwrap(), }; let params_as_value = to_value(InitializeParams::default()).unwrap(); let req_id = RequestId::from(234); let request = crate::Request { id: req_id.clone(), method: Initialize::METHOD.to_string(), params: params_as_value.clone(), }; initialize_start_test(TestCase { test_messages: vec![notification.into(), request.into()], expected_resp: Ok((req_id, params_as_value)), }); } #[test] fn exit_notification() { let notification = crate::Notification { method: Exit::METHOD.to_string(), params: to_value(()).unwrap() }; let notification_msg = Message::from(notification); initialize_start_test(TestCase { test_messages: vec![notification_msg.clone()], expected_resp: Err(ProtocolError::new(format!( "expected initialize request, got {:?}", notification_msg ))), }); } }