nushell/crates/nu-plugin/src/plugin/gc.rs
Ian Manske c747ec75c9
Add command_prelude module (#12291)
# Description
When implementing a `Command`, one must also import all the types
present in the function signatures for `Command`. This makes it so that
we often import the same set of types in each command implementation
file. E.g., something like this:
```rust
use nu_protocol::ast::Call;
use nu_protocol::engine::{Command, EngineState, Stack};
use nu_protocol::{
    record, Category, Example, IntoInterruptiblePipelineData, IntoPipelineData, PipelineData,
    ShellError, Signature, Span, Type, Value,
};
```

This PR adds the `nu_engine::command_prelude` module which contains the
necessary and commonly used types to implement a `Command`:
```rust
// command_prelude.rs
pub use crate::CallExt;
pub use nu_protocol::{
    ast::{Call, CellPath},
    engine::{Command, EngineState, Stack},
    record, Category, Example, IntoInterruptiblePipelineData, IntoPipelineData, IntoSpanned,
    PipelineData, Record, ShellError, Signature, Span, Spanned, SyntaxShape, Type, Value,
};
```

This should reduce the boilerplate needed to implement a command and
also gives us a place to track the breadth of the `Command` API. I tried
to be conservative with what went into the prelude modules, since it
might be hard/annoying to remove items from the prelude in the future.
Let me know if something should be included or excluded.
2024-03-26 21:17:30 +00:00

303 lines
10 KiB
Rust

use crate::PersistentPlugin;
use nu_protocol::{PluginGcConfig, RegisteredPlugin};
use std::{
sync::{mpsc, Arc, Weak},
thread,
time::{Duration, Instant},
};
/// Plugin garbage collector
///
/// Many users don't want all of their plugins to stay running indefinitely after using them, so
/// this runs a thread that monitors the plugin's usage and stops it automatically if it meets
/// certain conditions of inactivity.
#[derive(Debug, Clone)]
pub struct PluginGc {
sender: mpsc::Sender<PluginGcMsg>,
}
impl PluginGc {
/// Start a new plugin garbage collector. Returns an error if the thread failed to spawn.
pub fn new(
config: PluginGcConfig,
plugin: &Arc<PersistentPlugin>,
) -> std::io::Result<PluginGc> {
let (sender, receiver) = mpsc::channel();
let mut state = PluginGcState {
config,
last_update: None,
locks: 0,
disabled: false,
plugin: Arc::downgrade(plugin),
name: plugin.identity().name().to_owned(),
};
thread::Builder::new()
.name(format!("plugin gc ({})", plugin.identity().name()))
.spawn(move || state.run(receiver))?;
Ok(PluginGc { sender })
}
/// Update the garbage collector config
pub fn set_config(&self, config: PluginGcConfig) {
let _ = self.sender.send(PluginGcMsg::SetConfig(config));
}
/// Ensure all GC messages have been processed
pub fn flush(&self) {
let (tx, rx) = mpsc::channel();
let _ = self.sender.send(PluginGcMsg::Flush(tx));
// This will block until the channel is dropped, which could be because the send failed, or
// because the GC got the message
let _ = rx.recv();
}
/// Increment the number of locks held by the plugin
pub fn increment_locks(&self, amount: i64) {
let _ = self.sender.send(PluginGcMsg::AddLocks(amount));
}
/// Decrement the number of locks held by the plugin
pub fn decrement_locks(&self, amount: i64) {
let _ = self.sender.send(PluginGcMsg::AddLocks(-amount));
}
/// Set whether the GC is disabled by explicit request from the plugin. This is separate from
/// the `enabled` option in the config, and overrides that option.
pub fn set_disabled(&self, disabled: bool) {
let _ = self.sender.send(PluginGcMsg::SetDisabled(disabled));
}
/// Tell the GC to stop tracking the plugin. The plugin will not be stopped. The GC cannot be
/// reactivated after this request - a new one must be created instead.
pub fn stop_tracking(&self) {
let _ = self.sender.send(PluginGcMsg::StopTracking);
}
/// Tell the GC that the plugin exited so that it can remove it from the persistent plugin.
///
/// The reason the plugin tells the GC rather than just stopping itself via `source` is that
/// it can't guarantee that the plugin currently pointed to by `source` is itself, but if the
/// GC is still running, it hasn't received [`.stop_tracking()`] yet, which means it should be
/// the right plugin.
pub fn exited(&self) {
let _ = self.sender.send(PluginGcMsg::Exited);
}
}
#[derive(Debug)]
enum PluginGcMsg {
SetConfig(PluginGcConfig),
Flush(mpsc::Sender<()>),
AddLocks(i64),
SetDisabled(bool),
StopTracking,
Exited,
}
#[derive(Debug)]
struct PluginGcState {
config: PluginGcConfig,
last_update: Option<Instant>,
locks: i64,
disabled: bool,
plugin: Weak<PersistentPlugin>,
name: String,
}
impl PluginGcState {
fn next_timeout(&self, now: Instant) -> Option<Duration> {
if self.locks <= 0 && !self.disabled {
self.last_update
.zip(self.config.enabled.then_some(self.config.stop_after))
.map(|(last_update, stop_after)| {
// If configured to stop, and used at some point, calculate the difference
let stop_after_duration = Duration::from_nanos(stop_after.max(0) as u64);
let duration_since_last_update = now.duration_since(last_update);
stop_after_duration.saturating_sub(duration_since_last_update)
})
} else {
// Don't timeout if there are locks set, or disabled
None
}
}
// returns `Some()` if the GC should not continue to operate, with `true` if it should stop the
// plugin, or `false` if it should not
fn handle_message(&mut self, msg: PluginGcMsg) -> Option<bool> {
match msg {
PluginGcMsg::SetConfig(config) => {
self.config = config;
}
PluginGcMsg::Flush(sender) => {
// Rather than sending a message, we just drop the channel, which causes the other
// side to disconnect equally well
drop(sender);
}
PluginGcMsg::AddLocks(amount) => {
self.locks += amount;
if self.locks < 0 {
log::warn!(
"Plugin GC ({name}) problem: locks count below zero after adding \
{amount}: locks={locks}",
name = self.name,
locks = self.locks,
);
}
// Any time locks are modified, that counts as activity
self.last_update = Some(Instant::now());
}
PluginGcMsg::SetDisabled(disabled) => {
self.disabled = disabled;
}
PluginGcMsg::StopTracking => {
// Immediately exit without stopping the plugin
return Some(false);
}
PluginGcMsg::Exited => {
// Exit and stop the plugin
return Some(true);
}
}
None
}
fn run(&mut self, receiver: mpsc::Receiver<PluginGcMsg>) {
let mut always_stop = false;
loop {
let Some(msg) = (match self.next_timeout(Instant::now()) {
Some(duration) => receiver.recv_timeout(duration).ok(),
None => receiver.recv().ok(),
}) else {
// If the timeout was reached, or the channel is disconnected, break the loop
break;
};
log::trace!("Plugin GC ({name}) message: {msg:?}", name = self.name);
if let Some(should_stop) = self.handle_message(msg) {
// Exit the GC
if should_stop {
// If should_stop = true, attempt to stop the plugin
always_stop = true;
break;
} else {
// Don't stop the plugin
return;
}
}
}
// Upon exiting the loop, if the timeout reached zero, or we are exiting due to an Exited
// message, stop the plugin
if always_stop
|| self
.next_timeout(Instant::now())
.is_some_and(|t| t.is_zero())
{
// We only hold a weak reference, and it's not an error if we fail to upgrade it -
// that just means the plugin is definitely stopped anyway.
if let Some(plugin) = self.plugin.upgrade() {
let name = &self.name;
if let Err(err) = plugin.stop() {
log::warn!("Plugin `{name}` failed to be stopped by GC: {err}");
} else {
log::debug!("Plugin `{name}` successfully stopped by GC");
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_state() -> PluginGcState {
PluginGcState {
config: PluginGcConfig::default(),
last_update: None,
locks: 0,
disabled: false,
plugin: Weak::new(),
name: "test".into(),
}
}
#[test]
fn timeout_configured_as_zero() {
let now = Instant::now();
let mut state = test_state();
state.config.enabled = true;
state.config.stop_after = 0;
state.last_update = Some(now);
assert_eq!(Some(Duration::ZERO), state.next_timeout(now));
}
#[test]
fn timeout_past_deadline() {
let now = Instant::now();
let mut state = test_state();
state.config.enabled = true;
state.config.stop_after = Duration::from_secs(1).as_nanos() as i64;
state.last_update = Some(now - Duration::from_secs(2));
assert_eq!(Some(Duration::ZERO), state.next_timeout(now));
}
#[test]
fn timeout_with_deadline_in_future() {
let now = Instant::now();
let mut state = test_state();
state.config.enabled = true;
state.config.stop_after = Duration::from_secs(1).as_nanos() as i64;
state.last_update = Some(now);
assert_eq!(Some(Duration::from_secs(1)), state.next_timeout(now));
}
#[test]
fn no_timeout_if_disabled_by_config() {
let now = Instant::now();
let mut state = test_state();
state.config.enabled = false;
state.last_update = Some(now);
assert_eq!(None, state.next_timeout(now));
}
#[test]
fn no_timeout_if_disabled_by_plugin() {
let now = Instant::now();
let mut state = test_state();
state.config.enabled = true;
state.disabled = true;
state.last_update = Some(now);
assert_eq!(None, state.next_timeout(now));
}
#[test]
fn no_timeout_if_locks_count_over_zero() {
let now = Instant::now();
let mut state = test_state();
state.config.enabled = true;
state.locks = 1;
state.last_update = Some(now);
assert_eq!(None, state.next_timeout(now));
}
#[test]
fn adding_locks_changes_last_update() {
let mut state = test_state();
let original_last_update = Some(Instant::now() - Duration::from_secs(1));
state.last_update = original_last_update;
state.handle_message(PluginGcMsg::AddLocks(1));
assert_ne!(original_last_update, state.last_update, "not updated");
}
}