From 1772eb0f1a5c714c91f8ae45cc67cbae6b7ff348 Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Tue, 20 Apr 2021 16:06:20 +0300 Subject: [PATCH] fix: no longer get stuck on windows reading both stdout & stderr is a common gotcha, you need to drain them concurrently to avoid deadlocks. Not sure why I didn't do the right thing from the start. Seems like I assumed the stderr is short? That's not the case when cargo spams `compiling xyz` messages --- Cargo.lock | 3 + crates/project_model/src/build_data.rs | 132 ++++++++------ crates/stdx/Cargo.toml | 5 + crates/stdx/src/lib.rs | 15 +- crates/stdx/src/process.rs | 238 +++++++++++++++++++++++++ 5 files changed, 329 insertions(+), 64 deletions(-) create mode 100644 crates/stdx/src/process.rs diff --git a/Cargo.lock b/Cargo.lock index 1bb66c66e0..14decc14ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1573,6 +1573,9 @@ version = "0.0.0" dependencies = [ "always-assert", "backtrace", + "libc", + "miow", + "winapi", ] [[package]] diff --git a/crates/project_model/src/build_data.rs b/crates/project_model/src/build_data.rs index ab5cc8c491..faca336de0 100644 --- a/crates/project_model/src/build_data.rs +++ b/crates/project_model/src/build_data.rs @@ -1,7 +1,6 @@ //! Handles build script specific information use std::{ - io::BufReader, path::PathBuf, process::{Command, Stdio}, sync::Arc, @@ -13,7 +12,8 @@ use cargo_metadata::{BuildScript, Message}; use itertools::Itertools; use paths::{AbsPath, AbsPathBuf}; use rustc_hash::FxHashMap; -use stdx::{format_to, JodChild}; +use serde::Deserialize; +use stdx::format_to; use crate::{cfg_flag::CfgFlag, CargoConfig}; @@ -171,67 +171,86 @@ impl WorkspaceBuildData { cmd.stdout(Stdio::piped()).stderr(Stdio::piped()).stdin(Stdio::null()); - let mut child = cmd.spawn().map(JodChild)?; - let child_stdout = child.stdout.take().unwrap(); - let stdout = BufReader::new(child_stdout); - let mut res = WorkspaceBuildData::default(); - for message in cargo_metadata::Message::parse_stream(stdout).flatten() { - match message { - Message::BuildScriptExecuted(BuildScript { - package_id, - out_dir, - cfgs, - env, - .. - }) => { - let cfgs = { - let mut acc = Vec::new(); - for cfg in cfgs { - match cfg.parse::() { - Ok(it) => acc.push(it), - Err(err) => { - anyhow::bail!("invalid cfg from cargo-metadata: {}", err) - } - }; - } - acc - }; - let package_build_data = - res.per_package.entry(package_id.repr.clone()).or_default(); - // cargo_metadata crate returns default (empty) path for - // older cargos, which is not absolute, so work around that. - if !out_dir.as_str().is_empty() { - let out_dir = AbsPathBuf::assert(PathBuf::from(out_dir.into_os_string())); - package_build_data.out_dir = Some(out_dir); - package_build_data.cfgs = cfgs; - } - package_build_data.envs = env; + let mut callback_err = None; + let output = stdx::process::streaming_output( + cmd, + &mut |line| { + if callback_err.is_some() { + return; } - Message::CompilerArtifact(message) => { - progress(format!("metadata {}", message.target.name)); - if message.target.kind.contains(&"proc-macro".to_string()) { - let package_id = message.package_id; - // Skip rmeta file - if let Some(filename) = message.filenames.iter().find(|name| is_dylib(name)) - { - let filename = AbsPathBuf::assert(PathBuf::from(&filename)); - let package_build_data = - res.per_package.entry(package_id.repr.clone()).or_default(); - package_build_data.proc_macro_dylib_path = Some(filename); + // Copy-pasted from existing cargo_metadata. It seems like we + // should be using sered_stacker here? + let mut deserializer = serde_json::Deserializer::from_str(&line); + deserializer.disable_recursion_limit(); + let message = Message::deserialize(&mut deserializer) + .unwrap_or(Message::TextLine(line.to_string())); + + match message { + Message::BuildScriptExecuted(BuildScript { + package_id, + out_dir, + cfgs, + env, + .. + }) => { + let cfgs = { + let mut acc = Vec::new(); + for cfg in cfgs { + match cfg.parse::() { + Ok(it) => acc.push(it), + Err(err) => { + callback_err = Some(anyhow::format_err!( + "invalid cfg from cargo-metadata: {}", + err + )); + return; + } + }; + } + acc + }; + let package_build_data = + res.per_package.entry(package_id.repr.clone()).or_default(); + // cargo_metadata crate returns default (empty) path for + // older cargos, which is not absolute, so work around that. + if !out_dir.as_str().is_empty() { + let out_dir = + AbsPathBuf::assert(PathBuf::from(out_dir.into_os_string())); + package_build_data.out_dir = Some(out_dir); + package_build_data.cfgs = cfgs; + } + + package_build_data.envs = env; + } + Message::CompilerArtifact(message) => { + progress(format!("metadata {}", message.target.name)); + + if message.target.kind.contains(&"proc-macro".to_string()) { + let package_id = message.package_id; + // Skip rmeta file + if let Some(filename) = + message.filenames.iter().find(|name| is_dylib(name)) + { + let filename = AbsPathBuf::assert(PathBuf::from(&filename)); + let package_build_data = + res.per_package.entry(package_id.repr.clone()).or_default(); + package_build_data.proc_macro_dylib_path = Some(filename); + } } } + Message::CompilerMessage(message) => { + progress(message.target.name.clone()); + } + Message::BuildFinished(_) => {} + Message::TextLine(_) => {} + _ => {} } - Message::CompilerMessage(message) => { - progress(message.target.name.clone()); - } - Message::BuildFinished(_) => {} - Message::TextLine(_) => {} - _ => {} - } - } + }, + &mut |_| (), + )?; for package in packages { let package_build_data = res.per_package.entry(package.id.repr.clone()).or_default(); @@ -244,7 +263,6 @@ impl WorkspaceBuildData { } } - let output = child.into_inner().wait_with_output()?; if !output.status.success() { let mut stderr = String::from_utf8(output.stderr).unwrap_or_default(); if stderr.is_empty() { diff --git a/crates/stdx/Cargo.toml b/crates/stdx/Cargo.toml index d28b5e6581..f78c5da7c8 100644 --- a/crates/stdx/Cargo.toml +++ b/crates/stdx/Cargo.toml @@ -10,10 +10,15 @@ edition = "2018" doctest = false [dependencies] +libc = "0.2.93" backtrace = { version = "0.3.44", optional = true } always-assert = { version = "0.1.2", features = ["log"] } # Think twice before adding anything here +[target.'cfg(windows)'.dependencies] +miow = "0.3.6" +winapi = "0.3.9" + [features] # Uncomment to enable for the whole crate graph # default = [ "backtrace" ] diff --git a/crates/stdx/src/lib.rs b/crates/stdx/src/lib.rs index b0a18d58de..e3eb109156 100644 --- a/crates/stdx/src/lib.rs +++ b/crates/stdx/src/lib.rs @@ -1,7 +1,8 @@ //! Missing batteries for standard libraries. -use std::{cmp::Ordering, ops, process, time::Instant}; +use std::{cmp::Ordering, ops, time::Instant}; mod macros; +pub mod process; pub mod panic_context; pub use always_assert::{always, never}; @@ -179,17 +180,17 @@ where } #[repr(transparent)] -pub struct JodChild(pub process::Child); +pub struct JodChild(pub std::process::Child); impl ops::Deref for JodChild { - type Target = process::Child; - fn deref(&self) -> &process::Child { + type Target = std::process::Child; + fn deref(&self) -> &std::process::Child { &self.0 } } impl ops::DerefMut for JodChild { - fn deref_mut(&mut self) -> &mut process::Child { + fn deref_mut(&mut self) -> &mut std::process::Child { &mut self.0 } } @@ -202,9 +203,9 @@ impl Drop for JodChild { } impl JodChild { - pub fn into_inner(self) -> process::Child { + pub fn into_inner(self) -> std::process::Child { // SAFETY: repr transparent - unsafe { std::mem::transmute::(self) } + unsafe { std::mem::transmute::(self) } } } diff --git a/crates/stdx/src/process.rs b/crates/stdx/src/process.rs new file mode 100644 index 0000000000..b0fa12f762 --- /dev/null +++ b/crates/stdx/src/process.rs @@ -0,0 +1,238 @@ +//! Read both stdout and stderr of child without deadlocks. +//! +//! https://github.com/rust-lang/cargo/blob/905af549966f23a9288e9993a85d1249a5436556/crates/cargo-util/src/read2.rs +//! https://github.com/rust-lang/cargo/blob/58a961314437258065e23cb6316dfc121d96fb71/crates/cargo-util/src/process_builder.rs#L231 + +use std::{ + io, + process::{Command, Output, Stdio}, +}; + +pub fn streaming_output( + mut cmd: Command, + on_stdout_line: &mut dyn FnMut(&str), + on_stderr_line: &mut dyn FnMut(&str), +) -> io::Result { + let mut stdout = Vec::new(); + let mut stderr = Vec::new(); + + let cmd = cmd.stdout(Stdio::piped()).stderr(Stdio::piped()).stdin(Stdio::null()); + + let status = { + let mut child = cmd.spawn()?; + let out = child.stdout.take().unwrap(); + let err = child.stderr.take().unwrap(); + imp::read2(out, err, &mut |is_out, data, eof| { + let idx = if eof { + data.len() + } else { + match data.iter().rposition(|b| *b == b'\n') { + Some(i) => i + 1, + None => return, + } + }; + { + // scope for new_lines + let new_lines = { + let dst = if is_out { &mut stdout } else { &mut stderr }; + let start = dst.len(); + let data = data.drain(..idx); + dst.extend(data); + &dst[start..] + }; + for line in String::from_utf8_lossy(new_lines).lines() { + if is_out { + on_stdout_line(line) + } else { + on_stderr_line(line) + } + } + } + })?; + child.wait()? + }; + + Ok(Output { status, stdout, stderr }) +} + +#[cfg(unix)] +mod imp { + use std::{ + io::{self, prelude::*}, + mem, + os::unix::prelude::*, + process::{ChildStderr, ChildStdout}, + }; + + pub(crate) fn read2( + mut out_pipe: ChildStdout, + mut err_pipe: ChildStderr, + data: &mut dyn FnMut(bool, &mut Vec, bool), + ) -> io::Result<()> { + unsafe { + libc::fcntl(out_pipe.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK); + libc::fcntl(err_pipe.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK); + } + + let mut out_done = false; + let mut err_done = false; + let mut out = Vec::new(); + let mut err = Vec::new(); + + let mut fds: [libc::pollfd; 2] = unsafe { mem::zeroed() }; + fds[0].fd = out_pipe.as_raw_fd(); + fds[0].events = libc::POLLIN; + fds[1].fd = err_pipe.as_raw_fd(); + fds[1].events = libc::POLLIN; + let mut nfds = 2; + let mut errfd = 1; + + while nfds > 0 { + // wait for either pipe to become readable using `select` + let r = unsafe { libc::poll(fds.as_mut_ptr(), nfds, -1) }; + if r == -1 { + let err = io::Error::last_os_error(); + if err.kind() == io::ErrorKind::Interrupted { + continue; + } + return Err(err); + } + + // Read as much as we can from each pipe, ignoring EWOULDBLOCK or + // EAGAIN. If we hit EOF, then this will happen because the underlying + // reader will return Ok(0), in which case we'll see `Ok` ourselves. In + // this case we flip the other fd back into blocking mode and read + // whatever's leftover on that file descriptor. + let handle = |res: io::Result<_>| match res { + Ok(_) => Ok(true), + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock { + Ok(false) + } else { + Err(e) + } + } + }; + if !err_done && fds[errfd].revents != 0 && handle(err_pipe.read_to_end(&mut err))? { + err_done = true; + nfds -= 1; + } + data(false, &mut err, err_done); + if !out_done && fds[0].revents != 0 && handle(out_pipe.read_to_end(&mut out))? { + out_done = true; + fds[0].fd = err_pipe.as_raw_fd(); + errfd = 0; + nfds -= 1; + } + data(true, &mut out, out_done); + } + Ok(()) + } +} + +#[cfg(windows)] +mod imp { + use std::{ + io, + os::windows::prelude::*, + process::{ChildStderr, ChildStdout}, + slice, + }; + + use miow::{ + iocp::{CompletionPort, CompletionStatus}, + pipe::NamedPipe, + Overlapped, + }; + use winapi::shared::winerror::ERROR_BROKEN_PIPE; + + struct Pipe<'a> { + dst: &'a mut Vec, + overlapped: Overlapped, + pipe: NamedPipe, + done: bool, + } + + pub(crate) fn read2( + out_pipe: ChildStdout, + err_pipe: ChildStderr, + data: &mut dyn FnMut(bool, &mut Vec, bool), + ) -> io::Result<()> { + let mut out = Vec::new(); + let mut err = Vec::new(); + + let port = CompletionPort::new(1)?; + port.add_handle(0, &out_pipe)?; + port.add_handle(1, &err_pipe)?; + + unsafe { + let mut out_pipe = Pipe::new(out_pipe, &mut out); + let mut err_pipe = Pipe::new(err_pipe, &mut err); + + out_pipe.read()?; + err_pipe.read()?; + + let mut status = [CompletionStatus::zero(), CompletionStatus::zero()]; + + while !out_pipe.done || !err_pipe.done { + for status in port.get_many(&mut status, None)? { + if status.token() == 0 { + out_pipe.complete(status); + data(true, out_pipe.dst, out_pipe.done); + out_pipe.read()?; + } else { + err_pipe.complete(status); + data(false, err_pipe.dst, err_pipe.done); + err_pipe.read()?; + } + } + } + + Ok(()) + } + } + + impl<'a> Pipe<'a> { + unsafe fn new(p: P, dst: &'a mut Vec) -> Pipe<'a> { + Pipe { + dst, + pipe: NamedPipe::from_raw_handle(p.into_raw_handle()), + overlapped: Overlapped::zero(), + done: false, + } + } + + unsafe fn read(&mut self) -> io::Result<()> { + let dst = slice_to_end(self.dst); + match self.pipe.read_overlapped(dst, self.overlapped.raw()) { + Ok(_) => Ok(()), + Err(e) => { + if e.raw_os_error() == Some(ERROR_BROKEN_PIPE as i32) { + self.done = true; + Ok(()) + } else { + Err(e) + } + } + } + } + + unsafe fn complete(&mut self, status: &CompletionStatus) { + let prev = self.dst.len(); + self.dst.set_len(prev + status.bytes_transferred() as usize); + if status.bytes_transferred() == 0 { + self.done = true; + } + } + } + + unsafe fn slice_to_end(v: &mut Vec) -> &mut [u8] { + if v.capacity() == 0 { + v.reserve(16); + } + if v.capacity() == v.len() { + v.reserve(1); + } + slice::from_raw_parts_mut(v.as_mut_ptr().add(v.len()), v.capacity() - v.len()) + } +}