Improve error messages for plugin protocol by removing #[serde(untagged)] (#12510)

# Description

In the plugin protocol, I had used `#[serde(untagged)]` on the `Stream`
variant to make it smaller and include all of the stream messages at the
top level, but unfortunately this causes serde to make really unhelpful
errors if anything fails to decode anywhere:

```
Error: nu:🐚:plugin_failed_to_decode

  × Plugin failed to decode: data did not match any variant of untagged enum PluginOutput
```

If you are trying to develop something using the plugin protocol
directly, this error is incredibly unhelpful. Even as a user, this
basically just says 'something is wrong'. With this change, the errors
are much better:

```
Error: nu:🐚:plugin_failed_to_decode

  × Plugin failed to decode: unknown variant `PipelineDatra`, expected one of `Error`, `Signature`, `Ordering`, `PipelineData` at line 2 column 37
```

The only downside is it means I have to duplicate all of the
`StreamMessage` variants manually, but there's only 4 of them and
they're small.

This doesn't actually change the protocol at all - everything is still
identical on the wire.

# Tests + Formatting
- 🟢 `toolkit fmt`
- 🟢 `toolkit clippy`
- 🟢 `toolkit test`
- 🟢 `toolkit test stdlib`
This commit is contained in:
Devyn Cairns 2024-04-14 08:55:18 -07:00 committed by GitHub
parent 50b2dac8d7
commit af72a18785
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 99 additions and 62 deletions

View file

