fix: no longer get stuck on windows

reading both stdout & stderr is a common gotcha, you need to drain them
concurrently to avoid deadlocks. Not sure why I didn't do the right
thing from the start. Seems like I assumed the stderr is short? That's
not the case when cargo spams `compiling xyz` messages
This commit is contained in:
Aleksey Kladov 2021-04-20 16:06:20 +03:00
parent 1834938d6f
commit 1772eb0f1a
5 changed files with 329 additions and 64 deletions

3
Cargo.lock generated
View file

@ -1573,6 +1573,9 @@ version = "0.0.0"
dependencies = [
"always-assert",
"backtrace",
"libc",
"miow",
"winapi",
]
[[package]]

View file

@ -1,7 +1,6 @@
//! Handles build script specific information
use std::{
io::BufReader,
path::PathBuf,
process::{Command, Stdio},
sync::Arc,
@ -13,7 +12,8 @@ use cargo_metadata::{BuildScript, Message};
use itertools::Itertools;
use paths::{AbsPath, AbsPathBuf};
use rustc_hash::FxHashMap;
use stdx::{format_to, JodChild};
use serde::Deserialize;
use stdx::format_to;
use crate::{cfg_flag::CfgFlag, CargoConfig};
@ -171,67 +171,86 @@ impl WorkspaceBuildData {
cmd.stdout(Stdio::piped()).stderr(Stdio::piped()).stdin(Stdio::null());
let mut child = cmd.spawn().map(JodChild)?;
let child_stdout = child.stdout.take().unwrap();
let stdout = BufReader::new(child_stdout);
let mut res = WorkspaceBuildData::default();
for message in cargo_metadata::Message::parse_stream(stdout).flatten() {
match message {
Message::BuildScriptExecuted(BuildScript {
package_id,
out_dir,
cfgs,
env,
..
}) => {
let cfgs = {
let mut acc = Vec::new();
for cfg in cfgs {
match cfg.parse::<CfgFlag>() {
Ok(it) => acc.push(it),
Err(err) => {
anyhow::bail!("invalid cfg from cargo-metadata: {}", err)
}
};
}
acc
};
let package_build_data =
res.per_package.entry(package_id.repr.clone()).or_default();
// cargo_metadata crate returns default (empty) path for
// older cargos, which is not absolute, so work around that.
if !out_dir.as_str().is_empty() {
let out_dir = AbsPathBuf::assert(PathBuf::from(out_dir.into_os_string()));
package_build_data.out_dir = Some(out_dir);
package_build_data.cfgs = cfgs;
}
package_build_data.envs = env;
let mut callback_err = None;
let output = stdx::process::streaming_output(
cmd,
&mut |line| {
if callback_err.is_some() {
return;
}
Message::CompilerArtifact(message) => {
progress(format!("metadata {}", message.target.name));
if message.target.kind.contains(&"proc-macro".to_string()) {
let package_id = message.package_id;
// Skip rmeta file
if let Some(filename) = message.filenames.iter().find(|name| is_dylib(name))
{
let filename = AbsPathBuf::assert(PathBuf::from(&filename));
let package_build_data =
res.per_package.entry(package_id.repr.clone()).or_default();
package_build_data.proc_macro_dylib_path = Some(filename);
// Copy-pasted from existing cargo_metadata. It seems like we
// should be using sered_stacker here?
let mut deserializer = serde_json::Deserializer::from_str(&line);
deserializer.disable_recursion_limit();
let message = Message::deserialize(&mut deserializer)
.unwrap_or(Message::TextLine(line.to_string()));
match message {
Message::BuildScriptExecuted(BuildScript {
package_id,
out_dir,
cfgs,
env,
..
}) => {
let cfgs = {
let mut acc = Vec::new();
for cfg in cfgs {
match cfg.parse::<CfgFlag>() {
Ok(it) => acc.push(it),
Err(err) => {
callback_err = Some(anyhow::format_err!(
"invalid cfg from cargo-metadata: {}",
err
));
return;
}
};
}
acc
};
let package_build_data =
res.per_package.entry(package_id.repr.clone()).or_default();
// cargo_metadata crate returns default (empty) path for
// older cargos, which is not absolute, so work around that.
if !out_dir.as_str().is_empty() {
let out_dir =
AbsPathBuf::assert(PathBuf::from(out_dir.into_os_string()));
package_build_data.out_dir = Some(out_dir);
package_build_data.cfgs = cfgs;
}
package_build_data.envs = env;
}
Message::CompilerArtifact(message) => {
progress(format!("metadata {}", message.target.name));
if message.target.kind.contains(&"proc-macro".to_string()) {
let package_id = message.package_id;
// Skip rmeta file
if let Some(filename) =
message.filenames.iter().find(|name| is_dylib(name))
{
let filename = AbsPathBuf::assert(PathBuf::from(&filename));
let package_build_data =
res.per_package.entry(package_id.repr.clone()).or_default();
package_build_data.proc_macro_dylib_path = Some(filename);
}
}
}
Message::CompilerMessage(message) => {
progress(message.target.name.clone());
}
Message::BuildFinished(_) => {}
Message::TextLine(_) => {}
_ => {}
}
Message::CompilerMessage(message) => {
progress(message.target.name.clone());
}
Message::BuildFinished(_) => {}
Message::TextLine(_) => {}
_ => {}
}
}
},
&mut |_| (),
)?;
for package in packages {
let package_build_data = res.per_package.entry(package.id.repr.clone()).or_default();
@ -244,7 +263,6 @@ impl WorkspaceBuildData {
}
}
let output = child.into_inner().wait_with_output()?;
if !output.status.success() {
let mut stderr = String::from_utf8(output.stderr).unwrap_or_default();
if stderr.is_empty() {

View file

@ -10,10 +10,15 @@ edition = "2018"
doctest = false
[dependencies]
libc = "0.2.93"
backtrace = { version = "0.3.44", optional = true }
always-assert = { version = "0.1.2", features = ["log"] }
# Think twice before adding anything here
[target.'cfg(windows)'.dependencies]
miow = "0.3.6"
winapi = "0.3.9"
[features]
# Uncomment to enable for the whole crate graph
# default = [ "backtrace" ]

View file

@ -1,7 +1,8 @@
//! Missing batteries for standard libraries.
use std::{cmp::Ordering, ops, process, time::Instant};
use std::{cmp::Ordering, ops, time::Instant};
mod macros;
pub mod process;
pub mod panic_context;
pub use always_assert::{always, never};
@ -179,17 +180,17 @@ where
}
#[repr(transparent)]
pub struct JodChild(pub process::Child);
pub struct JodChild(pub std::process::Child);
impl ops::Deref for JodChild {
type Target = process::Child;
fn deref(&self) -> &process::Child {
type Target = std::process::Child;
fn deref(&self) -> &std::process::Child {
&self.0
}
}
impl ops::DerefMut for JodChild {
fn deref_mut(&mut self) -> &mut process::Child {
fn deref_mut(&mut self) -> &mut std::process::Child {
&mut self.0
}
}
@ -202,9 +203,9 @@ impl Drop for JodChild {
}
impl JodChild {
pub fn into_inner(self) -> process::Child {
pub fn into_inner(self) -> std::process::Child {
// SAFETY: repr transparent
unsafe { std::mem::transmute::<JodChild, process::Child>(self) }
unsafe { std::mem::transmute::<JodChild, std::process::Child>(self) }
}
}

238
crates/stdx/src/process.rs Normal file
View file

@ -0,0 +1,238 @@
//! Read both stdout and stderr of child without deadlocks.
//!
//! https://github.com/rust-lang/cargo/blob/905af549966f23a9288e9993a85d1249a5436556/crates/cargo-util/src/read2.rs
//! https://github.com/rust-lang/cargo/blob/58a961314437258065e23cb6316dfc121d96fb71/crates/cargo-util/src/process_builder.rs#L231
use std::{
io,
process::{Command, Output, Stdio},
};
pub fn streaming_output(
mut cmd: Command,
on_stdout_line: &mut dyn FnMut(&str),
on_stderr_line: &mut dyn FnMut(&str),
) -> io::Result<Output> {
let mut stdout = Vec::new();
let mut stderr = Vec::new();
let cmd = cmd.stdout(Stdio::piped()).stderr(Stdio::piped()).stdin(Stdio::null());
let status = {
let mut child = cmd.spawn()?;
let out = child.stdout.take().unwrap();
let err = child.stderr.take().unwrap();
imp::read2(out, err, &mut |is_out, data, eof| {
let idx = if eof {
data.len()
} else {
match data.iter().rposition(|b| *b == b'\n') {
Some(i) => i + 1,
None => return,
}
};
{
// scope for new_lines
let new_lines = {
let dst = if is_out { &mut stdout } else { &mut stderr };
let start = dst.len();
let data = data.drain(..idx);
dst.extend(data);
&dst[start..]
};
for line in String::from_utf8_lossy(new_lines).lines() {
if is_out {
on_stdout_line(line)
} else {
on_stderr_line(line)
}
}
}
})?;
child.wait()?
};
Ok(Output { status, stdout, stderr })
}
#[cfg(unix)]
mod imp {
use std::{
io::{self, prelude::*},
mem,
os::unix::prelude::*,
process::{ChildStderr, ChildStdout},
};
pub(crate) fn read2(
mut out_pipe: ChildStdout,
mut err_pipe: ChildStderr,
data: &mut dyn FnMut(bool, &mut Vec<u8>, bool),
) -> io::Result<()> {
unsafe {
libc::fcntl(out_pipe.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK);
libc::fcntl(err_pipe.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK);
}
let mut out_done = false;
let mut err_done = false;
let mut out = Vec::new();
let mut err = Vec::new();
let mut fds: [libc::pollfd; 2] = unsafe { mem::zeroed() };
fds[0].fd = out_pipe.as_raw_fd();
fds[0].events = libc::POLLIN;
fds[1].fd = err_pipe.as_raw_fd();
fds[1].events = libc::POLLIN;
let mut nfds = 2;
let mut errfd = 1;
while nfds > 0 {
// wait for either pipe to become readable using `select`
let r = unsafe { libc::poll(fds.as_mut_ptr(), nfds, -1) };
if r == -1 {
let err = io::Error::last_os_error();
if err.kind() == io::ErrorKind::Interrupted {
continue;
}
return Err(err);
}
// Read as much as we can from each pipe, ignoring EWOULDBLOCK or
// EAGAIN. If we hit EOF, then this will happen because the underlying
// reader will return Ok(0), in which case we'll see `Ok` ourselves. In
// this case we flip the other fd back into blocking mode and read
// whatever's leftover on that file descriptor.
let handle = |res: io::Result<_>| match res {
Ok(_) => Ok(true),
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock {
Ok(false)
} else {
Err(e)
}
}
};
if !err_done && fds[errfd].revents != 0 && handle(err_pipe.read_to_end(&mut err))? {
err_done = true;
nfds -= 1;
}
data(false, &mut err, err_done);
if !out_done && fds[0].revents != 0 && handle(out_pipe.read_to_end(&mut out))? {
out_done = true;
fds[0].fd = err_pipe.as_raw_fd();
errfd = 0;
nfds -= 1;
}
data(true, &mut out, out_done);
}
Ok(())
}
}
#[cfg(windows)]
mod imp {
use std::{
io,
os::windows::prelude::*,
process::{ChildStderr, ChildStdout},
slice,
};
use miow::{
iocp::{CompletionPort, CompletionStatus},
pipe::NamedPipe,
Overlapped,
};
use winapi::shared::winerror::ERROR_BROKEN_PIPE;
struct Pipe<'a> {
dst: &'a mut Vec<u8>,
overlapped: Overlapped,
pipe: NamedPipe,
done: bool,
}
pub(crate) fn read2(
out_pipe: ChildStdout,
err_pipe: ChildStderr,
data: &mut dyn FnMut(bool, &mut Vec<u8>, bool),
) -> io::Result<()> {
let mut out = Vec::new();
let mut err = Vec::new();
let port = CompletionPort::new(1)?;
port.add_handle(0, &out_pipe)?;
port.add_handle(1, &err_pipe)?;
unsafe {
let mut out_pipe = Pipe::new(out_pipe, &mut out);
let mut err_pipe = Pipe::new(err_pipe, &mut err);
out_pipe.read()?;
err_pipe.read()?;
let mut status = [CompletionStatus::zero(), CompletionStatus::zero()];
while !out_pipe.done || !err_pipe.done {
for status in port.get_many(&mut status, None)? {
if status.token() == 0 {
out_pipe.complete(status);
data(true, out_pipe.dst, out_pipe.done);
out_pipe.read()?;
} else {
err_pipe.complete(status);
data(false, err_pipe.dst, err_pipe.done);
err_pipe.read()?;
}
}
}
Ok(())
}
}
impl<'a> Pipe<'a> {
unsafe fn new<P: IntoRawHandle>(p: P, dst: &'a mut Vec<u8>) -> Pipe<'a> {
Pipe {
dst,
pipe: NamedPipe::from_raw_handle(p.into_raw_handle()),
overlapped: Overlapped::zero(),
done: false,
}
}
unsafe fn read(&mut self) -> io::Result<()> {
let dst = slice_to_end(self.dst);
match self.pipe.read_overlapped(dst, self.overlapped.raw()) {
Ok(_) => Ok(()),
Err(e) => {
if e.raw_os_error() == Some(ERROR_BROKEN_PIPE as i32) {
self.done = true;
Ok(())
} else {
Err(e)
}
}
}
}
unsafe fn complete(&mut self, status: &CompletionStatus) {
let prev = self.dst.len();
self.dst.set_len(prev + status.bytes_transferred() as usize);
if status.bytes_transferred() == 0 {
self.done = true;
}
}
}
unsafe fn slice_to_end(v: &mut Vec<u8>) -> &mut [u8] {
if v.capacity() == 0 {
v.reserve(16);
}
if v.capacity() == v.len() {
v.reserve(1);
}
slice::from_raw_parts_mut(v.as_mut_ptr().add(v.len()), v.capacity() - v.len())
}
}