From e0d1adec889fb1ba1aa31cf05c2f1cae73d91326 Mon Sep 17 00:00:00 2001 From: Zachery Gyurkovitz Date: Wed, 17 Jul 2019 11:02:40 -0700 Subject: [PATCH] Add decoding support for NoData and NotificationResponse --- sqlx-postgres-protocol/src/lib.rs | 2 + sqlx-postgres-protocol/src/message.rs | 12 ++- .../src/notification_response.rs | 90 +++++++++++++++++++ 3 files changed, 100 insertions(+), 4 deletions(-) create mode 100644 sqlx-postgres-protocol/src/notification_response.rs diff --git a/sqlx-postgres-protocol/src/lib.rs b/sqlx-postgres-protocol/src/lib.rs index f6111756..1c10fe24 100644 --- a/sqlx-postgres-protocol/src/lib.rs +++ b/sqlx-postgres-protocol/src/lib.rs @@ -9,6 +9,7 @@ mod decode; mod encode; mod execute; mod message; +mod notification_response; mod parameter_status; mod parse; mod password_message; @@ -30,6 +31,7 @@ pub use self::{ encode::Encode, execute::Execute, message::Message, + notification_response::NotificationResponse, parameter_status::ParameterStatus, parse::Parse, password_message::PasswordMessage, diff --git a/sqlx-postgres-protocol/src/message.rs b/sqlx-postgres-protocol/src/message.rs index 300fd64f..39aea9c8 100644 --- a/sqlx-postgres-protocol/src/message.rs +++ b/sqlx-postgres-protocol/src/message.rs @@ -1,6 +1,6 @@ use crate::{ - Authentication, BackendKeyData, CommandComplete, DataRow, Decode, ParameterStatus, - ReadyForQuery, Response, RowDescription, + Authentication, BackendKeyData, CommandComplete, DataRow, Decode, NotificationResponse, + ParameterStatus, ReadyForQuery, Response, RowDescription, }; use byteorder::{BigEndian, ByteOrder}; use bytes::BytesMut; @@ -15,9 +15,11 @@ pub enum Message { CommandComplete(CommandComplete), RowDescription(RowDescription), DataRow(DataRow), - Response(Box), + Response(Response), + NotificationResponse(NotificationResponse), ParseComplete, BindComplete, + NoData, } impl Message { @@ -49,7 +51,7 @@ impl Message { let src = src.split_to(len + 1).freeze().slice_from(5); Ok(Some(match token { - b'N' | b'E' => Message::Response(Box::new(Response::decode(src)?)), + b'N' | b'E' => Message::Response(Response::decode(src)?), b'S' => Message::ParameterStatus(ParameterStatus::decode(src)?), b'Z' => Message::ReadyForQuery(ReadyForQuery::decode(src)?), b'R' => Message::Authentication(Authentication::decode(src)?), @@ -57,8 +59,10 @@ impl Message { b'T' => Message::RowDescription(RowDescription::decode(src)?), b'D' => Message::DataRow(DataRow::decode(src)?), b'C' => Message::CommandComplete(CommandComplete::decode(src)?), + b'A' => Message::NotificationResponse(NotificationResponse::decode(src)?), b'1' => Message::ParseComplete, b'2' => Message::BindComplete, + b'n' => Message::NoData, _ => unimplemented!("decode not implemented for token: {}", token as char), })) diff --git a/sqlx-postgres-protocol/src/notification_response.rs b/sqlx-postgres-protocol/src/notification_response.rs new file mode 100644 index 00000000..7f4875f7 --- /dev/null +++ b/sqlx-postgres-protocol/src/notification_response.rs @@ -0,0 +1,90 @@ +use crate::{decode::get_str, Decode}; +use byteorder::{BigEndian, ByteOrder}; +use bytes::Bytes; + +use std::{fmt, io, pin::Pin, ptr::NonNull}; + +pub struct NotificationResponse { + #[used] + storage: Pin, + pid: u32, + channel_name: NonNull, + message: NonNull, +} + +impl NotificationResponse { + #[inline] + pub fn pid(&self) -> u32 { + self.pid + } + + #[inline] + pub fn channel_name(&self) -> &str { + // SAFE: Memory is pinned + unsafe { self.channel_name.as_ref() } + } + + #[inline] + pub fn message(&self) -> &str { + // SAFE: Memory is pinned + unsafe { self.message.as_ref() } + } +} + +// SAFE: Raw pointers point to pinned memory inside the struct +unsafe impl Send for NotificationResponse {} +unsafe impl Sync for NotificationResponse {} + +impl fmt::Debug for NotificationResponse { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("NotificationResponse") + .field("pid", &self.pid()) + .field("channel_name", &self.channel_name()) + .field("message", &self.message()) + .finish() + } +} + +impl Decode for NotificationResponse { + fn decode(src: Bytes) -> io::Result { + let storage = Pin::new(src); + let pid = BigEndian::read_u32(&*storage); + + // offset from pid=4 + let channel_name = get_str(&storage[4..])?; + + // offset = pid + channel_name.len() + \0 + let message = get_str(&storage[(4 + channel_name.len() + 1)..])?; + + let channel_name = NonNull::from(channel_name); + let message = NonNull::from(message); + + Ok(Self { + storage, + pid, + channel_name, + message, + }) + } +} + +#[cfg(test)] +mod tests { + use super::NotificationResponse; + use crate::Decode; + use bytes::Bytes; + use std::io; + + const NOTIFICATION_RESPONSE: &[u8] = b"\x34\x20\x10\x02TEST-CHANNEL\0THIS IS A TEST\0"; + + #[test] + fn it_decodes_notification_response() -> io::Result<()> { + let src = Bytes::from_static(NOTIFICATION_RESPONSE); + let message = NotificationResponse::decode(src)?; + + assert_eq!(message.pid(), 0x34201002); + assert_eq!(message.channel_name(), "TEST-CHANNEL"); + assert_eq!(message.message(), "THIS IS A TEST"); + Ok(()) + } +} \ No newline at end of file