From 72f3942c377586c7ecee700647d367dc59a30829 Mon Sep 17 00:00:00 2001 From: Devyn Cairns Date: Thu, 2 May 2024 22:31:33 -0700 Subject: [PATCH] Upgrade to interprocess 2.0.0 (#12729) # Description This fixes #12724. NetBSD confirmed to work with this change. The update also behaves a bit better in some ways - it automatically unlinks and reclaims sockets on Unix, and doesn't try to flush/sync the socket on Windows, so I was able to remove that platform-specific logic. They also have a way to split the socket so I could just use one socket now, but I haven't tried to do that yet. That would be more of a breaking change but I think it's more straightforward. # User-Facing Changes - Hopefully more platforms work # Tests + Formatting - :green_circle: `toolkit fmt` - :green_circle: `toolkit clippy` - :green_circle: `toolkit test` - :green_circle: `toolkit test stdlib` --- Cargo.lock | 181 ++---------------- Cargo.toml | 2 +- .../communication_mode/local_socket/mod.rs | 56 ++---- .../src/communication_mode/mod.rs | 61 +++--- crates/nu_plugin_stress_internals/src/main.rs | 39 ++-- 5 files changed, 79 insertions(+), 260 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 170f242459..0e7c9f9203 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -240,30 +240,6 @@ dependencies = [ "wait-timeout", ] -[[package]] -name = "async-channel" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f28243a43d821d11341ab73c80bed182dc015c514b951616cf79bd4af39af0c3" -dependencies = [ - "concurrent-queue", - "event-listener 5.3.0", - "event-listener-strategy 0.5.1", - "futures-core", - "pin-project-lite", -] - -[[package]] -name = "async-lock" -version = "3.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d034b430882f8381900d3fe6f0aaa3ad94f2cb4ac519b429692a1bc2dda4ae7b" -dependencies = [ - "event-listener 4.0.3", - "event-listener-strategy 0.4.0", - "pin-project-lite", -] - [[package]] name = "async-stream" version = "0.3.5" @@ -286,12 +262,6 @@ dependencies = [ "syn 2.0.58", ] -[[package]] -name = "async-task" -version = "4.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbb36e985947064623dbd357f727af08ffd077f93d696782f3c56365fa2e2799" - [[package]] name = "async-trait" version = "0.1.79" @@ -318,12 +288,6 @@ version = "0.15.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ae037714f313c1353189ead58ef9eec30a8e8dc101b2622d461418fd59e28a9" -[[package]] -name = "atomic-waker" -version = "1.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" - [[package]] name = "autocfg" version = "1.2.0" @@ -472,22 +436,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "blocking" -version = "1.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a37913e8dc4ddcc604f0c6d3bf2887c995153af3611de9e23c352b44c1b9118" -dependencies = [ - "async-channel", - "async-lock", - "async-task", - "fastrand", - "futures-io", - "futures-lite", - "piper", - "tracing", -] - [[package]] name = "borsh" version = "1.4.0" @@ -905,15 +853,6 @@ dependencies = [ "static_assertions", ] -[[package]] -name = "concurrent-queue" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "condtype" version = "1.3.0" @@ -1437,48 +1376,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b90ca2580b73ab6a1f724b76ca11ab632df820fd6040c336200d2c1df7b3c82c" -[[package]] -name = "event-listener" -version = "4.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67b215c49b2b248c855fb73579eb1f4f26c38ffdc12973e20e07b91d78d5646e" -dependencies = [ - "concurrent-queue", - "parking", - "pin-project-lite", -] - -[[package]] -name = "event-listener" -version = "5.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d9944b8ca13534cdfb2800775f8dd4902ff3fc75a50101466decadfdf322a24" -dependencies = [ - "concurrent-queue", - "parking", - "pin-project-lite", -] - -[[package]] -name = "event-listener-strategy" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3" -dependencies = [ - "event-listener 4.0.3", - "pin-project-lite", -] - -[[package]] -name = "event-listener-strategy" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "332f51cb23d20b0de8458b86580878211da09bcd4503cb579c225b3d124cabb3" -dependencies = [ - "event-listener 5.3.0", - "pin-project-lite", -] - [[package]] name = "fallible-iterator" version = "0.3.0" @@ -1695,16 +1592,6 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" -[[package]] -name = "futures-lite" -version = "2.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5" -dependencies = [ - "futures-core", - "pin-project-lite", -] - [[package]] name = "futures-macro" version = "0.3.30" @@ -2126,30 +2013,16 @@ dependencies = [ [[package]] name = "interprocess" -version = "1.2.1" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81f2533f3be42fffe3b5e63b71aeca416c1c3bc33e4e27be018521e76b1f38fb" +checksum = "6d5f0e3c218e7a86a6712fd3adc84672304f9e839402b866685b9117a077c37f" dependencies = [ - "blocking", - "cfg-if", - "futures-core", - "futures-io", - "intmap", "libc", - "once_cell", - "rustc_version", - "spinning", - "thiserror", - "to_method", - "winapi", + "recvmsg", + "widestring", + "windows-sys 0.52.0", ] -[[package]] -name = "intmap" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae52f28f45ac2bc96edb7714de995cffc174a395fb0abf5bff453587c980d7b9" - [[package]] name = "inventory" version = "0.3.15" @@ -3830,12 +3703,6 @@ dependencies = [ "unicode-width", ] -[[package]] -name = "parking" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" - [[package]] name = "parking_lot" version = "0.12.1" @@ -4059,17 +3926,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" -[[package]] -name = "piper" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "668d31b1c4eba19242f2088b2bf3316b82ca31082a8335764db4e083db7485d4" -dependencies = [ - "atomic-waker", - "fastrand", - "futures-io", -] - [[package]] name = "pkg-config" version = "0.3.30" @@ -4865,6 +4721,12 @@ dependencies = [ "syn 2.0.58", ] +[[package]] +name = "recvmsg" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3edd4d5d42c92f0a659926464d4cce56b562761267ecf0f469d85b7de384175" + [[package]] name = "redox_syscall" version = "0.4.1" @@ -5605,15 +5467,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "spinning" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d4f0e86297cad2658d92a707320d87bf4e6ae1050287f51d19b67ef3f153a7b" -dependencies = [ - "lock_api", -] - [[package]] name = "sqlparser" version = "0.39.0" @@ -6051,12 +5904,6 @@ dependencies = [ "regex", ] -[[package]] -name = "to_method" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7c4ceeeca15c8384bbc3e011dbd8fccb7f068a440b752b7d9b32ceb0ca0e2e8" - [[package]] name = "tokio" version = "1.37.0" @@ -6762,6 +6609,12 @@ dependencies = [ "winsafe", ] +[[package]] +name = "widestring" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7219d36b6eac893fa81e84ebe06485e7dcbb616177469b142df14f1f4deb1311" + [[package]] name = "wild" version = "2.2.1" diff --git a/Cargo.toml b/Cargo.toml index 8382ebec4a..7fbc95a239 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -94,7 +94,7 @@ heck = "0.5.0" human-date-parser = "0.1.1" indexmap = "2.2" indicatif = "0.17" -interprocess = "1.2.1" +interprocess = "2.0.0" is_executable = "1.0" itertools = "0.12" libc = "0.2" diff --git a/crates/nu-plugin-core/src/communication_mode/local_socket/mod.rs b/crates/nu-plugin-core/src/communication_mode/local_socket/mod.rs index e550892fe1..10f71b8e2b 100644 --- a/crates/nu-plugin-core/src/communication_mode/local_socket/mod.rs +++ b/crates/nu-plugin-core/src/communication_mode/local_socket/mod.rs @@ -1,4 +1,4 @@ -use std::ffi::OsString; +use std::ffi::{OsStr, OsString}; #[cfg(test)] pub(crate) mod tests; @@ -23,6 +23,16 @@ pub fn make_local_socket_name(unique_id: &str) -> OsString { base.into() } +/// Interpret a local socket name for use with `interprocess`. +#[cfg(unix)] +pub fn interpret_local_socket_name( + name: &OsStr, +) -> Result { + use interprocess::local_socket::{GenericFilePath, ToFsName}; + + name.to_fs_name::() +} + /// Generate a name to be used for a local socket specific to this `nu` process, described by the /// given `unique_id`, which should be unique to the purpose of the socket. /// @@ -33,6 +43,16 @@ pub fn make_local_socket_name(unique_id: &str) -> OsString { format!("nu.{}.{}", std::process::id(), unique_id).into() } +/// Interpret a local socket name for use with `interprocess`. +#[cfg(windows)] +pub fn interpret_local_socket_name( + name: &OsStr, +) -> Result { + use interprocess::local_socket::{GenericNamespaced, ToNsName}; + + name.to_ns_name::() +} + /// Determine if the error is just due to the listener not being ready yet in asynchronous mode #[cfg(not(windows))] pub fn is_would_block_err(err: &std::io::Error) -> bool { @@ -48,37 +68,3 @@ pub fn is_would_block_err(err: &std::io::Error) -> bool { e as i64 == windows::Win32::Foundation::ERROR_PIPE_LISTENING.0 as i64 }) } - -/// Wraps the `interprocess` local socket stream for greater compatibility -#[derive(Debug)] -pub struct LocalSocketStream(pub interprocess::local_socket::LocalSocketStream); - -impl From for LocalSocketStream { - fn from(value: interprocess::local_socket::LocalSocketStream) -> Self { - LocalSocketStream(value) - } -} - -impl std::io::Read for LocalSocketStream { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - self.0.read(buf) - } -} - -impl std::io::Write for LocalSocketStream { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.0.write(buf) - } - - fn flush(&mut self) -> std::io::Result<()> { - // We don't actually flush the underlying socket on Windows. The flush operation on a - // Windows named pipe actually synchronizes with read on the other side, and won't finish - // until the other side is empty. This isn't how most of our other I/O methods work, so we - // just won't do it. The BufWriter above this will have still made a write call with the - // contents of the buffer, which should be good enough. - if cfg!(not(windows)) { - self.0.flush()?; - } - Ok(()) - } -} diff --git a/crates/nu-plugin-core/src/communication_mode/mod.rs b/crates/nu-plugin-core/src/communication_mode/mod.rs index 5d5fd03dd0..1eb72d6ac8 100644 --- a/crates/nu-plugin-core/src/communication_mode/mod.rs +++ b/crates/nu-plugin-core/src/communication_mode/mod.rs @@ -4,9 +4,6 @@ use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio}; use nu_protocol::ShellError; -#[cfg(feature = "local-socket")] -use interprocess::local_socket::LocalSocketListener; - #[cfg(feature = "local-socket")] mod local_socket; @@ -83,15 +80,14 @@ impl CommunicationMode { // For sockets: we need to create the server so that the child won't fail to connect. #[cfg(feature = "local-socket")] CommunicationMode::LocalSocket(name) => { - let listener = LocalSocketListener::bind(name.as_os_str()).map_err(|err| { - ShellError::IOError { + use interprocess::local_socket::ListenerOptions; + + let listener = interpret_local_socket_name(name) + .and_then(|name| ListenerOptions::new().name(name).create_sync()) + .map_err(|err| ShellError::IOError { msg: format!("failed to open socket for plugin: {err}"), - } - })?; - Ok(PreparedServerCommunication::LocalSocket { - name: name.clone(), - listener, - }) + })?; + Ok(PreparedServerCommunication::LocalSocket { listener }) } } } @@ -107,11 +103,13 @@ impl CommunicationMode { // Connect to the specified socket. let get_socket = || { use interprocess::local_socket as ls; - ls::LocalSocketStream::connect(name.as_os_str()) + use ls::traits::Stream; + + interpret_local_socket_name(name) + .and_then(|name| ls::Stream::connect(name)) .map_err(|err| ShellError::IOError { msg: format!("failed to connect to socket: {err}"), }) - .map(LocalSocketStream::from) }; // Reverse order from the server: read in, write out let read_in = get_socket()?; @@ -133,9 +131,7 @@ pub enum PreparedServerCommunication { /// Contains the listener to accept connections on. On Unix, the socket is unlinked on `Drop`. #[cfg(feature = "local-socket")] LocalSocket { - #[cfg_attr(windows, allow(dead_code))] // not used on Windows - name: std::ffi::OsString, - listener: LocalSocketListener, + listener: interprocess::local_socket::Listener, }, } @@ -161,6 +157,9 @@ impl PreparedServerCommunication { } #[cfg(feature = "local-socket")] PreparedServerCommunication::LocalSocket { listener, .. } => { + use interprocess::local_socket::traits::{ + Listener, ListenerNonblockingMode, Stream, + }; use std::time::{Duration, Instant}; const RETRY_PERIOD: Duration = Duration::from_millis(1); @@ -170,13 +169,16 @@ impl PreparedServerCommunication { // Use a loop to try to get two clients from the listener: one for read (the plugin // output) and one for write (the plugin input) - listener.set_nonblocking(true)?; + // + // Be non-blocking on Accept only, so we can timeout. + listener.set_nonblocking(ListenerNonblockingMode::Accept)?; let mut get_socket = || { let mut result = None; while let Ok(None) = child.try_wait() { match listener.accept() { Ok(stream) => { - // Success! But make sure the stream is in blocking mode. + // Success! Ensure the stream is in nonblocking mode though, for + // good measure. Had an issue without this on macOS. stream.set_nonblocking(false)?; result = Some(stream); break; @@ -198,7 +200,7 @@ impl PreparedServerCommunication { } } if let Some(stream) = result { - Ok(LocalSocketStream(stream)) + Ok(stream) } else { // The process may have exited Err(ShellError::PluginFailedToLoad { @@ -215,26 +217,13 @@ impl PreparedServerCommunication { } } -impl Drop for PreparedServerCommunication { - fn drop(&mut self) { - match self { - #[cfg(all(unix, feature = "local-socket"))] - PreparedServerCommunication::LocalSocket { name: path, .. } => { - // Just try to remove the socket file, it's ok if this fails - let _ = std::fs::remove_file(path); - } - _ => (), - } - } -} - /// The required streams for communication from the engine side, i.e. the server in socket terms. pub enum ServerCommunicationIo { Stdio(ChildStdin, ChildStdout), #[cfg(feature = "local-socket")] LocalSocket { - read_out: LocalSocketStream, - write_in: LocalSocketStream, + read_out: interprocess::local_socket::Stream, + write_in: interprocess::local_socket::Stream, }, } @@ -243,7 +232,7 @@ pub enum ClientCommunicationIo { Stdio(Stdin, Stdout), #[cfg(feature = "local-socket")] LocalSocket { - read_in: LocalSocketStream, - write_out: LocalSocketStream, + read_in: interprocess::local_socket::Stream, + write_out: interprocess::local_socket::Stream, }, } diff --git a/crates/nu_plugin_stress_internals/src/main.rs b/crates/nu_plugin_stress_internals/src/main.rs index 2ce2a5536e..bdbe5c8943 100644 --- a/crates/nu_plugin_stress_internals/src/main.rs +++ b/crates/nu_plugin_stress_internals/src/main.rs @@ -1,9 +1,12 @@ use std::{ error::Error, + ffi::OsStr, io::{BufRead, BufReader, Write}, }; -use interprocess::local_socket::LocalSocketStream; +use interprocess::local_socket::{ + self, traits::Stream, GenericFilePath, GenericNamespaced, ToFsName, ToNsName, +}; use serde::Deserialize; use serde_json::{json, Value}; @@ -35,9 +38,6 @@ pub fn main() -> Result<(), Box> { local_socket_path: None, }; - #[allow(unused_mut)] - let mut should_flush = true; - let (mut input, mut output): (Box, Box) = match args.get(1).map(|s| s.as_str()) { Some("--stdio") => ( @@ -49,14 +49,13 @@ pub fn main() -> Result<(), Box> { if opts.refuse_local_socket { std::process::exit(1) } else { - let in_socket = LocalSocketStream::connect(args[2].as_str())?; - let out_socket = LocalSocketStream::connect(args[2].as_str())?; - - #[cfg(windows)] - { - // Flushing on a socket on Windows is weird and waits for the other side - should_flush = false; - } + let name = if cfg!(windows) { + OsStr::new(&args[2]).to_ns_name::()? + } else { + OsStr::new(&args[2]).to_fs_name::()? + }; + let in_socket = local_socket::Stream::connect(name.clone())?; + let out_socket = local_socket::Stream::connect(name)?; (Box::new(BufReader::new(in_socket)), Box::new(out_socket)) } @@ -73,9 +72,7 @@ pub fn main() -> Result<(), Box> { // Send encoding format output.write_all(b"\x04json")?; - if should_flush { - output.flush()?; - } + output.flush()?; // Test exiting without `Hello` if opts.exit_before_hello { @@ -91,7 +88,6 @@ pub fn main() -> Result<(), Box> { // Send `Hello` message write( &mut output, - should_flush, &json!({ "Hello": { "protocol": "nu-plugin", @@ -117,7 +113,7 @@ pub fn main() -> Result<(), Box> { // Parse incoming messages loop { match Value::deserialize(&mut de) { - Ok(message) => handle_message(&mut output, should_flush, &opts, &message)?, + Ok(message) => handle_message(&mut output, &opts, &message)?, Err(err) => { if err.is_eof() { break; @@ -135,7 +131,6 @@ pub fn main() -> Result<(), Box> { fn handle_message( output: &mut impl Write, - should_flush: bool, opts: &Options, message: &Value, ) -> Result<(), Box> { @@ -144,7 +139,6 @@ fn handle_message( if plugin_call.as_str() == Some("Signature") { write( output, - should_flush, &json!({ "CallResponse": [ id, @@ -165,7 +159,6 @@ fn handle_message( }); write( output, - should_flush, &json!({ "CallResponse": [ id, @@ -212,11 +205,9 @@ fn signatures() -> Vec { })] } -fn write(output: &mut impl Write, should_flush: bool, value: &Value) -> Result<(), Box> { +fn write(output: &mut impl Write, value: &Value) -> Result<(), Box> { serde_json::to_writer(&mut *output, value)?; output.write_all(b"\n")?; - if should_flush { - output.flush()?; - } + output.flush()?; Ok(()) }