WIP: Use BytesMut for serialization

This commit is contained in:
Daniel Akhterov 2019-06-25 19:21:02 -07:00
parent 653ead0322
commit 27a46f1202
4 changed files with 72 additions and 67 deletions

View file

@ -29,14 +29,15 @@ pub async fn establish<'a, 'b: 'a>(
}?;
conn.server_capabilities = init_packet.capabilities;
let username: &'b [u8] = &options.user.unwrap().as_bytes().clone();
let handshake = HandshakeResponsePacket {
let handshake: HandshakeResponsePacket = HandshakeResponsePacket {
// Minimum client capabilities required to establish connection
capabilities: Capabilities::CLIENT_PROTOCOL_41,
max_packet_size: 1024,
collation: 0,
extended_capabilities: Some(Capabilities::from_bits_truncate(0)),
username: Bytes::from("root"),
username: Bytes::from_static(username),
auth_data: None,
auth_response_len: None,
auth_response: None,

View file

@ -18,6 +18,7 @@ use failure::Error;
use failure::err_msg;
use byteorder::{ByteOrder, LittleEndian, WriteBytesExt};
use crate::protocol::serialize::serialize_length;
use bytes::BufMut;
mod establish;
// mod query;
@ -27,7 +28,7 @@ pub struct Connection {
incoming: mpsc::UnboundedReceiver<ServerMessage>,
// Buffer used when serializing outgoing messages
wbuf: Vec<u8>,
wbuf: BytesMut,
// Handle to coroutine reading messages from the stream
receiver: JoinHandle<Result<(), Error>>,
@ -43,13 +44,13 @@ pub struct Connection {
}
impl Connection {
pub async fn establish(options: ConnectOptions<'_>) -> Result<Self, Error> {
pub async fn establish(options: ConnectOptions<'static>) -> Result<Self, Error> {
let stream = TcpStream::connect((options.host, options.port)).await?;
let (reader, writer) = stream.split();
let (tx, rx) = mpsc::unbounded();
let receiver: JoinHandle<Result<(), Error>> = runtime::spawn(receiver(reader, tx));
let mut conn = Self {
wbuf: Vec::with_capacity(1024),
wbuf: BytesMut::with_capacity(1024),
writer,
receiver,
incoming: rx,
@ -70,19 +71,19 @@ impl Connection {
self.wbuf.clear();
/*
Reserve space for packet header; Packet Body Length (3 bytes) and sequence number (1 byte)
`self.wbuf.write_u32::<LittleEndian>(0_u32);`
causes compiler to panic
self.wbuf.write
rustc 1.37.0-nightly (7cdaffd79 2019-06-05) running on x86_64-unknown-linux-gnu
https://github.com/rust-lang/rust/issues/62126
*/
self.wbuf.extend_from_slice(&[0, 0, 0, 0]);
// Reserve space for packet header; Packet Body Length (3 bytes) and sequence number (1 byte)
self.wbuf.extend_from_slice(&[0; 3]);
self.wbuf.put(self.sequence_number);
self.sequence_number += 1;
message.serialize(&mut self.wbuf, &self.server_capabilities)?;
serialize_length(&mut self.wbuf);
self.wbuf[3] = self.sequence_number;
self.sequence_number += 1;
self.writer.write_all(&self.wbuf).await?;
self.writer.flush().await?;

View file

@ -12,9 +12,10 @@ use byteorder::{ByteOrder, LittleEndian, WriteBytesExt};
use bytes::Bytes;
use crate::protocol::serialize::*;
use failure::Error;
use bytes::BytesMut;
pub trait Serialize {
fn serialize(&self, buf: &mut Vec<u8>, server_capabilities: &Capabilities) -> Result<(), Error>;
fn serialize(&self, buf: &mut BytesMut, server_capabilities: &Capabilities) -> Result<(), Error>;
}
pub enum TextProtocol {
@ -119,7 +120,7 @@ pub struct AuthenticationSwitchRequestPacket {
}
impl Serialize for ComQuit {
fn serialize(&self, buf: &mut Vec<u8>, _server_capabilities: &Capabilities) -> Result<(), Error> {
fn serialize(&self, buf: &mut BytesMut, _server_capabilities: &Capabilities) -> Result<(), Error> {
serialize_int_1(buf, TextProtocol::ComQuit.into());
Ok(())
@ -127,7 +128,7 @@ impl Serialize for ComQuit {
}
impl Serialize for ComInitDb {
fn serialize(&self, buf: &mut Vec<u8>, _server_capabilities: &Capabilities) -> Result<(), Error> {
fn serialize(&self, buf: &mut BytesMut, _server_capabilities: &Capabilities) -> Result<(), Error> {
serialize_int_1(buf, TextProtocol::ComInitDb.into());
serialize_string_null(buf, &self.schema_name);
@ -136,7 +137,7 @@ impl Serialize for ComInitDb {
}
impl Serialize for ComDebug {
fn serialize(&self, buf: &mut Vec<u8>, _server_capabilities: &Capabilities) -> Result<(), Error> {
fn serialize(&self, buf: &mut BytesMut, _server_capabilities: &Capabilities) -> Result<(), Error> {
serialize_int_1(buf, TextProtocol::ComDebug.into());
Ok(())
@ -144,7 +145,7 @@ impl Serialize for ComDebug {
}
impl Serialize for ComPing {
fn serialize(&self, buf: &mut Vec<u8>, _server_capabilities: &Capabilities) -> Result<(), Error> {
fn serialize(&self, buf: &mut BytesMut, _server_capabilities: &Capabilities) -> Result<(), Error> {
serialize_int_1(buf, TextProtocol::ComPing.into());
Ok(())
@ -152,7 +153,7 @@ impl Serialize for ComPing {
}
impl Serialize for ComProcessKill {
fn serialize(&self, buf: &mut Vec<u8>, _server_capabilities: &Capabilities) -> Result<(), Error> {
fn serialize(&self, buf: &mut BytesMut, _server_capabilities: &Capabilities) -> Result<(), Error> {
serialize_int_1(buf, TextProtocol::ComProcessKill.into());
serialize_int_4(buf, self.process_id);
@ -161,7 +162,7 @@ impl Serialize for ComProcessKill {
}
impl Serialize for ComQuery {
fn serialize(&self, buf: &mut Vec<u8>, _server_capabilities: &Capabilities) -> Result<(), Error> {
fn serialize(&self, buf: &mut BytesMut, _server_capabilities: &Capabilities) -> Result<(), Error> {
serialize_int_1(buf, TextProtocol::ComQuery.into());
serialize_string_eof(buf, &self.sql_statement);
@ -170,7 +171,7 @@ impl Serialize for ComQuery {
}
impl Serialize for ComResetConnection {
fn serialize(&self, buf: &mut Vec<u8>, _server_capabilities: &Capabilities) -> Result<(), Error> {
fn serialize(&self, buf: &mut BytesMut, _server_capabilities: &Capabilities) -> Result<(), Error> {
serialize_int_1(buf, TextProtocol::ComResetConnection.into());
Ok(())
@ -178,7 +179,7 @@ impl Serialize for ComResetConnection {
}
impl Serialize for ComSetOption {
fn serialize(&self, buf: &mut Vec<u8>, _server_capabilities: &Capabilities) -> Result<(), Error> {
fn serialize(&self, buf: &mut BytesMut, _server_capabilities: &Capabilities) -> Result<(), Error> {
serialize_int_1(buf, TextProtocol::ComSetOption.into());
serialize_int_2(buf, self.option.into());
@ -187,7 +188,7 @@ impl Serialize for ComSetOption {
}
impl Serialize for ComShutdown {
fn serialize(&self, buf: &mut Vec<u8>, _server_capabilities: &Capabilities) -> Result<(), Error> {
fn serialize(&self, buf: &mut BytesMut, _server_capabilities: &Capabilities) -> Result<(), Error> {
serialize_int_1(buf, TextProtocol::ComShutdown.into());
serialize_int_1(buf, self.option.into());
@ -196,7 +197,7 @@ impl Serialize for ComShutdown {
}
impl Serialize for ComSleep {
fn serialize(&self, buf: &mut Vec<u8>, _server_capabilities: &Capabilities) -> Result<(), Error> {
fn serialize(&self, buf: &mut BytesMut, _server_capabilities: &Capabilities) -> Result<(), Error> {
serialize_int_1(buf, TextProtocol::ComSleep.into());
Ok(())
@ -204,7 +205,7 @@ impl Serialize for ComSleep {
}
impl Serialize for ComStatistics {
fn serialize(&self, buf: &mut Vec<u8>, _server_capabilities: &Capabilities) -> Result<(), Error> {
fn serialize(&self, buf: &mut BytesMut, _server_capabilities: &Capabilities) -> Result<(), Error> {
serialize_int_1(buf, TextProtocol::ComStatistics.into());
Ok(())
@ -213,7 +214,7 @@ impl Serialize for ComStatistics {
impl Serialize for SSLRequestPacket {
fn serialize(&self, buf: &mut Vec<u8>, server_capabilities: &Capabilities) -> Result<(), Error> {
fn serialize(&self, buf: &mut BytesMut, server_capabilities: &Capabilities) -> Result<(), Error> {
serialize_int_4(buf, self.capabilities.bits() as u32);
serialize_int_4(buf, self.max_packet_size);
serialize_int_1(buf, self.collation);
@ -235,7 +236,7 @@ impl Serialize for SSLRequestPacket {
}
impl Serialize for HandshakeResponsePacket {
fn serialize(&self, buf: &mut Vec<u8>, server_capabilities: &Capabilities) -> Result<(), Error> {
fn serialize(&self, buf: &mut BytesMut, server_capabilities: &Capabilities) -> Result<(), Error> {
serialize_int_4(buf, self.capabilities.bits() as u32);
serialize_int_4(buf, self.max_packet_size);
serialize_int_1(buf, self.collation);
@ -299,7 +300,7 @@ impl Serialize for HandshakeResponsePacket {
}
impl Serialize for AuthenticationSwitchRequestPacket {
fn serialize(&self, buf: &mut Vec<u8>, _server_capabilities: &Capabilities) -> Result<(), Error> {
fn serialize(&self, buf: &mut BytesMut, _server_capabilities: &Capabilities) -> Result<(), Error> {
serialize_int_1(buf, 0xFE);
serialize_string_null(buf, &self.auth_plugin_name);
serialize_byte_eof(buf, &self.auth_plugin_data);

View file

@ -2,11 +2,13 @@ use byteorder::{ByteOrder, LittleEndian, WriteBytesExt};
use bytes::Bytes;
use failure::Error;
use failure::err_msg;
use bytes::BytesMut;
use bytes::BufMut;
const U24_MAX: usize = 0xFF_FF_FF;
#[inline]
pub fn serialize_length(buf: &mut Vec<u8>) {
pub fn serialize_length(buf: &mut BytesMut) {
let mut length = [0; 3];
if buf.len() > U24_MAX {
panic!("Buffer too long");
@ -24,55 +26,55 @@ pub fn serialize_length(buf: &mut Vec<u8>) {
}
#[inline]
pub fn serialize_int_8(buf: &mut Vec<u8>, value: u64) {
buf.write_u64::<LittleEndian>(value).unwrap();
pub fn serialize_int_8(buf: &mut BytesMut, value: u64) {
LittleEndian::write_u64(buf, value);
}
#[inline]
pub fn serialize_int_4(buf: &mut Vec<u8>, value: u32) {
buf.write_u32::<LittleEndian>(value).unwrap();
pub fn serialize_int_4(buf: &mut BytesMut, value: u32) {
LittleEndian::write_u32(buf, value);
}
#[inline]
pub fn serialize_int_3(buf: &mut Vec<u8>, value: u32) {
buf.write_u24::<LittleEndian>(value).unwrap();
pub fn serialize_int_3(buf: &mut BytesMut, value: u32) {
LittleEndian::write_u24(buf, value);
}
#[inline]
pub fn serialize_int_2(buf: &mut Vec<u8>, value: u16) {
buf.write_u16::<LittleEndian>(value).unwrap();
pub fn serialize_int_2(buf: &mut BytesMut, value: u16) {
LittleEndian::write_u16(buf, value);
}
#[inline]
pub fn serialize_int_1(buf: &mut Vec<u8>, value: u8) {
buf.write_u8(value);
pub fn serialize_int_1(buf: &mut BytesMut, value: u8) {
buf.put(value);
}
#[inline]
pub fn serialize_int_lenenc(buf: &mut Vec<u8>, value: Option<&usize>) {
pub fn serialize_int_lenenc(buf: &mut BytesMut, value: Option<&usize>) {
if let Some(value) = value {
if *value > U24_MAX && *value <= std::u64::MAX as usize{
buf.write_u8(0xFE);
buf.put(0xFE_u8);
serialize_int_8(buf, *value as u64);
} else if *value > std::u16::MAX as usize && *value <= U24_MAX {
buf.write_u8(0xFD);
buf.put(0xFD_u8);
serialize_int_3(buf, *value as u32);
} else if *value > std::u8::MAX as usize && *value <= std::u16::MAX as usize{
buf.write_u8(0xFC);
buf.put(0xFC_u8);
serialize_int_2(buf, *value as u16);
} else if *value >= 0 && *value <= std::u8::MAX as usize {
buf.write_u8(0xFA);
buf.put(0xFA_u8);
serialize_int_1(buf, *value as u8);
} else {
panic!("Value is too long");
}
} else {
buf.write_u8(0xFB);
buf.put(0xFB_u8);
}
}
#[inline]
pub fn serialize_string_lenenc(buf: &mut Vec<u8>, string: &Bytes) {
pub fn serialize_string_lenenc(buf: &mut BytesMut, string: &Bytes) {
if string.len() > 0xFFF {
panic!("String inside string lenenc serialization is too long");
}
@ -84,13 +86,13 @@ pub fn serialize_string_lenenc(buf: &mut Vec<u8>, string: &Bytes) {
}
#[inline]
pub fn serialize_string_null(buf: &mut Vec<u8>, string: &Bytes) {
pub fn serialize_string_null(buf: &mut BytesMut, string: &Bytes) {
buf.extend_from_slice(string);
buf.write_u8(0);
buf.put(0_u8);
}
#[inline]
pub fn serialize_string_fix(buf: &mut Vec<u8>, bytes: &Bytes, size: usize) {
pub fn serialize_string_fix(buf: &mut BytesMut, bytes: &Bytes, size: usize) {
if size != bytes.len() {
panic!("Sizes do not match");
}
@ -99,12 +101,12 @@ pub fn serialize_string_fix(buf: &mut Vec<u8>, bytes: &Bytes, size: usize) {
}
#[inline]
pub fn serialize_string_eof(buf: &mut Vec<u8>, bytes: &Bytes) {
pub fn serialize_string_eof(buf: &mut BytesMut, bytes: &Bytes) {
buf.extend_from_slice(bytes);
}
#[inline]
pub fn serialize_byte_lenenc(buf: &mut Vec<u8>, bytes: &Bytes) {
pub fn serialize_byte_lenenc(buf: &mut BytesMut, bytes: &Bytes) {
if bytes.len() > 0xFFF {
panic!("String inside string lenenc serialization is too long");
}
@ -114,7 +116,7 @@ pub fn serialize_byte_lenenc(buf: &mut Vec<u8>, bytes: &Bytes) {
}
#[inline]
pub fn serialize_byte_fix(buf: &mut Vec<u8>, bytes: &Bytes, size: usize) {
pub fn serialize_byte_fix(buf: &mut BytesMut, bytes: &Bytes, size: usize) {
if size != bytes.len() {
panic!("Sizes do not match");
}
@ -123,7 +125,7 @@ pub fn serialize_byte_fix(buf: &mut Vec<u8>, bytes: &Bytes, size: usize) {
}
#[inline]
pub fn serialize_byte_eof(buf: &mut Vec<u8>, bytes: &Bytes) {
pub fn serialize_byte_eof(buf: &mut BytesMut, bytes: &Bytes) {
buf.extend_from_slice(bytes);
}
@ -152,7 +154,7 @@ mod tests {
#[test]
fn it_encodes_length() {
let mut buf: Vec<u8> = Vec::new();
let mut buf = BytesMut::new();
// Reserve space of length
buf.write_u24::<LittleEndian>(0);
// Sequence number; typically 0
@ -166,7 +168,7 @@ mod tests {
#[test]
fn it_encodes_int_lenenc_none() {
let mut buf: Vec<u8> = Vec::new();
let mut buf = BytesMut::new();
serialize_int_lenenc(&mut buf, None);
assert_eq!(buf, b"\xFB".to_vec());
@ -174,7 +176,7 @@ mod tests {
#[test]
fn it_encodes_int_lenenc_u8() {
let mut buf: Vec<u8> = Vec::new();
let mut buf = BytesMut::new();
serialize_int_lenenc(&mut buf, Some(&(std::u8::MAX as usize)));
assert_eq!(buf, b"\xFA\xFF".to_vec());
@ -182,7 +184,7 @@ mod tests {
#[test]
fn it_encodes_int_lenenc_u16() {
let mut buf: Vec<u8> = Vec::new();
let mut buf = BytesMut::new();
serialize_int_lenenc(&mut buf, Some(&(std::u16::MAX as usize)));
assert_eq!(buf, b"\xFC\xFF\xFF".to_vec());
@ -190,7 +192,7 @@ mod tests {
#[test]
fn it_encodes_int_lenenc_u24() {
let mut buf: Vec<u8> = Vec::new();
let mut buf = BytesMut::new();
serialize_int_lenenc(&mut buf, Some(&U24_MAX));
assert_eq!(buf, b"\xFD\xFF\xFF\xFF".to_vec());
@ -198,7 +200,7 @@ mod tests {
#[test]
fn it_encodes_int_lenenc_u64() {
let mut buf: Vec<u8> = Vec::new();
let mut buf = BytesMut::new();
serialize_int_lenenc(&mut buf, Some(&(std::u64::MAX as usize)));
assert_eq!(buf, b"\xFE\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF".to_vec());
@ -206,7 +208,7 @@ mod tests {
#[test]
fn it_encodes_int_u64() {
let mut buf = Vec::new();
let mut buf = BytesMut::new();
serialize_int_8(&mut buf, std::u64::MAX);
assert_eq!(buf, b"\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF".to_vec());
@ -215,7 +217,7 @@ mod tests {
#[test]
fn it_encodes_int_u32() {
let mut buf = Vec::new();
let mut buf = BytesMut::new();
serialize_int_4(&mut buf, std::u32::MAX);
assert_eq!(buf, b"\xFF\xFF\xFF\xFF".to_vec());
@ -224,7 +226,7 @@ mod tests {
#[test]
fn it_encodes_int_u24() {
let mut buf = Vec::new();
let mut buf = BytesMut::new();
serialize_int_3(&mut buf, U24_MAX as u32);
assert_eq!(buf.to_vec(), b"\xFF\xFF\xFF".to_vec());
@ -233,7 +235,7 @@ mod tests {
#[test]
fn it_encodes_int_u16() {
let mut buf = Vec::new();
let mut buf = BytesMut::new();
serialize_int_2(&mut buf, std::u16::MAX);
assert_eq!(buf.to_vec(), b"\xFF\xFF".to_vec());
@ -242,7 +244,7 @@ mod tests {
#[test]
fn it_encodes_int_u8() {
let mut buf: Vec<u8> = Vec::new();
let mut buf = BytesMut::new();
serialize_int_1(&mut buf, std::u8::MAX);
assert_eq!(buf, b"\xFF".to_vec());
@ -250,7 +252,7 @@ mod tests {
#[test]
fn it_encodes_string_lenenc() {
let mut buf: Vec<u8> = Vec::new();
let mut buf = BytesMut::new();
serialize_string_lenenc(&mut buf, &Bytes::from_static(b"random_string"));
assert_eq!(buf, b"\x0D\x00\x00random_string".to_vec());
@ -258,7 +260,7 @@ mod tests {
#[test]
fn it_encodes_string_fix() {
let mut buf: Vec<u8> = Vec::new();
let mut buf = BytesMut::new();
serialize_string_fix(&mut buf, &Bytes::from_static(b"random_string"), 13);
assert_eq!(buf, b"random_string".to_vec());
@ -266,7 +268,7 @@ mod tests {
#[test]
fn it_encodes_string_null() {
let mut buf: Vec<u8> = Vec::new();
let mut buf = BytesMut::new();
serialize_string_null(&mut buf, &Bytes::from_static(b"random_string"));
assert_eq!(buf, b"random_string\0".to_vec());
@ -275,7 +277,7 @@ mod tests {
#[test]
fn it_encodes_string_eof() {
let mut buf: Vec<u8> = Vec::new();
let mut buf = BytesMut::new();
serialize_string_eof(&mut buf, &Bytes::from_static(b"random_string"));
assert_eq!(buf, b"random_string".to_vec());
@ -283,7 +285,7 @@ mod tests {
#[test]
fn it_encodes_byte_lenenc() {
let mut buf: Vec<u8> = Vec::new();
let mut buf = BytesMut::new();
serialize_byte_lenenc(&mut buf, &Bytes::from("random_string"));
assert_eq!(buf, b"\x0D\x00\x00random_string".to_vec());
@ -291,7 +293,7 @@ mod tests {
#[test]
fn it_encodes_byte_fix() {
let mut buf: Vec<u8> = Vec::new();
let mut buf = BytesMut::new();
serialize_byte_fix(&mut buf, &Bytes::from("random_string"), 13);
assert_eq!(buf, b"random_string".to_vec());
@ -299,7 +301,7 @@ mod tests {
#[test]
fn it_encodes_byte_eof() {
let mut buf: Vec<u8> = Vec::new();
let mut buf = BytesMut::new();
serialize_byte_eof(&mut buf, &Bytes::from("random_string"));
assert_eq!(buf, b"random_string".to_vec());