1742: cleanup main loop r=matklad a=matklad



Co-authored-by: Aleksey Kladov <aleksey.kladov@gmail.com>
This commit is contained in:
bors[bot] 2019-08-31 11:48:00 +00:00 committed by GitHub
commit f2a200c1ee
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 172 additions and 221 deletions

View file

@ -1,10 +1,7 @@
use crate::{
project_model::{self, TargetKind},
world::WorldSnapshot,
Result,
};
use ra_ide_api::{FileId, RunnableKind}; use ra_ide_api::{FileId, RunnableKind};
use ra_project_model::{self, ProjectWorkspace, TargetKind};
use crate::{world::WorldSnapshot, Result};
pub(crate) fn runnable_args( pub(crate) fn runnable_args(
world: &WorldSnapshot, world: &WorldSnapshot,
@ -66,7 +63,7 @@ impl CargoTargetSpec {
let file_id = world.analysis().crate_root(crate_id)?; let file_id = world.analysis().crate_root(crate_id)?;
let path = world.vfs.read().file2path(ra_vfs::VfsFile(file_id.0)); let path = world.vfs.read().file2path(ra_vfs::VfsFile(file_id.0));
let res = world.workspaces.iter().find_map(|ws| match ws { let res = world.workspaces.iter().find_map(|ws| match ws {
project_model::ProjectWorkspace::Cargo { cargo, .. } => { ProjectWorkspace::Cargo { cargo, .. } => {
let tgt = cargo.target_by_root(&path)?; let tgt = cargo.target_by_root(&path)?;
Some(CargoTargetSpec { Some(CargoTargetSpec {
package: tgt.package(&cargo).name(&cargo).to_string(), package: tgt.package(&cargo).name(&cargo).to_string(),
@ -74,7 +71,7 @@ impl CargoTargetSpec {
target_kind: tgt.kind(&cargo), target_kind: tgt.kind(&cargo),
}) })
} }
project_model::ProjectWorkspace::Json { .. } => None, ProjectWorkspace::Json { .. } => None,
}); });
Ok(res) Ok(res)
} }

View file

@ -4,11 +4,9 @@ mod cargo_target_spec;
mod conv; mod conv;
mod main_loop; mod main_loop;
mod markdown; mod markdown;
mod project_model;
pub mod req; pub mod req;
pub mod config; pub mod config;
mod world; mod world;
mod thread_worker;
pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>; pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
pub use crate::{ pub use crate::{

View file

@ -5,37 +5,37 @@ use ra_lsp_server::{show_message, Result, ServerConfig};
use ra_prof; use ra_prof;
fn main() -> Result<()> { fn main() -> Result<()> {
setup_logging()?;
run_server()?;
Ok(())
}
fn setup_logging() -> Result<()> {
std::env::set_var("RUST_BACKTRACE", "short"); std::env::set_var("RUST_BACKTRACE", "short");
let logger = Logger::with_env_or_str("error").duplicate_to_stderr(Duplicate::All); let logger = Logger::with_env_or_str("error").duplicate_to_stderr(Duplicate::All);
match std::env::var("RA_LOG_DIR") { match std::env::var("RA_LOG_DIR") {
Ok(ref v) if v == "1" => logger.log_to_file().directory("log").start()?, Ok(ref v) if v == "1" => logger.log_to_file().directory("log").start()?,
_ => logger.start()?, _ => logger.start()?,
}; };
ra_prof::set_filter(match std::env::var("RA_PROFILE") { ra_prof::set_filter(match std::env::var("RA_PROFILE") {
Ok(spec) => ra_prof::Filter::from_spec(&spec), Ok(spec) => ra_prof::Filter::from_spec(&spec),
Err(_) => ra_prof::Filter::disabled(), Err(_) => ra_prof::Filter::disabled(),
}); });
log::info!("lifecycle: server started"); Ok(())
match std::panic::catch_unwind(main_inner) {
Ok(res) => {
log::info!("lifecycle: terminating process with {:?}", res);
res
}
Err(_) => {
log::error!("server panicked");
Err("server panicked")?
}
}
} }
fn main_inner() -> Result<()> { fn run_server() -> Result<()> {
let cwd = std::env::current_dir()?; log::info!("lifecycle: server started");
let (connection, io_threads) = Connection::stdio(); let (connection, io_threads) = Connection::stdio();
let server_capabilities = serde_json::to_value(ra_lsp_server::server_capabilities()).unwrap(); let server_capabilities = serde_json::to_value(ra_lsp_server::server_capabilities()).unwrap();
let initialize_params = connection.initialize(server_capabilities)?; let initialize_params = connection.initialize(server_capabilities)?;
let initialize_params: lsp_types::InitializeParams = serde_json::from_value(initialize_params)?; let initialize_params: lsp_types::InitializeParams = serde_json::from_value(initialize_params)?;
let cwd = std::env::current_dir()?;
let root = initialize_params.root_uri.and_then(|it| it.to_file_path().ok()).unwrap_or(cwd); let root = initialize_params.root_uri.and_then(|it| it.to_file_path().ok()).unwrap_or(cwd);
let workspace_roots = initialize_params let workspace_roots = initialize_params

View file

@ -4,12 +4,13 @@ pub(crate) mod pending_requests;
use std::{error::Error, fmt, path::PathBuf, sync::Arc, time::Instant}; use std::{error::Error, fmt, path::PathBuf, sync::Arc, time::Instant};
use crossbeam_channel::{select, unbounded, Receiver, RecvError, Sender}; use crossbeam_channel::{select, unbounded, RecvError, Sender};
use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response}; use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response};
use lsp_types::{ClientCapabilities, NumberOrString}; use lsp_types::{ClientCapabilities, NumberOrString};
use ra_ide_api::{Canceled, FeatureFlags, FileId, LibraryData}; use ra_ide_api::{Canceled, FeatureFlags, FileId, LibraryData, SourceRootId};
use ra_prof::profile; use ra_prof::profile;
use ra_vfs::VfsTask; use ra_vfs::VfsTask;
use relative_path::RelativePathBuf;
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
use threadpool::ThreadPool; use threadpool::ThreadPool;
@ -18,7 +19,6 @@ use crate::{
pending_requests::{PendingRequest, PendingRequests}, pending_requests::{PendingRequest, PendingRequests},
subscriptions::Subscriptions, subscriptions::Subscriptions,
}, },
project_model::workspace_loader,
req, req,
world::{Options, WorldSnapshot, WorldState}, world::{Options, WorldSnapshot, WorldState},
Result, ServerConfig, Result, ServerConfig,
@ -54,14 +54,17 @@ pub fn main_loop(
connection: &Connection, connection: &Connection,
) -> Result<()> { ) -> Result<()> {
log::info!("server_config: {:#?}", config); log::info!("server_config: {:#?}", config);
// FIXME: support dynamic workspace loading. // FIXME: support dynamic workspace loading.
let workspaces = { let workspaces = {
let ws_worker = workspace_loader(config.with_sysroot);
let mut loaded_workspaces = Vec::new(); let mut loaded_workspaces = Vec::new();
for ws_root in &ws_roots { for ws_root in &ws_roots {
ws_worker.sender().send(ws_root.clone()).unwrap(); let workspace = ra_project_model::ProjectWorkspace::discover_with_sysroot(
match ws_worker.receiver().recv().unwrap() { ws_root.as_path(),
Ok(ws) => loaded_workspaces.push(ws), config.with_sysroot,
);
match workspace {
Ok(workspace) => loaded_workspaces.push(workspace),
Err(e) => { Err(e) => {
log::error!("loading workspace failed: {}", e); log::error!("loading workspace failed: {}", e);
@ -75,11 +78,13 @@ pub fn main_loop(
} }
loaded_workspaces loaded_workspaces
}; };
let globs = config let globs = config
.exclude_globs .exclude_globs
.iter() .iter()
.map(|glob| ra_vfs_glob::Glob::new(glob)) .map(|glob| ra_vfs_glob::Glob::new(glob))
.collect::<std::result::Result<Vec<_>, _>>()?; .collect::<std::result::Result<Vec<_>, _>>()?;
let feature_flags = { let feature_flags = {
let mut ff = FeatureFlags::default(); let mut ff = FeatureFlags::default();
for (flag, value) in config.feature_flags { for (flag, value) in config.feature_flags {
@ -95,7 +100,8 @@ pub fn main_loop(
ff ff
}; };
log::info!("feature_flags: {:#?}", feature_flags); log::info!("feature_flags: {:#?}", feature_flags);
let mut state = WorldState::new(
let mut world_state = WorldState::new(
ws_roots, ws_roots,
workspaces, workspaces,
config.lru_capacity, config.lru_capacity,
@ -113,31 +119,58 @@ pub fn main_loop(
let pool = ThreadPool::new(THREADPOOL_SIZE); let pool = ThreadPool::new(THREADPOOL_SIZE);
let (task_sender, task_receiver) = unbounded::<Task>(); let (task_sender, task_receiver) = unbounded::<Task>();
let mut pending_requests = PendingRequests::default(); let (libdata_sender, libdata_receiver) = unbounded::<LibraryData>();
let mut loop_state = LoopState::default();
log::info!("server initialized, serving requests"); log::info!("server initialized, serving requests");
let main_res = main_loop_inner( {
&pool, let task_sender = task_sender;
connection, let libdata_sender = libdata_sender;
task_sender, loop {
task_receiver.clone(), log::trace!("selecting");
&mut state, let event = select! {
&mut pending_requests, recv(&connection.receiver) -> msg => match msg {
); Ok(msg) => Event::Msg(msg),
Err(RecvError) => Err("client exited without shutdown")?,
},
recv(task_receiver) -> task => Event::Task(task.unwrap()),
recv(world_state.task_receiver) -> task => match task {
Ok(task) => Event::Vfs(task),
Err(RecvError) => Err("vfs died")?,
},
recv(libdata_receiver) -> data => Event::Lib(data.unwrap())
};
if let Event::Msg(Message::Request(req)) = &event {
if connection.handle_shutdown(&req)? {
break;
};
}
loop_turn(
&pool,
&task_sender,
&libdata_sender,
connection,
&mut world_state,
&mut loop_state,
event,
)?;
}
}
log::info!("waiting for tasks to finish..."); log::info!("waiting for tasks to finish...");
task_receiver task_receiver.into_iter().for_each(|task| {
.into_iter() on_task(task, &connection.sender, &mut loop_state.pending_requests, &mut world_state)
.for_each(|task| on_task(task, &connection.sender, &mut pending_requests, &mut state)); });
libdata_receiver.into_iter().for_each(|lib| drop(lib));
log::info!("...tasks have finished"); log::info!("...tasks have finished");
log::info!("joining threadpool..."); log::info!("joining threadpool...");
drop(pool); drop(pool);
log::info!("...threadpool has finished"); log::info!("...threadpool has finished");
let vfs = Arc::try_unwrap(state.vfs).expect("all snapshots should be dead"); let vfs = Arc::try_unwrap(world_state.vfs).expect("all snapshots should be dead");
drop(vfs); drop(vfs);
main_res Ok(())
} }
#[derive(Debug)] #[derive(Debug)]
@ -192,121 +225,113 @@ impl fmt::Debug for Event {
} }
} }
fn main_loop_inner( #[derive(Debug, Default)]
pool: &ThreadPool, struct LoopState {
connection: &Connection, pending_requests: PendingRequests,
task_sender: Sender<Task>, subscriptions: Subscriptions,
task_receiver: Receiver<Task>,
state: &mut WorldState,
pending_requests: &mut PendingRequests,
) -> Result<()> {
let mut subs = Subscriptions::default();
// We try not to index more than MAX_IN_FLIGHT_LIBS libraries at the same // We try not to index more than MAX_IN_FLIGHT_LIBS libraries at the same
// time to always have a thread ready to react to input. // time to always have a thread ready to react to input.
let mut in_flight_libraries = 0; in_flight_libraries: usize,
let mut pending_libraries = Vec::new(); pending_libraries: Vec<(SourceRootId, Vec<(FileId, RelativePathBuf, Arc<String>)>)>,
let mut send_workspace_notification = true; workspace_loaded: bool,
}
let (libdata_sender, libdata_receiver) = unbounded(); fn loop_turn(
loop { pool: &ThreadPool,
log::trace!("selecting"); task_sender: &Sender<Task>,
let event = select! { libdata_sender: &Sender<LibraryData>,
recv(&connection.receiver) -> msg => match msg { connection: &Connection,
Ok(msg) => Event::Msg(msg), world_state: &mut WorldState,
Err(RecvError) => Err("client exited without shutdown")?, loop_state: &mut LoopState,
}, event: Event,
recv(task_receiver) -> task => Event::Task(task.unwrap()), ) -> Result<()> {
recv(state.task_receiver) -> task => match task { let loop_start = Instant::now();
Ok(task) => Event::Vfs(task),
Err(RecvError) => Err("vfs died")?,
},
recv(libdata_receiver) -> data => Event::Lib(data.unwrap())
};
let loop_start = Instant::now();
// NOTE: don't count blocking select! call as a loop-turn time // NOTE: don't count blocking select! call as a loop-turn time
let _p = profile("main_loop_inner/loop-turn"); let _p = profile("main_loop_inner/loop-turn");
log::info!("loop turn = {:?}", event); log::info!("loop turn = {:?}", event);
let queue_count = pool.queued_count(); let queue_count = pool.queued_count();
if queue_count > 0 { if queue_count > 0 {
log::info!("queued count = {}", queue_count); log::info!("queued count = {}", queue_count);
}
let mut state_changed = false;
match event {
Event::Task(task) => {
on_task(task, &connection.sender, &mut loop_state.pending_requests, world_state);
world_state.maybe_collect_garbage();
} }
Event::Vfs(task) => {
let mut state_changed = false; world_state.vfs.write().handle_task(task);
match event { state_changed = true;
Event::Task(task) => { }
on_task(task, &connection.sender, pending_requests, state); Event::Lib(lib) => {
state.maybe_collect_garbage(); world_state.add_lib(lib);
} world_state.maybe_collect_garbage();
Event::Vfs(task) => { loop_state.in_flight_libraries -= 1;
state.vfs.write().handle_task(task); }
Event::Msg(msg) => match msg {
Message::Request(req) => on_request(
world_state,
&mut loop_state.pending_requests,
pool,
task_sender,
&connection.sender,
loop_start,
req,
)?,
Message::Notification(not) => {
on_notification(
&connection.sender,
world_state,
&mut loop_state.pending_requests,
&mut loop_state.subscriptions,
not,
)?;
state_changed = true; state_changed = true;
} }
Event::Lib(lib) => { Message::Response(resp) => log::error!("unexpected response: {:?}", resp),
state.add_lib(lib); },
state.maybe_collect_garbage(); };
in_flight_libraries -= 1;
}
Event::Msg(msg) => match msg {
Message::Request(req) => {
if connection.handle_shutdown(&req)? {
return Ok(());
};
on_request(
state,
pending_requests,
pool,
&task_sender,
&connection.sender,
loop_start,
req,
)?
}
Message::Notification(not) => {
on_notification(&connection.sender, state, pending_requests, &mut subs, not)?;
state_changed = true;
}
Message::Response(resp) => log::error!("unexpected response: {:?}", resp),
},
};
pending_libraries.extend(state.process_changes()); loop_state.pending_libraries.extend(world_state.process_changes());
while in_flight_libraries < MAX_IN_FLIGHT_LIBS && !pending_libraries.is_empty() { while loop_state.in_flight_libraries < MAX_IN_FLIGHT_LIBS
let (root, files) = pending_libraries.pop().unwrap(); && !loop_state.pending_libraries.is_empty()
in_flight_libraries += 1; {
let sender = libdata_sender.clone(); let (root, files) = loop_state.pending_libraries.pop().unwrap();
pool.execute(move || { loop_state.in_flight_libraries += 1;
log::info!("indexing {:?} ... ", root); let sender = libdata_sender.clone();
let _p = profile(&format!("indexed {:?}", root)); pool.execute(move || {
let data = LibraryData::prepare(root, files); log::info!("indexing {:?} ... ", root);
sender.send(data).unwrap(); let _p = profile(&format!("indexed {:?}", root));
}); let data = LibraryData::prepare(root, files);
} sender.send(data).unwrap();
});
}
if send_workspace_notification if !loop_state.workspace_loaded
&& state.roots_to_scan == 0 && world_state.roots_to_scan == 0
&& pending_libraries.is_empty() && loop_state.pending_libraries.is_empty()
&& in_flight_libraries == 0 && loop_state.in_flight_libraries == 0
{ {
let n_packages: usize = state.workspaces.iter().map(|it| it.n_packages()).sum(); loop_state.workspace_loaded = true;
if state.feature_flags().get("notifications.workspace-loaded") { let n_packages: usize = world_state.workspaces.iter().map(|it| it.n_packages()).sum();
let msg = format!("workspace loaded, {} rust packages", n_packages); if world_state.feature_flags().get("notifications.workspace-loaded") {
show_message(req::MessageType::Info, msg, &connection.sender); let msg = format!("workspace loaded, {} rust packages", n_packages);
} show_message(req::MessageType::Info, msg, &connection.sender);
// Only send the notification first time
send_workspace_notification = false;
}
if state_changed {
update_file_notifications_on_threadpool(
pool,
state.snapshot(),
state.options.publish_decorations,
task_sender.clone(),
subs.subscriptions(),
)
} }
} }
if state_changed {
update_file_notifications_on_threadpool(
pool,
world_state.snapshot(),
world_state.options.publish_decorations,
task_sender.clone(),
loop_state.subscriptions.subscriptions(),
)
}
Ok(())
} }
fn on_task( fn on_task(

View file

@ -1,7 +1,7 @@
use ra_ide_api::FileId; use ra_ide_api::FileId;
use rustc_hash::FxHashSet; use rustc_hash::FxHashSet;
#[derive(Default)] #[derive(Default, Debug)]
pub(crate) struct Subscriptions { pub(crate) struct Subscriptions {
subs: FxHashSet<FileId>, subs: FxHashSet<FileId>,
} }

View file

@ -1,21 +0,0 @@
use std::path::PathBuf;
use crate::{thread_worker::Worker, Result};
pub use ra_project_model::{
CargoWorkspace, Package, ProjectWorkspace, Sysroot, Target, TargetKind,
};
pub fn workspace_loader(with_sysroot: bool) -> Worker<PathBuf, Result<ProjectWorkspace>> {
Worker::<PathBuf, Result<ProjectWorkspace>>::spawn(
"workspace loader",
1,
move |input_receiver, output_sender| {
input_receiver
.into_iter()
.map(|path| ProjectWorkspace::discover_with_sysroot(path.as_path(), with_sysroot))
.try_for_each(|it| output_sender.send(it))
.unwrap()
},
)
}

View file

@ -1,49 +0,0 @@
//! Small utility to correctly spawn crossbeam-channel based worker threads.
use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
/// A wrapper around event-processing thread with automatic shutdown semantics.
pub struct Worker<I, O> {
// XXX: field order is significant here.
//
// In Rust, fields are dropped in the declaration order, and we rely on this
// here. We must close input first, so that the `thread` (who holds the
// opposite side of the channel) noticed shutdown. Then, we must join the
// thread, but we must keep out alive so that the thread does not panic.
//
// Note that a potential problem here is that we might drop some messages
// from receiver on the floor. This is ok for rust-analyzer: we have only a
// single client, so, if we are shutting down, nobody is interested in the
// unfinished work anyway!
sender: Sender<I>,
_thread: jod_thread::JoinHandle<()>,
receiver: Receiver<O>,
}
impl<I, O> Worker<I, O> {
pub fn spawn<F>(name: &'static str, buf: usize, f: F) -> Worker<I, O>
where
F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static,
I: Send + 'static,
O: Send + 'static,
{
// Set up worker channels in a deadlock-avoiding way. If one sets both input
// and output buffers to a fixed size, a worker might get stuck.
let (sender, input_receiver) = bounded::<I>(buf);
let (output_sender, receiver) = unbounded::<O>();
let _thread = jod_thread::Builder::new()
.name(name.to_string())
.spawn(move || f(input_receiver, output_sender))
.expect("failed to spawn a thread");
Worker { sender, _thread, receiver }
}
}
impl<I, O> Worker<I, O> {
pub fn sender(&self) -> &Sender<I> {
&self.sender
}
pub fn receiver(&self) -> &Receiver<O> {
&self.receiver
}
}

View file

@ -11,13 +11,13 @@ use ra_ide_api::{
Analysis, AnalysisChange, AnalysisHost, CrateGraph, FeatureFlags, FileId, LibraryData, Analysis, AnalysisChange, AnalysisHost, CrateGraph, FeatureFlags, FileId, LibraryData,
SourceRootId, SourceRootId,
}; };
use ra_project_model::ProjectWorkspace;
use ra_vfs::{LineEndings, RootEntry, Vfs, VfsChange, VfsFile, VfsRoot, VfsTask}; use ra_vfs::{LineEndings, RootEntry, Vfs, VfsChange, VfsFile, VfsRoot, VfsTask};
use ra_vfs_glob::{Glob, RustPackageFilterBuilder}; use ra_vfs_glob::{Glob, RustPackageFilterBuilder};
use relative_path::RelativePathBuf; use relative_path::RelativePathBuf;
use crate::{ use crate::{
main_loop::pending_requests::{CompletedRequest, LatestRequests}, main_loop::pending_requests::{CompletedRequest, LatestRequests},
project_model::ProjectWorkspace,
LspError, Result, LspError, Result,
}; };
@ -35,6 +35,7 @@ pub struct Options {
#[derive(Debug)] #[derive(Debug)]
pub struct WorldState { pub struct WorldState {
pub options: Options, pub options: Options,
//FIXME: this belongs to `LoopState` rather than to `WorldState`
pub roots_to_scan: usize, pub roots_to_scan: usize,
pub roots: Vec<PathBuf>, pub roots: Vec<PathBuf>,
pub workspaces: Arc<Vec<ProjectWorkspace>>, pub workspaces: Arc<Vec<ProjectWorkspace>>,