nushell/crates/nu-plugin-engine/src/persistent.rs
Ian Manske 03ee54a4df
Fix try not working with let, etc. (#13885)
# Description
Partialy addresses #13868. `try` does not catch non-zero exit code
errors from the last command in a pipeline if the result is assigned to
a variable using `let` (or `mut`).

This was fixed by adding a new `OutDest::Value` case. This is used when
the pipeline is in a "value" position. I.e., it will be collected into a
value. This ended up replacing most of the usages of `OutDest::Capture`.
So, this PR also renames `OutDest::Capture` to `OutDest::PipeSeparate`
to better fit the few remaining use cases for it.

# User-Facing Changes
Bug fix.

# Tests + Formatting
Added two tests.
2024-09-23 06:44:25 -05:00

364 lines
13 KiB
Rust

use crate::{
init::{create_command, make_plugin_interface},
PluginGc,
};
use super::{PluginInterface, PluginSource};
use nu_plugin_core::CommunicationMode;
use nu_protocol::{
engine::{EngineState, Stack},
HandlerGuard, Handlers, PluginGcConfig, PluginIdentity, PluginMetadata, RegisteredPlugin,
ShellError,
};
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
/// A box that can keep a plugin that was spawned persistent for further uses. The plugin may or
/// may not be currently running. [`.get()`] gets the currently running plugin, or spawns it if it's
/// not running.
#[derive(Debug)]
pub struct PersistentPlugin {
/// Identity (filename, shell, name) of the plugin
identity: PluginIdentity,
/// Mutable state
mutable: Mutex<MutableState>,
}
/// The mutable state for the persistent plugin. This should all be behind one lock to prevent lock
/// order problems.
#[derive(Debug)]
struct MutableState {
/// Reference to the plugin if running
running: Option<RunningPlugin>,
/// Metadata for the plugin, e.g. version.
metadata: Option<PluginMetadata>,
/// Plugin's preferred communication mode (if known)
preferred_mode: Option<PreferredCommunicationMode>,
/// Garbage collector config
gc_config: PluginGcConfig,
/// RAII guard for this plugin's signal handler
signal_guard: Option<HandlerGuard>,
}
#[derive(Debug, Clone, Copy)]
enum PreferredCommunicationMode {
Stdio,
#[cfg(feature = "local-socket")]
LocalSocket,
}
#[derive(Debug)]
struct RunningPlugin {
/// Interface (which can be cloned) to the running plugin
interface: PluginInterface,
/// Garbage collector for the plugin
gc: PluginGc,
}
impl PersistentPlugin {
/// Create a new persistent plugin. The plugin will not be spawned immediately.
pub fn new(identity: PluginIdentity, gc_config: PluginGcConfig) -> PersistentPlugin {
PersistentPlugin {
identity,
mutable: Mutex::new(MutableState {
running: None,
metadata: None,
preferred_mode: None,
gc_config,
signal_guard: None,
}),
}
}
/// Get the plugin interface of the running plugin, or spawn it if it's not currently running.
///
/// Will call `envs` to get environment variables to spawn the plugin if the plugin needs to be
/// spawned.
pub fn get(
self: Arc<Self>,
envs: impl FnOnce() -> Result<HashMap<String, String>, ShellError>,
) -> Result<PluginInterface, ShellError> {
let mut mutable = self.mutable.lock().map_err(|_| ShellError::NushellFailed {
msg: format!(
"plugin `{}` mutex poisoned, probably panic during spawn",
self.identity.name()
),
})?;
if let Some(ref running) = mutable.running {
// It exists, so just clone the interface
Ok(running.interface.clone())
} else {
// Try to spawn. On success, `mutable.running` should have been set to the new running
// plugin by `spawn()` so we just then need to clone the interface from there.
//
// We hold the lock the whole time to prevent others from trying to spawn and ending
// up with duplicate plugins
//
// TODO: We should probably store the envs somewhere, in case we have to launch without
// envs (e.g. from a custom value)
let envs = envs()?;
let result = self.clone().spawn(&envs, &mut mutable);
// Check if we were using an alternate communication mode and may need to fall back to
// stdio.
if result.is_err()
&& !matches!(
mutable.preferred_mode,
Some(PreferredCommunicationMode::Stdio)
)
{
log::warn!("{}: Trying again with stdio communication because mode {:?} failed with {result:?}",
self.identity.name(),
mutable.preferred_mode);
// Reset to stdio and try again, but this time don't catch any error
mutable.preferred_mode = Some(PreferredCommunicationMode::Stdio);
self.clone().spawn(&envs, &mut mutable)?;
}
Ok(mutable
.running
.as_ref()
.ok_or_else(|| ShellError::NushellFailed {
msg: "spawn() succeeded but didn't set interface".into(),
})?
.interface
.clone())
}
}
/// Run the plugin command, then set up and set `mutable.running` to the new running plugin.
fn spawn(
self: Arc<Self>,
envs: &HashMap<String, String>,
mutable: &mut MutableState,
) -> Result<(), ShellError> {
// Make sure `running` is set to None to begin
if let Some(running) = mutable.running.take() {
// Stop the GC if there was a running plugin
running.gc.stop_tracking();
}
let source_file = self.identity.filename();
// Determine the mode to use based on the preferred mode
let mode = match mutable.preferred_mode {
// If not set, we try stdio first and then might retry if another mode is supported
Some(PreferredCommunicationMode::Stdio) | None => CommunicationMode::Stdio,
// Local socket only if enabled
#[cfg(feature = "local-socket")]
Some(PreferredCommunicationMode::LocalSocket) => {
CommunicationMode::local_socket(source_file)
}
};
let mut plugin_cmd = create_command(source_file, self.identity.shell(), &mode);
// We need the current environment variables for `python` based plugins
// Or we'll likely have a problem when a plugin is implemented in a virtual Python environment.
plugin_cmd.envs(envs);
let program_name = plugin_cmd.get_program().to_os_string().into_string();
// Before running the command, prepare communication
let comm = mode.serve()?;
// Run the plugin command
let child = plugin_cmd.spawn().map_err(|err| {
let error_msg = match err.kind() {
std::io::ErrorKind::NotFound => match program_name {
Ok(prog_name) => {
format!("Can't find {prog_name}, please make sure that {prog_name} is in PATH.")
}
_ => {
format!("Error spawning child process: {err}")
}
},
_ => {
format!("Error spawning child process: {err}")
}
};
ShellError::PluginFailedToLoad { msg: error_msg }
})?;
// Start the plugin garbage collector
let gc = PluginGc::new(mutable.gc_config.clone(), &self)?;
let pid = child.id();
let interface = make_plugin_interface(
child,
comm,
Arc::new(PluginSource::new(self.clone())),
Some(pid),
Some(gc.clone()),
)?;
// If our current preferred mode is None, check to see if the plugin might support another
// mode. If so, retry spawn() with that mode
#[cfg(feature = "local-socket")]
if mutable.preferred_mode.is_none()
&& interface
.protocol_info()?
.supports_feature(&nu_plugin_protocol::Feature::LocalSocket)
{
log::trace!(
"{}: Attempting to upgrade to local socket mode",
self.identity.name()
);
// Stop the GC we just created from tracking so that we don't accidentally try to
// stop the new plugin
gc.stop_tracking();
// Set the mode and try again
mutable.preferred_mode = Some(PreferredCommunicationMode::LocalSocket);
return self.spawn(envs, mutable);
}
mutable.running = Some(RunningPlugin { interface, gc });
Ok(())
}
fn stop_internal(&self, reset: bool) -> Result<(), ShellError> {
let mut mutable = self.mutable.lock().map_err(|_| ShellError::NushellFailed {
msg: format!(
"plugin `{}` mutable mutex poisoned, probably panic during spawn",
self.identity.name()
),
})?;
// If the plugin is running, stop its GC, so that the GC doesn't accidentally try to stop
// a future plugin
if let Some(ref running) = mutable.running {
running.gc.stop_tracking();
}
// We don't try to kill the process or anything, we just drop the RunningPlugin. It should
// exit soon after
mutable.running = None;
// If this is a reset, we should also reset other learned attributes like preferred_mode
if reset {
mutable.preferred_mode = None;
}
Ok(())
}
}
impl RegisteredPlugin for PersistentPlugin {
fn identity(&self) -> &PluginIdentity {
&self.identity
}
fn is_running(&self) -> bool {
// If the lock is poisoned, we return false here. That may not be correct, but this is a
// failure state anyway that would be noticed at some point
self.mutable
.lock()
.map(|m| m.running.is_some())
.unwrap_or(false)
}
fn pid(&self) -> Option<u32> {
// Again, we return None for a poisoned lock.
self.mutable
.lock()
.ok()
.and_then(|r| r.running.as_ref().and_then(|r| r.interface.pid()))
}
fn stop(&self) -> Result<(), ShellError> {
self.stop_internal(false)
}
fn reset(&self) -> Result<(), ShellError> {
self.stop_internal(true)
}
fn metadata(&self) -> Option<PluginMetadata> {
self.mutable.lock().ok().and_then(|m| m.metadata.clone())
}
fn set_metadata(&self, metadata: Option<PluginMetadata>) {
if let Ok(mut mutable) = self.mutable.lock() {
mutable.metadata = metadata;
}
}
fn set_gc_config(&self, gc_config: &PluginGcConfig) {
if let Ok(mut mutable) = self.mutable.lock() {
// Save the new config for future calls
mutable.gc_config = gc_config.clone();
// If the plugin is already running, propagate the config change to the running GC
if let Some(gc) = mutable.running.as_ref().map(|running| running.gc.clone()) {
// We don't want to get caught holding the lock
drop(mutable);
gc.set_config(gc_config.clone());
gc.flush();
}
}
}
fn as_any(self: Arc<Self>) -> Arc<dyn std::any::Any + Send + Sync> {
self
}
fn configure_signal_handler(self: Arc<Self>, handlers: &Handlers) -> Result<(), ShellError> {
let guard = {
// We take a weakref to the plugin so that we don't create a cycle to the
// RAII guard that will be stored on the plugin.
let plugin = Arc::downgrade(&self);
handlers.register(Box::new(move |action| {
// write a signal packet through the PluginInterface if the plugin is alive and
// running
if let Some(plugin) = plugin.upgrade() {
if let Ok(mutable) = plugin.mutable.lock() {
if let Some(ref running) = mutable.running {
let _ = running.interface.signal(action);
}
}
}
}))?
};
if let Ok(mut mutable) = self.mutable.lock() {
mutable.signal_guard = Some(guard);
}
Ok(())
}
}
/// Anything that can produce a plugin interface.
pub trait GetPlugin: RegisteredPlugin {
/// Retrieve or spawn a [`PluginInterface`]. The `context` may be used for determining
/// environment variables to launch the plugin with.
fn get_plugin(
self: Arc<Self>,
context: Option<(&EngineState, &mut Stack)>,
) -> Result<PluginInterface, ShellError>;
}
impl GetPlugin for PersistentPlugin {
fn get_plugin(
self: Arc<Self>,
mut context: Option<(&EngineState, &mut Stack)>,
) -> Result<PluginInterface, ShellError> {
self.get(|| {
// Get envs from the context if provided.
let envs = context
.as_mut()
.map(|(engine_state, stack)| {
// We need the current environment variables for `python` based plugins. Or
// we'll likely have a problem when a plugin is implemented in a virtual Python
// environment.
let stack = &mut stack.start_collect_value();
nu_engine::env::env_to_strings(engine_state, stack)
})
.transpose()?;
Ok(envs.unwrap_or_default())
})
}
}