@ -250,7 +250,17 @@ impl InterfaceManager for EngineInterfaceManager {
.into(),
})
}
PluginInput::Stream(message) => self.consume_stream_message(message),
// Stream messages
PluginInput::Data(..)
| PluginInput::End(..)
| PluginInput::Drop(..)
| PluginInput::Ack(..) => {
self.consume_stream_message(input.try_into().map_err(|msg| {
ShellError::NushellFailed {
msg: format!("Failed to convert message {msg:?} to StreamMessage"),
}
})?)
}
PluginInput::Call(id, call) => {
let interface = self.interface_for_context(id);
// Read streams in the input

View file

@ -5,7 +5,7 @@ use crate::{
test_util::{expected_test_custom_value, test_plugin_custom_value, TestCustomValue},
CallInfo, CustomValueOp, EngineCall, EngineCallId, EngineCallResponse, ExternalStreamInfo,
ListStreamInfo, PipelineDataHeader, PluginCall, PluginCustomValue, PluginInput, Protocol,
ProtocolInfo, RawStreamInfo, StreamData, StreamMessage,
ProtocolInfo, RawStreamInfo, StreamData,
},
EvaluatedCall, PluginCallResponse, PluginOutput,
};
@ -278,7 +278,7 @@ fn manager_consume_errors_on_sending_other_messages_before_hello() -> Result<(),
assert!(manager.protocol_info.is_none());
let error = manager
.consume(PluginInput::Stream(StreamMessage::Drop(0)))
.consume(PluginInput::Drop(0))
.expect_err("consume before Hello should cause an error");
assert!(format!("{error:?}").contains("Hello"));
@ -381,13 +381,10 @@ fn manager_consume_call_run_forwards_to_receiver_with_pipeline_data() -> Result<
))?;
for i in 0..10 {
manager.consume(PluginInput::Stream(StreamMessage::Data(
6,
Value::test_int(i).into(),
)))?;
manager.consume(PluginInput::Data(6, Value::test_int(i).into()))?;
}
manager.consume(PluginInput::Stream(StreamMessage::End(6)))?;
manager.consume(PluginInput::End(6))?;
// Make sure the streams end and we don't deadlock
drop(manager);
@ -522,13 +519,10 @@ fn manager_consume_engine_call_response_forwards_to_subscriber_with_pipeline_dat
))?;
for i in 0..2 {
manager.consume(PluginInput::Stream(StreamMessage::Data(
0,
Value::test_int(i).into(),
)))?;
manager.consume(PluginInput::Data(0, Value::test_int(i).into()))?;
}
manager.consume(PluginInput::Stream(StreamMessage::End(0)))?;
manager.consume(PluginInput::End(0))?;
// Make sure the streams end and we don't deadlock
drop(manager);
@ -710,20 +704,20 @@ fn interface_write_response_with_stream() -> Result<(), ShellError> {
for number in [3, 4, 5] {
match test.next_written().expect("missing stream Data message") {
PluginOutput::Stream(StreamMessage::Data(id, data)) => {
PluginOutput::Data(id, data) => {
assert_eq!(info.id, id, "Data id");
match data {
StreamData::List(val) => assert_eq!(number, val.as_int()?),
_ => panic!("expected List data: {data:?}"),
}
}
message => panic!("expected Stream(Data(..)): {message:?}"),
message => panic!("expected Data(..): {message:?}"),
}
}
match test.next_written().expect("missing stream End message") {
PluginOutput::Stream(StreamMessage::End(id)) => assert_eq!(info.id, id, "End id"),
message => panic!("expected Stream(Data(..)): {message:?}"),
PluginOutput::End(id) => assert_eq!(info.id, id, "End id"),
message => panic!("expected Data(..): {message:?}"),
}
assert!(!test.has_unconsumed_write());

View file

@ -480,7 +480,17 @@ impl InterfaceManager for PluginInterfaceManager {
),
})
}
PluginOutput::Stream(message) => self.consume_stream_message(message),
// Stream messages
PluginOutput::Data(..)
| PluginOutput::End(..)
| PluginOutput::Drop(..)
| PluginOutput::Ack(..) => {
self.consume_stream_message(input.try_into().map_err(|msg| {
ShellError::NushellFailed {
msg: format!("Failed to convert message {msg:?} to StreamMessage"),
}
})?)
}
PluginOutput::Option(option) => match option {
PluginOption::GcDisabled(disabled) => {
// Turn garbage collection off/on.

View file

@ -310,7 +310,7 @@ fn manager_consume_errors_on_sending_other_messages_before_hello() -> Result<(),
assert!(manager.protocol_info.is_none());
let error = manager
.consume(PluginOutput::Stream(StreamMessage::Drop(0)))
.consume(PluginOutput::Drop(0))
.expect_err("consume before Hello should cause an error");
assert!(format!("{error:?}").contains("Hello"));
@ -331,13 +331,10 @@ fn manager_consume_call_response_forwards_to_subscriber_with_pipeline_data(
))?;
for i in 0..2 {
manager.consume(PluginOutput::Stream(StreamMessage::Data(
0,
Value::test_int(i).into(),
)))?;
manager.consume(PluginOutput::Data(0, Value::test_int(i).into()))?;
}
manager.consume(PluginOutput::Stream(StreamMessage::End(0)))?;
manager.consume(PluginOutput::End(0))?;
// Make sure the streams end and we don't deadlock
drop(manager);
@ -454,12 +451,9 @@ fn manager_consume_engine_call_forwards_to_subscriber_with_pipeline_data() -> Re
})?;
for i in 0..2 {
manager.consume(PluginOutput::Stream(StreamMessage::Data(
2,
Value::test_int(i).into(),
)))?;
manager.consume(PluginOutput::Data(2, Value::test_int(i).into()))?;
}
manager.consume(PluginOutput::Stream(StreamMessage::End(2)))?;
manager.consume(PluginOutput::End(2))?;
// Make sure the streams end and we don't deadlock
drop(manager);
@ -889,7 +883,7 @@ fn interface_write_plugin_call_writes_run_with_stream_input() -> Result<(), Shel
.next_written()
.expect("failed to get Data stream message")
{
PluginInput::Stream(StreamMessage::Data(id, data)) => {
PluginInput::Data(id, data) => {
assert_eq!(info.id, id, "id");
match data {
StreamData::List(data_value) => {
@ -906,10 +900,10 @@ fn interface_write_plugin_call_writes_run_with_stream_input() -> Result<(), Shel
.next_written()
.expect("failed to get End stream message")
{
PluginInput::Stream(StreamMessage::End(id)) => {
PluginInput::End(id) => {
assert_eq!(info.id, id, "id");
}
message => panic!("expected Stream(End(_)) message: {message:?}"),
message => panic!("expected End(_) message: {message:?}"),
}
Ok(())

View file

@ -66,7 +66,14 @@ impl InterfaceManager for TestInterfaceManager {
fn consume(&mut self, input: Self::Input) -> Result<(), ShellError> {
match input {
PluginInput::Stream(msg) => self.consume_stream_message(msg),
PluginInput::Data(..)
| PluginInput::End(..)
| PluginInput::Drop(..)
| PluginInput::Ack(..) => self.consume_stream_message(
input
.try_into()
.expect("failed to convert message to StreamMessage"),
),
_ => unimplemented!(),
}
}
@ -414,7 +421,7 @@ fn write_pipeline_data_list_stream() -> Result<(), ShellError> {
// Now make sure the stream messages have been written
for value in values {
match test.next_written().expect("unexpected end of stream") {
PluginOutput::Stream(StreamMessage::Data(id, data)) => {
PluginOutput::Data(id, data) => {
assert_eq!(info.id, id, "Data id");
match data {
StreamData::List(read_value) => assert_eq!(value, read_value, "Data value"),
@ -426,7 +433,7 @@ fn write_pipeline_data_list_stream() -> Result<(), ShellError> {
}
match test.next_written().expect("unexpected end of stream") {
PluginOutput::Stream(StreamMessage::End(id)) => {
PluginOutput::End(id) => {
assert_eq!(info.id, id, "End id");
}
other => panic!("unexpected output: {other:?}"),
@ -510,7 +517,7 @@ fn write_pipeline_data_external_stream() -> Result<(), ShellError> {
// End must come after all Data
for msg in test.written() {
match msg {
PluginOutput::Stream(StreamMessage::Data(id, data)) => {
PluginOutput::Data(id, data) => {
if id == stdout_info.id {
let result: Result<Vec<u8>, ShellError> =
data.try_into().expect("wrong data in stdout stream");
@ -535,7 +542,7 @@ fn write_pipeline_data_external_stream() -> Result<(), ShellError> {
panic!("unrecognized stream id: {id}");
}
}
PluginOutput::Stream(StreamMessage::End(id)) => {
PluginOutput::End(id) => {
if id == stdout_info.id {
assert!(!stdout_ended, "double End of stdout");
assert!(stdout_iter.next().is_none(), "unexpected end of stdout");

View file

@ -208,11 +208,14 @@ pub enum PluginInput {
/// Response to an [`EngineCall`]. The ID should be the same one sent with the engine call this
/// is responding to
EngineCallResponse(EngineCallId, EngineCallResponse<PipelineDataHeader>),
/// Stream control or data message. Untagged to keep them as small as possible.
///
/// For example, `Stream(Ack(0))` is encoded as `{"Ack": 0}`
#[serde(untagged)]
Stream(StreamMessage),
/// See [`StreamMessage::Data`].
Data(StreamId, StreamData),
/// See [`StreamMessage::End`].
End(StreamId),
/// See [`StreamMessage::Drop`].
Drop(StreamId),
/// See [`StreamMessage::Ack`].
Ack(StreamId),
}
impl TryFrom<PluginInput> for StreamMessage {
@ -220,7 +223,10 @@ impl TryFrom<PluginInput> for StreamMessage {
fn try_from(msg: PluginInput) -> Result<StreamMessage, PluginInput> {
match msg {
PluginInput::Stream(stream_msg) => Ok(stream_msg),
PluginInput::Data(id, data) => Ok(StreamMessage::Data(id, data)),
PluginInput::End(id) => Ok(StreamMessage::End(id)),
PluginInput::Drop(id) => Ok(StreamMessage::Drop(id)),
PluginInput::Ack(id) => Ok(StreamMessage::Ack(id)),
_ => Err(msg),
}
}
@ -228,7 +234,12 @@ impl TryFrom<PluginInput> for StreamMessage {
impl From<StreamMessage> for PluginInput {
fn from(stream_msg: StreamMessage) -> PluginInput {
PluginInput::Stream(stream_msg)
match stream_msg {
StreamMessage::Data(id, data) => PluginInput::Data(id, data),
StreamMessage::End(id) => PluginInput::End(id),
StreamMessage::Drop(id) => PluginInput::Drop(id),
StreamMessage::Ack(id) => PluginInput::Ack(id),
}
}
}
@ -420,11 +431,14 @@ pub enum PluginOutput {
id: EngineCallId,
call: EngineCall<PipelineDataHeader>,
},
/// Stream control or data message. Untagged to keep them as small as possible.
///
/// For example, `Stream(Ack(0))` is encoded as `{"Ack": 0}`
#[serde(untagged)]
Stream(StreamMessage),
/// See [`StreamMessage::Data`].
Data(StreamId, StreamData),
/// See [`StreamMessage::End`].
End(StreamId),
/// See [`StreamMessage::Drop`].
Drop(StreamId),
/// See [`StreamMessage::Ack`].
Ack(StreamId),
}
impl TryFrom<PluginOutput> for StreamMessage {
@ -432,7 +446,10 @@ impl TryFrom<PluginOutput> for StreamMessage {
fn try_from(msg: PluginOutput) -> Result<StreamMessage, PluginOutput> {
match msg {
PluginOutput::Stream(stream_msg) => Ok(stream_msg),
PluginOutput::Data(id, data) => Ok(StreamMessage::Data(id, data)),
PluginOutput::End(id) => Ok(StreamMessage::End(id)),
PluginOutput::Drop(id) => Ok(StreamMessage::Drop(id)),
PluginOutput::Ack(id) => Ok(StreamMessage::Ack(id)),
_ => Err(msg),
}
}
@ -440,7 +457,12 @@ impl TryFrom<PluginOutput> for StreamMessage {
impl From<StreamMessage> for PluginOutput {
fn from(stream_msg: StreamMessage) -> PluginOutput {
PluginOutput::Stream(stream_msg)
match stream_msg {
StreamMessage::Data(id, data) => PluginOutput::Data(id, data),
StreamMessage::End(id) => PluginOutput::End(id),
StreamMessage::Drop(id) => PluginOutput::Drop(id),
StreamMessage::Ack(id) => PluginOutput::Ack(id),
}
}
}

View file

@ -117,14 +117,14 @@ mod tests {
fn json_has_no_other_newlines() {
let mut out = vec![];
// use something deeply nested, to try to trigger any pretty printing
let output = PluginOutput::Stream(StreamMessage::Data(
let output = PluginOutput::Data(
0,
StreamData::List(Value::test_list(vec![
Value::test_int(4),
// in case escaping failed
Value::test_string("newline\ncontaining\nstring"),
])),
));
);
JsonSerializer {}
.encode(&output, &mut out)
.expect("serialization error");

View file

@ -3,7 +3,7 @@ macro_rules! generate_tests {
use crate::protocol::{
CallInfo, CustomValueOp, EvaluatedCall, PipelineDataHeader, PluginCall,
PluginCallResponse, PluginCustomValue, PluginInput, PluginOption, PluginOutput,
StreamData, StreamMessage,
StreamData,
};
use nu_protocol::{
LabeledError, PluginSignature, Signature, Span, Spanned, SyntaxShape, Value,
@ -429,7 +429,7 @@ macro_rules! generate_tests {
let item = Value::int(1, span);
let stream_data = StreamData::List(item.clone());
let plugin_input = PluginInput::Stream(StreamMessage::Data(0, stream_data));
let plugin_input = PluginInput::Data(0, stream_data);
let encoder = $encoder;
let mut buffer: Vec<u8> = Vec::new();
@ -442,7 +442,7 @@ macro_rules! generate_tests {
.expect("eof");
match returned {
PluginInput::Stream(StreamMessage::Data(id, StreamData::List(list_data))) => {
PluginInput::Data(id, StreamData::List(list_data)) => {
assert_eq!(0, id);
assert_eq!(item, list_data);
}
@ -455,7 +455,7 @@ macro_rules! generate_tests {
let data = b"Hello world";
let stream_data = StreamData::Raw(Ok(data.to_vec()));
let plugin_input = PluginInput::Stream(StreamMessage::Data(1, stream_data));
let plugin_input = PluginInput::Data(1, stream_data);
let encoder = $encoder;
let mut buffer: Vec<u8> = Vec::new();
@ -468,7 +468,7 @@ macro_rules! generate_tests {
.expect("eof");
match returned {
PluginInput::Stream(StreamMessage::Data(id, StreamData::Raw(bytes))) => {
PluginInput::Data(id, StreamData::Raw(bytes)) => {
assert_eq!(1, id);
match bytes {
Ok(bytes) => assert_eq!(data, &bytes[..]),
@ -485,7 +485,7 @@ macro_rules! generate_tests {
let item = Value::int(1, span);
let stream_data = StreamData::List(item.clone());
let plugin_output = PluginOutput::Stream(StreamMessage::Data(4, stream_data));
let plugin_output = PluginOutput::Data(4, stream_data);
let encoder = $encoder;
let mut buffer: Vec<u8> = Vec::new();
@ -498,7 +498,7 @@ macro_rules! generate_tests {
.expect("eof");
match returned {
PluginOutput::Stream(StreamMessage::Data(id, StreamData::List(list_data))) => {
PluginOutput::Data(id, StreamData::List(list_data)) => {
assert_eq!(4, id);
assert_eq!(item, list_data);
}
@ -511,7 +511,7 @@ macro_rules! generate_tests {
let data = b"Hello world";
let stream_data = StreamData::Raw(Ok(data.to_vec()));
let plugin_output = PluginOutput::Stream(StreamMessage::Data(5, stream_data));
let plugin_output = PluginOutput::Data(5, stream_data);
let encoder = $encoder;
let mut buffer: Vec<u8> = Vec::new();
@ -524,7 +524,7 @@ macro_rules! generate_tests {
.expect("eof");
match returned {
PluginOutput::Stream(StreamMessage::Data(id, StreamData::Raw(bytes))) => {
PluginOutput::Data(id, StreamData::Raw(bytes)) => {
assert_eq!(5, id);
match bytes {
Ok(bytes) => assert_eq!(data, &bytes[..]),