Fix externals busy waiting (#3280)

This commit is contained in:
Jonathan Turner 2021-04-08 07:25:15 +12:00 committed by GitHub
parent 42fac722bb
commit 5fcf11fcb0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 26 additions and 166 deletions

View file

@ -1,7 +1,7 @@
use crate::futures::ThreadedReceiver;
use crate::prelude::*; use crate::prelude::*;
use nu_engine::{evaluate_baseline_expr, BufCodecReader}; use nu_engine::{evaluate_baseline_expr, BufCodecReader};
use nu_engine::{MaybeTextCodec, StringOrBinary}; use nu_engine::{MaybeTextCodec, StringOrBinary};
use parking_lot::Mutex;
use std::io::Write; use std::io::Write;
use std::ops::Deref; use std::ops::Deref;
@ -431,7 +431,7 @@ fn spawn(
Ok(()) Ok(())
}); });
let stream = ThreadedReceiver::new(rx); let stream = ChannelReceiver::new(rx);
Ok(stream.to_input_stream()) Ok(stream.to_input_stream())
} else { } else {
Err(ShellError::labeled_error( Err(ShellError::labeled_error(
@ -442,6 +442,30 @@ fn spawn(
} }
} }
struct ChannelReceiver {
rx: Arc<Mutex<mpsc::Receiver<Result<Value, ShellError>>>>,
}
impl ChannelReceiver {
pub fn new(rx: mpsc::Receiver<Result<Value, ShellError>>) -> Self {
Self {
rx: Arc::new(Mutex::new(rx)),
}
}
}
impl Iterator for ChannelReceiver {
type Item = Result<Value, ShellError>;
fn next(&mut self) -> Option<Self::Item> {
let rx = self.rx.lock();
match rx.recv() {
Ok(v) => Some(v),
Err(_) => None,
}
}
}
fn expand_tilde<SI: ?Sized, P, HD>(input: &SI, home_dir: HD) -> std::borrow::Cow<str> fn expand_tilde<SI: ?Sized, P, HD>(input: &SI, home_dir: HD) -> std::borrow::Cow<str>
where where
SI: AsRef<str>, SI: AsRef<str>,

View file

@ -1,163 +0,0 @@
use std::sync::{mpsc, Arc, Mutex};
use std::task::Waker;
use std::thread;
#[allow(clippy::option_option)]
struct SharedState<T: Send + 'static> {
result: Option<Option<T>>,
kill: bool,
waker: Option<Waker>,
}
pub struct ThreadedReceiver<T: Send + 'static> {
shared_state: Arc<Mutex<SharedState<T>>>,
}
impl<T: Send + 'static> ThreadedReceiver<T> {
pub fn new(recv: mpsc::Receiver<T>) -> ThreadedReceiver<T> {
let shared_state = Arc::new(Mutex::new(SharedState {
result: None,
kill: false,
waker: None,
}));
// Clone everything to avoid lifetimes
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
loop {
let result = recv.recv();
{
let mut shared_state = thread_shared_state
.lock()
.expect("ThreadedFuture shared state shouldn't be poisoned");
if let Ok(result) = result {
shared_state.result = Some(Some(result));
} else {
break;
}
}
// Don't attempt to recv anything else until consumed
loop {
let mut shared_state = thread_shared_state
.lock()
.expect("ThreadedFuture shared state shouldn't be poisoned");
if shared_state.kill {
return;
}
if shared_state.result.is_some() {
if let Some(waker) = shared_state.waker.take() {
waker.wake();
}
} else {
break;
}
}
}
// Let the Stream implementation know that we're done
let mut shared_state = thread_shared_state
.lock()
.expect("ThreadedFuture shared state shouldn't be poisoned");
shared_state.result = Some(None);
if let Some(waker) = shared_state.waker.take() {
waker.wake();
}
});
ThreadedReceiver { shared_state }
}
}
// impl<T: Send + 'static> Stream for ThreadedReceiver<T> {
// type Item = T;
// fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
// let mut shared_state = self
// .shared_state
// .lock()
// .expect("ThreadedFuture shared state shouldn't be poisoned");
// if let Some(result) = shared_state.result.take() {
// Poll::Ready(result)
// } else {
// shared_state.waker = Some(cx.waker().clone());
// Poll::Pending
// }
// }
// }
impl<T: Send + 'static> Iterator for ThreadedReceiver<T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
loop {
let mut shared_state = self
.shared_state
.lock()
.expect("ThreadedFuture shared state shouldn't be poisoned");
if let Some(result) = shared_state.result.take() {
return result;
}
}
}
}
impl<T: Send + 'static> Drop for ThreadedReceiver<T> {
fn drop(&mut self) {
// Setting the kill flag to true will cause the thread spawned in `new` to exit, which
// will cause the `Receiver` argument to get dropped. This can allow senders to
// potentially clean up.
match self.shared_state.lock() {
Ok(mut state) => state.kill = true,
Err(mut poisoned_err) => poisoned_err.get_mut().kill = true,
}
}
}
#[cfg(test)]
mod tests {
mod threaded_receiver {
use super::super::ThreadedReceiver;
use std::sync::mpsc;
#[test]
fn returns_expected_result() {
let (tx, rx) = mpsc::sync_channel(0);
std::thread::spawn(move || {
let _ = tx.send(1);
let _ = tx.send(2);
let _ = tx.send(3);
});
let stream = ThreadedReceiver::new(rx);
let mut result = stream;
assert_eq!(Some(1), result.next());
assert_eq!(Some(2), result.next());
assert_eq!(Some(3), result.next());
assert_eq!(None, result.next());
}
#[test]
fn drops_receiver_when_stream_dropped() {
let (tx, rx) = mpsc::sync_channel(0);
let th = std::thread::spawn(move || {
tx.send(1).and_then(|_| tx.send(2)).and_then(|_| tx.send(3))
});
{
let stream = ThreadedReceiver::new(rx);
let mut result = stream;
assert_eq!(Some(1), result.next());
}
let result = th.join();
assert_eq!(true, result.unwrap().is_err());
}
}
}

View file

@ -7,7 +7,6 @@ extern crate indexmap;
#[macro_use] #[macro_use]
mod prelude; mod prelude;
pub mod commands; pub mod commands;
mod futures;
pub mod utils; pub mod utils;
#[cfg(test)] #[cfg(test)]