From c82dfce24643ccd57707f343a9e251ea5f91efd9 Mon Sep 17 00:00:00 2001 From: Devyn Cairns Date: Fri, 5 Apr 2024 19:57:00 -0700 Subject: [PATCH] Fix deadlock on PluginCustomValue drop (#12418) # Description Because the plugin interface reader thread can be responsible for sending a drop notification, it's possible for it to end up in a deadlock where it's waiting for the response to the drop notification call. I decided that the best way to address this is to just discard the response and not wait for it. It's not really important to synchronize with the response to `Dropped`, so this is probably faster anyway. cc @ayax79, this is your issue where polars is getting stuck # User-Facing Changes - A bug fix - Custom value plugin: `custom-value handle update` command # Tests + Formatting Tried to add a test with a long pipeline with a lot of drops and run it over and over to reproduce the deadlock. - :green_circle: `toolkit fmt` - :green_circle: `toolkit clippy` - :green_circle: `toolkit test` - :green_circle: `toolkit test stdlib` --- .../nu-plugin/src/plugin/interface/plugin.rs | 33 ++++--- .../src/plugin/interface/plugin/tests.rs | 4 + .../src/handle_update.rs | 90 +++++++++++++++++++ crates/nu_plugin_custom_values/src/main.rs | 3 + tests/plugins/custom_values.rs | 22 +++++ 5 files changed, 141 insertions(+), 11 deletions(-) create mode 100644 crates/nu_plugin_custom_values/src/handle_update.rs diff --git a/crates/nu-plugin/src/plugin/interface/plugin.rs b/crates/nu-plugin/src/plugin/interface/plugin.rs index 91e64f1407..3e234d3a76 100644 --- a/crates/nu-plugin/src/plugin/interface/plugin.rs +++ b/crates/nu-plugin/src/plugin/interface/plugin.rs @@ -93,6 +93,9 @@ impl std::fmt::Debug for PluginInterfaceState { struct PluginCallState { /// The sender back to the thread that is waiting for the plugin call response sender: Option>, + /// Don't try to send the plugin call response. This is only used for `Dropped` to avoid an + /// error + dont_send_response: bool, /// Interrupt signal to be used for stream iterators ctrlc: Option>, /// Channel to receive context on to be used if needed @@ -244,11 +247,12 @@ impl PluginInterfaceManager { // Remove the subscription sender, since this will be the last message. // // We can spawn a new one if we need it for engine calls. - if e.get_mut() - .sender - .take() - .and_then(|s| s.send(ReceivedPluginCallMessage::Response(response)).ok()) - .is_none() + if !e.get().dont_send_response + && e.get_mut() + .sender + .take() + .and_then(|s| s.send(ReceivedPluginCallMessage::Response(response)).ok()) + .is_none() { log::warn!("Received a plugin call response for id={id}, but the caller hung up"); } @@ -688,13 +692,18 @@ impl PluginInterface { } }; + // Don't try to send a response for a Dropped call. + let dont_send_response = + matches!(call, PluginCall::CustomValueOp(_, CustomValueOp::Dropped)); + // Register the subscription to the response, and the context self.state .plugin_call_subscription_sender .send(( id, PluginCallState { - sender: Some(tx), + sender: Some(tx).filter(|_| !dont_send_response), + dont_send_response, ctrlc, context_rx: Some(context_rx), keep_plugin_custom_values, @@ -938,13 +947,15 @@ impl PluginInterface { /// Notify the plugin about a dropped custom value. pub fn custom_value_dropped(&self, value: PluginCustomValue) -> Result<(), ShellError> { + // Make sure we don't block here. This can happen on the receiver thread, which would cause a deadlock. We should not try to receive the response - just let it be discarded. + // // Note: the protocol is always designed to have a span with the custom value, but this // operation doesn't support one. - self.custom_value_op_expecting_value( - value.into_spanned(Span::unknown()), - CustomValueOp::Dropped, - ) - .map(|_| ()) + drop(self.write_plugin_call( + PluginCall::CustomValueOp(value.into_spanned(Span::unknown()), CustomValueOp::Dropped), + None, + )?); + Ok(()) } } diff --git a/crates/nu-plugin/src/plugin/interface/plugin/tests.rs b/crates/nu-plugin/src/plugin/interface/plugin/tests.rs index 3d66fe9624..5ca7209b16 100644 --- a/crates/nu-plugin/src/plugin/interface/plugin/tests.rs +++ b/crates/nu-plugin/src/plugin/interface/plugin/tests.rs @@ -193,6 +193,7 @@ fn fake_plugin_call( id, PluginCallState { sender: Some(tx), + dont_send_response: false, ctrlc: None, context_rx: None, keep_plugin_custom_values: mpsc::channel(), @@ -496,6 +497,7 @@ fn manager_handle_engine_call_after_response_received() -> Result<(), ShellError 0, PluginCallState { sender: None, + dont_send_response: false, ctrlc: None, context_rx: Some(context_rx), keep_plugin_custom_values: mpsc::channel(), @@ -560,6 +562,7 @@ fn manager_send_plugin_call_response_removes_context_only_if_no_streams_to_read( n, PluginCallState { sender: None, + dont_send_response: false, ctrlc: None, context_rx: None, keep_plugin_custom_values: mpsc::channel(), @@ -594,6 +597,7 @@ fn manager_consume_stream_end_removes_context_only_if_last_stream() -> Result<() n, PluginCallState { sender: None, + dont_send_response: false, ctrlc: None, context_rx: None, keep_plugin_custom_values: mpsc::channel(), diff --git a/crates/nu_plugin_custom_values/src/handle_update.rs b/crates/nu_plugin_custom_values/src/handle_update.rs new file mode 100644 index 0000000000..8256464ca7 --- /dev/null +++ b/crates/nu_plugin_custom_values/src/handle_update.rs @@ -0,0 +1,90 @@ +use std::sync::atomic; + +use nu_plugin::{EngineInterface, EvaluatedCall, SimplePluginCommand}; +use nu_protocol::{ + engine::Closure, LabeledError, ShellError, Signature, Spanned, SyntaxShape, Type, Value, +}; + +use crate::{handle_custom_value::HandleCustomValue, CustomValuePlugin}; + +pub struct HandleUpdate; + +impl SimplePluginCommand for HandleUpdate { + type Plugin = CustomValuePlugin; + + fn name(&self) -> &str { + "custom-value handle update" + } + + fn signature(&self) -> Signature { + Signature::build(self.name()) + .input_output_type( + Type::Custom("HandleCustomValue".into()), + Type::Custom("HandleCustomValue".into()), + ) + .required( + "closure", + SyntaxShape::Closure(Some(vec![SyntaxShape::Any])), + "the closure to run on the value", + ) + } + + fn usage(&self) -> &str { + "Update the value in a handle and return a new handle with the result" + } + + fn run( + &self, + plugin: &Self::Plugin, + engine: &EngineInterface, + call: &EvaluatedCall, + input: &Value, + ) -> Result { + let closure: Spanned = call.req(0)?; + + if let Some(handle) = input + .as_custom_value()? + .as_any() + .downcast_ref::() + { + // Find the handle + let value = plugin + .handles + .lock() + .map_err(|err| LabeledError::new(err.to_string()))? + .get(&handle.0) + .cloned(); + + if let Some(value) = value { + // Call the closure with the value + let new_value = engine.eval_closure(&closure, vec![value.clone()], Some(value))?; + + // Generate an id and store in the plugin. + let new_id = plugin.counter.fetch_add(1, atomic::Ordering::Relaxed); + + plugin + .handles + .lock() + .map_err(|err| LabeledError::new(err.to_string()))? + .insert(new_id, new_value); + + Ok(Value::custom( + Box::new(HandleCustomValue(new_id)), + call.head, + )) + } else { + Err(LabeledError::new("Handle expired") + .with_label("this handle is no longer valid", input.span()) + .with_help("the plugin may have exited, or there was a bug")) + } + } else { + Err(ShellError::UnsupportedInput { + msg: "requires HandleCustomValue".into(), + input: format!("got {}", input.get_type()), + msg_span: call.head, + input_span: input.span(), + } + .into()) + } + } +} diff --git a/crates/nu_plugin_custom_values/src/main.rs b/crates/nu_plugin_custom_values/src/main.rs index c2df39115f..9ab69135fb 100644 --- a/crates/nu_plugin_custom_values/src/main.rs +++ b/crates/nu_plugin_custom_values/src/main.rs @@ -15,6 +15,7 @@ mod generate; mod generate2; mod handle_get; mod handle_make; +mod handle_update; mod update; mod update_arg; @@ -23,6 +24,7 @@ use generate::Generate; use generate2::Generate2; use handle_get::HandleGet; use handle_make::HandleMake; +use handle_update::HandleUpdate; use nu_protocol::{CustomValue, LabeledError, Spanned, Value}; use update::Update; use update_arg::UpdateArg; @@ -49,6 +51,7 @@ impl Plugin for CustomValuePlugin { Box::new(DropCheck), Box::new(HandleGet), Box::new(HandleMake), + Box::new(HandleUpdate), ] } diff --git a/tests/plugins/custom_values.rs b/tests/plugins/custom_values.rs index 359e532449..16b0c332e3 100644 --- a/tests/plugins/custom_values.rs +++ b/tests/plugins/custom_values.rs @@ -195,6 +195,28 @@ fn handle_make_then_get_success() { assert!(actual.status.success()); } +#[test] +fn handle_update_several_times_doesnt_deadlock() { + // Do this in a loop to try to provoke a deadlock on drop + for _ in 0..10 { + let actual = nu_with_plugins!( + cwd: "tests", + plugin: ("nu_plugin_custom_values"), + r#" + "hEllO" | + custom-value handle make | + custom-value handle update { str upcase } | + custom-value handle update { str downcase } | + custom-value handle update { str title-case } | + custom-value handle get + "# + ); + + assert_eq!(actual.out, "Hello"); + assert!(actual.status.success()); + } +} + #[test] fn custom_value_in_example_is_rendered() { let actual = nu_with_plugins!(