Connection Established!

This commit is contained in:
Daniel Akhterov 2019-06-22 02:06:03 -07:00 committed by Daniel Akhterov
parent 2a2941e1a5
commit 3a07e393e8
2 changed files with 57 additions and 72 deletions

View file

@ -15,6 +15,7 @@ use runtime::{net::TcpStream, task::JoinHandle};
use std::io;
use failure::Error;
use failure::err_msg;
use byteorder::{ByteOrder, LittleEndian};
mod establish;
// mod query;
@ -73,6 +74,7 @@ async fn receiver(
) -> Result<(), Error> {
let mut rbuf = BytesMut::with_capacity(0);
let mut len = 0;
let mut first_packet = true;
loop {
// This uses an adaptive system to extend the vector when it fills. We want to
@ -96,17 +98,35 @@ async fn receiver(
// TODO: Need a select! on a channel that I can trigger to cancel this
let bytes_read = reader.read(&mut rbuf[len..]).await?;
// Read 0 bytes from the server; end-of-stream
if bytes_read <= 0 {
if bytes_read > 0 {
len += bytes_read;
} else {
// Read 0 bytes from the server; end-of-stream
break;
}
while bytes_read > 0 {
// let message = ServerMessage::init(&mut rbuf)?;
// println!("{:?}", rbuf);
// let message = InitialHandshakePacket::deserialize(&mut rbuf.to_vec())?;
sender.send(ServerMessage::InitialHandshakePacket(InitialHandshakePacket::default())).await?;
// len += bytes_read;
while len > 0 {
let size = rbuf.len();
println!("Buffer: {:?}", rbuf);
let message = if first_packet {
println!("init");
ServerMessage::init(&mut rbuf)?
} else {
println!("deser");
ServerMessage::deserialize(&mut rbuf)?
};
println!("Message: {:?}", message);
len -= size - rbuf.len();
if let Some(message) = message {
first_packet = false;
sender.send(message).await.unwrap();
} else {
// Did not receive enough bytes to
// deserialize a complete message
break;
}
}

View file

@ -17,6 +17,7 @@ pub enum Message {
}
bitflags! {
// 1111011111111110
pub struct Capabilities: u128 {
const CLIENT_MYSQL = 1;
const FOUND_ROWS = 2;
@ -183,8 +184,11 @@ impl Message {
// let sequence_number = buf[3];
Ok(None)
}
pub fn init(buf: &mut BytesMut) -> Result<Self, Error> {
Ok(Message::InitialHandshakePacket(InitialHandshakePacket::deserialize(&mut buf.to_vec())?))
pub fn init(buf: &mut BytesMut) -> Result<Option<Self>, Error> {
match InitialHandshakePacket::deserialize(&mut buf.to_vec()) {
Ok(v) => Ok(Some(Message::InitialHandshakePacket(v))),
Err(_) => Ok(None),
}
}
}
@ -194,7 +198,7 @@ impl Deserialize for InitialHandshakePacket {
let length = deserialize_int_3(&buf, &mut index);
if buf.len() != length as usize {
if buf.len() < length as usize {
return Err(err_msg("Lengths to do not match"));
}
@ -213,14 +217,13 @@ impl Deserialize for InitialHandshakePacket {
index += 1;
let mut capabilities =
Capabilities::from_bits(deserialize_int_2(&buf, &mut index).into()).unwrap();
Capabilities::from_bits_truncate(deserialize_int_2(&buf, &mut index).into());
let collation = deserialize_int_1(&buf, &mut index);
let status = deserialize_int_2(&buf, &mut index);
capabilities |=
Capabilities::from_bits(((deserialize_int_2(&buf, &mut index) as u32) << 16).into())
.unwrap();
Capabilities::from_bits_truncate(((deserialize_int_2(&buf, &mut index) as u32) << 16).into());
let mut plugin_data_length = 0;
if !(capabilities & Capabilities::PLUGIN_AUTH).is_empty() {
@ -234,10 +237,9 @@ impl Deserialize for InitialHandshakePacket {
index += 6;
if (capabilities & Capabilities::CLIENT_MYSQL).is_empty() {
capabilities |= Capabilities::from_bits(
capabilities |= Capabilities::from_bits_truncate(
((deserialize_int_4(&buf, &mut index) as u128) << 32).into(),
)
.unwrap();
);
} else {
// Skip filler
index += 4;
@ -292,7 +294,7 @@ impl Deserialize for OkPacket {
let affected_rows = deserialize_int_lenenc(&buf, &mut index);
let last_insert_id = deserialize_int_lenenc(&buf, &mut index);
let server_status = ServerStatusFlag::from_bits(deserialize_int_2(&buf, &mut index).into()).unwrap();
let server_status = ServerStatusFlag::from_bits_truncate(deserialize_int_2(&buf, &mut index).into());
let warning_count = deserialize_int_2(&buf, &mut index);
// Assuming CLIENT_SESSION_TRACK is unsupported
@ -376,70 +378,33 @@ mod test {
#[test]
fn it_decodes_capabilities() {
let buf = b"\x00\x10".to_vec();
let buf = b"\xfe\xf7".to_vec();
let mut index = 0;
Capabilities::from_bits(deserialize_int_2(&buf, &mut index).into()).unwrap();
Capabilities::from_bits_truncate(deserialize_int_2(&buf, &mut index).into());
}
#[test]
fn it_decodes_initialhandshakepacket() -> Result<(), Error> {
let mut buf = b"\
\x54\x00\x00\
\0\
\x01\
5.5.5-7\0\
\x01\0\0\0\
authseed\
\0\
\x00\x20\
\0\
\x00\x00\
\x08\x00\
\x0A\
\0\0\0\0\0\0\
\x01\x00\x00\x00\
scrambled2nd\
\0\
authentication_plugin_name\0\
"
.to_vec();
let message = InitialHandshakePacket::deserialize(&mut buf)?;
assert_eq!(message.protocol_version, 1);
assert_eq!(message.server_version, b"5.5.5-7".to_vec());
assert_eq!(message.auth_seed, b"authseed".to_vec());
assert_eq!(message.scramble, Some(Bytes::from(b"scrambled2nd".to_vec())));
assert_eq!(
message.auth_plugin_name,
Some(Bytes::from(b"authentication_plugin_name".to_vec()))
);
assert!(!(message.capabilities & Capabilities::SECURE_CONNECTION).is_empty());
assert!(!(message.capabilities & Capabilities::PLUGIN_AUTH).is_empty());
assert!(!(message.capabilities & Capabilities::MARIA_DB_CLIENT_PROGRESS).is_empty());
Ok(())
}
#[test]
fn it_decodes_initialhandshakepacket_real() -> Result<(), Error> {
let mut buf = b"\
n\0\0\
\0\
\n\
5.5.5-10.4.6-MariaDB-1:10.4\0".to_vec();
5.5.5-10.4.6-MariaDB-1:10.4.6+maria~bionic\0\
\x13\0\0\0\
?~~|vZAu\
\0\
\xfe\xf7\
\x08\
\x02\0\
\xff\x81\
\x15\
\0\0\0\0\0\0\
\x07\0\0\0\
JQ8cihP4Q}Dx\
\0\
mysql_native_password\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0".to_vec();
let message = InitialHandshakePacket::deserialize(&mut buf)?;
// assert_eq!(message.protocol_version, 1);
// assert_eq!(message.server_version, b"5.5.5-7".to_vec());
// assert_eq!(message.auth_seed, b"authseed".to_vec());
// assert_eq!(message.scramble, Some(Bytes::from(b"scrambled2nd".to_vec())));
// assert_eq!(
// message.auth_plugin_name,
// Some(Bytes::from(b"authentication_plugin_name".to_vec()))
// );
// assert!(!(message.capabilities & Capabilities::SECURE_CONNECTION).is_empty());
// assert!(!(message.capabilities & Capabilities::PLUGIN_AUTH).is_empty());
// assert!(!(message.capabilities & Capabilities::MARIA_DB_CLIENT_PROGRESS).is_empty());
let _message = InitialHandshakePacket::deserialize(&mut buf)?;
Ok(())
}