Make some of the stream tests more robust against bad timeouts (#12271)

# Description

@WindSoilder [reported on
Discord](https://discord.com/channels/601130461678272522/855947301380947968/1221233630093901834)
that some plugin stream tests have been failing on the CI. It seems to
just be a timing thing (probably due to busy CI), so this extends the
amount of time that we can wait for a condition to be true.

# User-Facing Changes
None

# Tests + Formatting
- 🟢 `toolkit fmt`
- 🟢 `toolkit clippy`
- 🟢 `toolkit test`
- 🟢 `toolkit test stdlib`
This commit is contained in:
Devyn Cairns 2024-03-23 16:41:27 -07:00 committed by GitHub
parent 78be67f0c6
commit 544e7bcb5e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -3,7 +3,7 @@ use std::{
atomic::{AtomicBool, Ordering::Relaxed}, atomic::{AtomicBool, Ordering::Relaxed},
mpsc, Arc, mpsc, Arc,
}, },
time::Duration, time::{Duration, Instant},
}; };
use nu_protocol::{ShellError, Value}; use nu_protocol::{ShellError, Value};
@ -16,6 +16,36 @@ use super::{StreamManager, StreamReader, StreamWriter, StreamWriterSignal, Write
// slow to complete. 10 ms is a pretty long time // slow to complete. 10 ms is a pretty long time
const WAIT_DURATION: Duration = Duration::from_millis(10); const WAIT_DURATION: Duration = Duration::from_millis(10);
// Maximum time to wait for a condition to be true
const MAX_WAIT_DURATION: Duration = Duration::from_millis(500);
/// Wait for a condition to be true, or panic if the duration exceeds MAX_WAIT_DURATION
#[track_caller]
fn wait_for_condition(mut cond: impl FnMut() -> bool, message: &str) {
// Early check
if cond() {
return;
}
let start = Instant::now();
loop {
std::thread::sleep(Duration::from_millis(10));
if cond() {
return;
}
let elapsed = Instant::now().saturating_duration_since(start);
if elapsed > MAX_WAIT_DURATION {
panic!(
"{message}: Waited {:.2}sec, which is more than the maximum of {:.2}sec",
elapsed.as_secs_f64(),
MAX_WAIT_DURATION.as_secs_f64(),
);
}
}
}
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone, Default)]
struct TestSink(Vec<StreamMessage>); struct TestSink(Vec<StreamMessage>);
@ -301,8 +331,7 @@ fn signal_wait_for_drain_blocks_on_unacknowledged() -> Result<(), ShellError> {
for _ in 0..100 { for _ in 0..100 {
signal.notify_acknowledged()?; signal.notify_acknowledged()?;
} }
std::thread::sleep(WAIT_DURATION); wait_for_condition(|| spawned.is_finished(), "blocked at end");
assert!(spawned.is_finished(), "blocked at end");
spawned.join().unwrap() spawned.join().unwrap()
}) })
} }
@ -322,8 +351,7 @@ fn signal_wait_for_drain_unblocks_on_dropped() -> Result<(), ShellError> {
std::thread::sleep(WAIT_DURATION); std::thread::sleep(WAIT_DURATION);
assert!(!spawned.is_finished(), "didn't block"); assert!(!spawned.is_finished(), "didn't block");
signal.set_dropped()?; signal.set_dropped()?;
std::thread::sleep(WAIT_DURATION); wait_for_condition(|| spawned.is_finished(), "still blocked at end");
assert!(spawned.is_finished(), "still blocked at end");
spawned.join().unwrap() spawned.join().unwrap()
}) })
} }