Fix deadlock on PluginCustomValue drop (#12418)

# Description
Because the plugin interface reader thread can be responsible for
sending a drop notification, it's possible for it to end up in a
deadlock where it's waiting for the response to the drop notification
call.

I decided that the best way to address this is to just discard the
response and not wait for it. It's not really important to synchronize
with the response to `Dropped`, so this is probably faster anyway.

cc @ayax79, this is your issue where polars is getting stuck

# User-Facing Changes
- A bug fix
- Custom value plugin: `custom-value handle update` command

# Tests + Formatting

Tried to add a test with a long pipeline with a lot of drops and run it
over and over to reproduce the deadlock.

- 🟢 `toolkit fmt`
- 🟢 `toolkit clippy`
- 🟢 `toolkit test`
- 🟢 `toolkit test stdlib`
This commit is contained in:
Devyn Cairns 2024-04-05 19:57:00 -07:00 committed by GitHub
parent 82b7548c0c
commit c82dfce246
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 141 additions and 11 deletions

View file

@ -93,6 +93,9 @@ impl std::fmt::Debug for PluginInterfaceState {
struct PluginCallState { struct PluginCallState {
/// The sender back to the thread that is waiting for the plugin call response /// The sender back to the thread that is waiting for the plugin call response
sender: Option<mpsc::Sender<ReceivedPluginCallMessage>>, sender: Option<mpsc::Sender<ReceivedPluginCallMessage>>,
/// Don't try to send the plugin call response. This is only used for `Dropped` to avoid an
/// error
dont_send_response: bool,
/// Interrupt signal to be used for stream iterators /// Interrupt signal to be used for stream iterators
ctrlc: Option<Arc<AtomicBool>>, ctrlc: Option<Arc<AtomicBool>>,
/// Channel to receive context on to be used if needed /// Channel to receive context on to be used if needed
@ -244,11 +247,12 @@ impl PluginInterfaceManager {
// Remove the subscription sender, since this will be the last message. // Remove the subscription sender, since this will be the last message.
// //
// We can spawn a new one if we need it for engine calls. // We can spawn a new one if we need it for engine calls.
if e.get_mut() if !e.get().dont_send_response
.sender && e.get_mut()
.take() .sender
.and_then(|s| s.send(ReceivedPluginCallMessage::Response(response)).ok()) .take()
.is_none() .and_then(|s| s.send(ReceivedPluginCallMessage::Response(response)).ok())
.is_none()
{ {
log::warn!("Received a plugin call response for id={id}, but the caller hung up"); log::warn!("Received a plugin call response for id={id}, but the caller hung up");
} }
@ -688,13 +692,18 @@ impl PluginInterface {
} }
}; };
// Don't try to send a response for a Dropped call.
let dont_send_response =
matches!(call, PluginCall::CustomValueOp(_, CustomValueOp::Dropped));
// Register the subscription to the response, and the context // Register the subscription to the response, and the context
self.state self.state
.plugin_call_subscription_sender .plugin_call_subscription_sender
.send(( .send((
id, id,
PluginCallState { PluginCallState {
sender: Some(tx), sender: Some(tx).filter(|_| !dont_send_response),
dont_send_response,
ctrlc, ctrlc,
context_rx: Some(context_rx), context_rx: Some(context_rx),
keep_plugin_custom_values, keep_plugin_custom_values,
@ -938,13 +947,15 @@ impl PluginInterface {
/// Notify the plugin about a dropped custom value. /// Notify the plugin about a dropped custom value.
pub fn custom_value_dropped(&self, value: PluginCustomValue) -> Result<(), ShellError> { pub fn custom_value_dropped(&self, value: PluginCustomValue) -> Result<(), ShellError> {
// Make sure we don't block here. This can happen on the receiver thread, which would cause a deadlock. We should not try to receive the response - just let it be discarded.
//
// Note: the protocol is always designed to have a span with the custom value, but this // Note: the protocol is always designed to have a span with the custom value, but this
// operation doesn't support one. // operation doesn't support one.
self.custom_value_op_expecting_value( drop(self.write_plugin_call(
value.into_spanned(Span::unknown()), PluginCall::CustomValueOp(value.into_spanned(Span::unknown()), CustomValueOp::Dropped),
CustomValueOp::Dropped, None,
) )?);
.map(|_| ()) Ok(())
} }
} }

View file

@ -193,6 +193,7 @@ fn fake_plugin_call(
id, id,
PluginCallState { PluginCallState {
sender: Some(tx), sender: Some(tx),
dont_send_response: false,
ctrlc: None, ctrlc: None,
context_rx: None, context_rx: None,
keep_plugin_custom_values: mpsc::channel(), keep_plugin_custom_values: mpsc::channel(),
@ -496,6 +497,7 @@ fn manager_handle_engine_call_after_response_received() -> Result<(), ShellError
0, 0,
PluginCallState { PluginCallState {
sender: None, sender: None,
dont_send_response: false,
ctrlc: None, ctrlc: None,
context_rx: Some(context_rx), context_rx: Some(context_rx),
keep_plugin_custom_values: mpsc::channel(), keep_plugin_custom_values: mpsc::channel(),
@ -560,6 +562,7 @@ fn manager_send_plugin_call_response_removes_context_only_if_no_streams_to_read(
n, n,
PluginCallState { PluginCallState {
sender: None, sender: None,
dont_send_response: false,
ctrlc: None, ctrlc: None,
context_rx: None, context_rx: None,
keep_plugin_custom_values: mpsc::channel(), keep_plugin_custom_values: mpsc::channel(),
@ -594,6 +597,7 @@ fn manager_consume_stream_end_removes_context_only_if_last_stream() -> Result<()
n, n,
PluginCallState { PluginCallState {
sender: None, sender: None,
dont_send_response: false,
ctrlc: None, ctrlc: None,
context_rx: None, context_rx: None,
keep_plugin_custom_values: mpsc::channel(), keep_plugin_custom_values: mpsc::channel(),

View file

@ -0,0 +1,90 @@
use std::sync::atomic;
use nu_plugin::{EngineInterface, EvaluatedCall, SimplePluginCommand};
use nu_protocol::{
engine::Closure, LabeledError, ShellError, Signature, Spanned, SyntaxShape, Type, Value,
};
use crate::{handle_custom_value::HandleCustomValue, CustomValuePlugin};
pub struct HandleUpdate;
impl SimplePluginCommand for HandleUpdate {
type Plugin = CustomValuePlugin;
fn name(&self) -> &str {
"custom-value handle update"
}
fn signature(&self) -> Signature {
Signature::build(self.name())
.input_output_type(
Type::Custom("HandleCustomValue".into()),
Type::Custom("HandleCustomValue".into()),
)
.required(
"closure",
SyntaxShape::Closure(Some(vec![SyntaxShape::Any])),
"the closure to run on the value",
)
}
fn usage(&self) -> &str {
"Update the value in a handle and return a new handle with the result"
}
fn run(
&self,
plugin: &Self::Plugin,
engine: &EngineInterface,
call: &EvaluatedCall,
input: &Value,
) -> Result<Value, LabeledError> {
let closure: Spanned<Closure> = call.req(0)?;
if let Some(handle) = input
.as_custom_value()?
.as_any()
.downcast_ref::<HandleCustomValue>()
{
// Find the handle
let value = plugin
.handles
.lock()
.map_err(|err| LabeledError::new(err.to_string()))?
.get(&handle.0)
.cloned();
if let Some(value) = value {
// Call the closure with the value
let new_value = engine.eval_closure(&closure, vec![value.clone()], Some(value))?;
// Generate an id and store in the plugin.
let new_id = plugin.counter.fetch_add(1, atomic::Ordering::Relaxed);
plugin
.handles
.lock()
.map_err(|err| LabeledError::new(err.to_string()))?
.insert(new_id, new_value);
Ok(Value::custom(
Box::new(HandleCustomValue(new_id)),
call.head,
))
} else {
Err(LabeledError::new("Handle expired")
.with_label("this handle is no longer valid", input.span())
.with_help("the plugin may have exited, or there was a bug"))
}
} else {
Err(ShellError::UnsupportedInput {
msg: "requires HandleCustomValue".into(),
input: format!("got {}", input.get_type()),
msg_span: call.head,
input_span: input.span(),
}
.into())
}
}
}

View file

@ -15,6 +15,7 @@ mod generate;
mod generate2; mod generate2;
mod handle_get; mod handle_get;
mod handle_make; mod handle_make;
mod handle_update;
mod update; mod update;
mod update_arg; mod update_arg;
@ -23,6 +24,7 @@ use generate::Generate;
use generate2::Generate2; use generate2::Generate2;
use handle_get::HandleGet; use handle_get::HandleGet;
use handle_make::HandleMake; use handle_make::HandleMake;
use handle_update::HandleUpdate;
use nu_protocol::{CustomValue, LabeledError, Spanned, Value}; use nu_protocol::{CustomValue, LabeledError, Spanned, Value};
use update::Update; use update::Update;
use update_arg::UpdateArg; use update_arg::UpdateArg;
@ -49,6 +51,7 @@ impl Plugin for CustomValuePlugin {
Box::new(DropCheck), Box::new(DropCheck),
Box::new(HandleGet), Box::new(HandleGet),
Box::new(HandleMake), Box::new(HandleMake),
Box::new(HandleUpdate),
] ]
} }

View file

@ -195,6 +195,28 @@ fn handle_make_then_get_success() {
assert!(actual.status.success()); assert!(actual.status.success());
} }
#[test]
fn handle_update_several_times_doesnt_deadlock() {
// Do this in a loop to try to provoke a deadlock on drop
for _ in 0..10 {
let actual = nu_with_plugins!(
cwd: "tests",
plugin: ("nu_plugin_custom_values"),
r#"
"hEllO" |
custom-value handle make |
custom-value handle update { str upcase } |
custom-value handle update { str downcase } |
custom-value handle update { str title-case } |
custom-value handle get
"#
);
assert_eq!(actual.out, "Hello");
assert!(actual.status.success());
}
}
#[test] #[test]
fn custom_value_in_example_is_rendered() { fn custom_value_in_example_is_rendered() {
let actual = nu_with_plugins!( let actual = nu_with_plugins!(