Add decoding support for NoData and NotificationResponse

This commit is contained in:
Zachery Gyurkovitz 2019-07-17 11:02:40 -07:00
parent c040c97cb3
commit e0d1adec88
3 changed files with 100 additions and 4 deletions

View file

@ -9,6 +9,7 @@ mod decode;
mod encode; mod encode;
mod execute; mod execute;
mod message; mod message;
mod notification_response;
mod parameter_status; mod parameter_status;
mod parse; mod parse;
mod password_message; mod password_message;
@ -30,6 +31,7 @@ pub use self::{
encode::Encode, encode::Encode,
execute::Execute, execute::Execute,
message::Message, message::Message,
notification_response::NotificationResponse,
parameter_status::ParameterStatus, parameter_status::ParameterStatus,
parse::Parse, parse::Parse,
password_message::PasswordMessage, password_message::PasswordMessage,

View file

@ -1,6 +1,6 @@
use crate::{ use crate::{
Authentication, BackendKeyData, CommandComplete, DataRow, Decode, ParameterStatus, Authentication, BackendKeyData, CommandComplete, DataRow, Decode, NotificationResponse,
ReadyForQuery, Response, RowDescription, ParameterStatus, ReadyForQuery, Response, RowDescription,
}; };
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use bytes::BytesMut; use bytes::BytesMut;
@ -15,9 +15,11 @@ pub enum Message {
CommandComplete(CommandComplete), CommandComplete(CommandComplete),
RowDescription(RowDescription), RowDescription(RowDescription),
DataRow(DataRow), DataRow(DataRow),
Response(Box<Response>), Response(Response),
NotificationResponse(NotificationResponse),
ParseComplete, ParseComplete,
BindComplete, BindComplete,
NoData,
} }
impl Message { impl Message {
@ -49,7 +51,7 @@ impl Message {
let src = src.split_to(len + 1).freeze().slice_from(5); let src = src.split_to(len + 1).freeze().slice_from(5);
Ok(Some(match token { Ok(Some(match token {
b'N' | b'E' => Message::Response(Box::new(Response::decode(src)?)), b'N' | b'E' => Message::Response(Response::decode(src)?),
b'S' => Message::ParameterStatus(ParameterStatus::decode(src)?), b'S' => Message::ParameterStatus(ParameterStatus::decode(src)?),
b'Z' => Message::ReadyForQuery(ReadyForQuery::decode(src)?), b'Z' => Message::ReadyForQuery(ReadyForQuery::decode(src)?),
b'R' => Message::Authentication(Authentication::decode(src)?), b'R' => Message::Authentication(Authentication::decode(src)?),
@ -57,8 +59,10 @@ impl Message {
b'T' => Message::RowDescription(RowDescription::decode(src)?), b'T' => Message::RowDescription(RowDescription::decode(src)?),
b'D' => Message::DataRow(DataRow::decode(src)?), b'D' => Message::DataRow(DataRow::decode(src)?),
b'C' => Message::CommandComplete(CommandComplete::decode(src)?), b'C' => Message::CommandComplete(CommandComplete::decode(src)?),
b'A' => Message::NotificationResponse(NotificationResponse::decode(src)?),
b'1' => Message::ParseComplete, b'1' => Message::ParseComplete,
b'2' => Message::BindComplete, b'2' => Message::BindComplete,
b'n' => Message::NoData,
_ => unimplemented!("decode not implemented for token: {}", token as char), _ => unimplemented!("decode not implemented for token: {}", token as char),
})) }))

View file

@ -0,0 +1,90 @@
use crate::{decode::get_str, Decode};
use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes;
use std::{fmt, io, pin::Pin, ptr::NonNull};
pub struct NotificationResponse {
#[used]
storage: Pin<Bytes>,
pid: u32,
channel_name: NonNull<str>,
message: NonNull<str>,
}
impl NotificationResponse {
#[inline]
pub fn pid(&self) -> u32 {
self.pid
}
#[inline]
pub fn channel_name(&self) -> &str {
// SAFE: Memory is pinned
unsafe { self.channel_name.as_ref() }
}
#[inline]
pub fn message(&self) -> &str {
// SAFE: Memory is pinned
unsafe { self.message.as_ref() }
}
}
// SAFE: Raw pointers point to pinned memory inside the struct
unsafe impl Send for NotificationResponse {}
unsafe impl Sync for NotificationResponse {}
impl fmt::Debug for NotificationResponse {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("NotificationResponse")
.field("pid", &self.pid())
.field("channel_name", &self.channel_name())
.field("message", &self.message())
.finish()
}
}
impl Decode for NotificationResponse {
fn decode(src: Bytes) -> io::Result<Self> {
let storage = Pin::new(src);
let pid = BigEndian::read_u32(&*storage);
// offset from pid=4
let channel_name = get_str(&storage[4..])?;
// offset = pid + channel_name.len() + \0
let message = get_str(&storage[(4 + channel_name.len() + 1)..])?;
let channel_name = NonNull::from(channel_name);
let message = NonNull::from(message);
Ok(Self {
storage,
pid,
channel_name,
message,
})
}
}
#[cfg(test)]
mod tests {
use super::NotificationResponse;
use crate::Decode;
use bytes::Bytes;
use std::io;
const NOTIFICATION_RESPONSE: &[u8] = b"\x34\x20\x10\x02TEST-CHANNEL\0THIS IS A TEST\0";
#[test]
fn it_decodes_notification_response() -> io::Result<()> {
let src = Bytes::from_static(NOTIFICATION_RESPONSE);
let message = NotificationResponse::decode(src)?;
assert_eq!(message.pid(), 0x34201002);
assert_eq!(message.channel_name(), "TEST-CHANNEL");
assert_eq!(message.message(), "THIS IS A TEST");
Ok(())
}
}