WIP: MariaDb fetch

This commit is contained in:
Daniel Akhterov 2019-12-02 20:47:38 -08:00
parent f8f71b1b70
commit 55da9daaf1
7 changed files with 157 additions and 193 deletions

View file

@ -1,3 +1,5 @@
#![recursion_limit="256"]
#[macro_use]
mod macros;

View file

@ -2,199 +2,34 @@ use super::{MariaDb, MariaDbQueryParameters, MariaDbRow};
use crate::{
backend::Backend,
describe::{Describe, ResultField},
mariadb::protocol::{
Capabilities, ColumnCountPacket, ColumnDefinitionPacket, ComStmtExecute, EofPacket,
ErrPacket, OkPacket, ResultRow, StmtExecFlag,
},
};
use futures_core::stream::BoxStream;
use futures_core::future::BoxFuture;
use crate::url::Url;
impl Backend for MariaDb {
type QueryParameters = MariaDbQueryParameters;
type Row = MariaDbRow;
type TableIdent = String;
async fn open(url: &str) -> crate::Result<Self>
where
Self: Sized,
{
MariaDb::open(url).await
}
fn open(url: &str) -> BoxFuture<'static, crate::Result<Self>> {
let url = Url::parse(url);
async fn close(mut self) -> crate::Result<()> {
self.close().await
}
async fn ping(&mut self) -> crate::Result<()> {
self.ping().await
}
async fn execute(&mut self, query: &str, params: MariaDbQueryParameters) -> crate::Result<u64> {
// Write prepare statement to buffer
self.start_sequence();
let prepare_ok = self.send_prepare(query).await?;
// SEND ================
self.start_sequence();
self.execute(prepare_ok.statement_id, params).await?;
// =====================
// Row Counter, used later
let mut rows = 0u64;
let capabilities = self.capabilities;
let has_eof = capabilities.contains(Capabilities::CLIENT_DEPRECATE_EOF);
let packet = self.receive().await?;
if packet[0] == 0x00 {
let _ok = OkPacket::decode(packet, capabilities)?;
} else if packet[0] == 0xFF {
return ErrPacket::decode(packet)?.expect_error();
} else {
// A Resultset starts with a [ColumnCountPacket] which is a single field that encodes
// how many columns we can expect when fetching rows from this statement
let column_count: u64 = ColumnCountPacket::decode(packet)?.columns;
// Next we have a [ColumnDefinitionPacket] which verbosely explains each minute
// detail about the column in question including table, aliasing, and type
// TODO: This information was *already* returned by PREPARE .., is there a way to suppress generation
let mut columns = vec![];
for _ in 0..column_count {
columns.push(ColumnDefinitionPacket::decode(self.receive().await?)?);
}
// When (legacy) EOFs are enabled, the fixed number column definitions are further terminated by
// an EOF packet
if !has_eof {
let _eof = EofPacket::decode(self.receive().await?)?;
}
// For each row in the result set we will receive a ResultRow packet.
// We may receive an [OkPacket], [EofPacket], or [ErrPacket] (depending on if EOFs are enabled) to finalize the iteration.
loop {
let packet = self.receive().await?;
if packet[0] == 0xFE && packet.len() < 0xFF_FF_FF {
// NOTE: It's possible for a ResultRow to start with 0xFE (which would normally signify end-of-rows)
// but it's not possible for an Ok/Eof to be larger than 0xFF_FF_FF.
if !has_eof {
let _eof = EofPacket::decode(packet)?;
} else {
let _ok = OkPacket::decode(packet, capabilities)?;
}
break;
} else if packet[0] == 0xFF {
let err = ErrPacket::decode(packet)?;
panic!("received db err = {:?}", err);
} else {
// Ignore result rows; exec only returns number of affected rows;
let _ = ResultRow::decode(packet, &columns)?;
// For every row we decode we increment counter
rows = rows + 1;
}
}
}
Ok(rows)
}
fn fetch(
&mut self,
_query: &str,
_params: MariaDbQueryParameters,
) -> BoxStream<'_, crate::Result<Self::Row>> {
Box::pin(async_stream::try_stream! {
// Write prepare statement to buffer
self.start_sequence();
let prepare_ok = self.send_prepare(query).await?;
self.start_sequence();
self.execute(prepare_ok.statement_id, params).await?;
let capabilities = self.capabilities;
let has_eof = capabilities.contains(Capabilities::CLIENT_DEPRECATE_EOF);
let packet = self.receive().await?;
if packet[0] == 0x00 {
let _ok = OkPacket::decode(packet, capabilities)?;
} else if packet[0] == 0xFF {
return ErrPacket::decode(packet)?.expect_error();
}
// A Resultset starts with a [ColumnCountPacket] which is a single field that encodes
// how many columns we can expect when fetching rows from this statement
// let column_count: u64 = ColumnCountPacket::decode(packet)?.columns;
// Next we have a [ColumnDefinitionPacket] which verbosely explains each minute
// detail about the column in question including table, aliasing, and type
// TODO: This information was *already* returned by PREPARE .., is there a way to suppress generation
let mut columns = vec![];
for _ in 0..column_count {
columns.push(ColumnDefinitionPacket::decode(self.receive().await?)?);
}
// When (legacy) EOFs are enabled, the fixed number column definitions are further terminated by
// an EOF packet
// if !has_eof {
// let _eof = EofPacket::decode(self.receive().await?)?;
// }
// loop {
// let packet = self.receive().await?;
// if packet[0] == 0xFE && packet.len() < 0xFF_FF_FF {
// if !has_eof {
// let _eof = EofPacket::decode(packet)?;
// } else {
// let _ok = OkPacket::decode(packet, capabilities)?;
// }
// break;
// } else if packet[0] == 0xFF {
// let err = ErrPacket::decode(packet)?;
// panic!("received db err = {:?}", err);
// } else {
// yield ResultRow::decode(packet, &columns);
// }
// }
Box::pin(async move {
let url = url?;
MariaDb::open(url).await
})
}
async fn fetch_optional(
&mut self,
_query: &str,
_params: MariaDbQueryParameters,
) -> crate::Result<Option<Self::Row>> {
unimplemented!();
}
async fn describe(&mut self, query: &str) -> crate::Result<Describe<MariaDb>> {
let prepare_ok = self.send_prepare(query).await?;
let mut param_types = Vec::with_capacity(prepare_ok.params as usize);
for _ in 0..prepare_ok.params {
let param = ColumnDefinitionPacket::decode(self.receive().await?)?;
param_types.push(param.field_type.0);
}
self.check_eof().await?;
let mut columns = Vec::with_capacity(prepare_ok.columns as usize);
for _ in 0..prepare_ok.columns {
let column = ColumnDefinitionPacket::decode(self.receive().await?)?;
columns.push(ResultField {
name: column.column_alias.or(column.column),
table_id: column.table_alias.or(column.table),
type_id: column.field_type.0,
})
}
self.check_eof().await?;
Ok(Describe {
param_types,
result_fields: columns,
fn close(mut self) -> BoxFuture<'static, crate::Result<()>> {
Box::pin(async move {
self.close().await
})
}
// async fn ping(&mut self) -> crate::Result<()> {
// self.ping().await
// }
}
impl_from_row_for_backend!(MariaDb);

View file

@ -17,7 +17,7 @@ use std::{
io,
net::{IpAddr, SocketAddr},
};
use url::{quirks::protocol, Url};
use crate::url::Url;
pub struct MariaDb {
pub(crate) stream: BufStream<TcpStream>,
@ -27,12 +27,10 @@ pub struct MariaDb {
}
impl MariaDb {
pub async fn open(url: &str) -> Result<Self> {
pub async fn open(url: Url) -> Result<Self> {
// TODO: Handle errors
let url = Url::parse(url).unwrap();
let host = url.host_str().unwrap_or("127.0.0.1");
let port = url.port().unwrap_or(3306);
let host = url.host();
let port = url.port(3306);
// TODO: handle errors
let host: IpAddr = host.parse().unwrap();
@ -221,9 +219,10 @@ impl MariaDb {
}
pub(super) async fn column_definitions(
&mut self,
packet: &[u8],
&mut self
) -> Result<Vec<ColumnDefinitionPacket>> {
let packet = self.receive().await?;
// A Resultset starts with a [ColumnCountPacket] which is a single field that encodes
// how many columns we can expect when fetching rows from this statement
let column_count: u64 = ColumnCountPacket::decode(packet)?.columns;
@ -248,7 +247,7 @@ impl MariaDb {
Ok(columns)
}
pub(super) async fn execute(
pub(super) async fn send_execute(
&mut self,
statement_id: u32,
_params: MariaDbQueryParameters,

View file

@ -5,7 +5,7 @@ use crate::{
},
Result,
};
use url::Url;
use crate::url::Url;
pub(crate) async fn establish(conn: &mut MariaDb, url: &Url) -> Result<()> {
let initial = InitialHandshakePacket::decode(conn.receive().await?)?;
@ -25,7 +25,7 @@ pub(crate) async fn establish(conn: &mut MariaDb, url: &Url) -> Result<()> {
max_packet_size: 1024,
client_collation: 192, // utf8_unicode_ci
username: url.username(),
database: &url.path()[1..],
database: &url.database(),
auth_data: None,
auth_plugin_name: None,
connection_attrs: &[],

View file

@ -0,0 +1,126 @@
use super::{MariaDb, MariaDbQueryParameters, MariaDbRow};
use crate::{
backend::Backend,
describe::{Describe, ResultField},
executor::Executor,
params::{IntoQueryParameters, QueryParameters},
mariadb::protocol::{
Capabilities, ColumnCountPacket, ColumnDefinitionPacket, ComStmtExecute, EofPacket,
ErrPacket, OkPacket, ResultRow, StmtExecFlag,
},
row::FromRow,
url::Url,
};
use futures_core::{future::BoxFuture, stream::BoxStream};
impl Executor for MariaDb {
type Backend = Self;
fn execute<'e, 'q: 'e, I: 'e>(
&'e mut self,
query: &'q str,
params: I,
) -> BoxFuture<'e, crate::Result<u64>>
where
I: IntoQueryParameters<Self::Backend> + Send,
{
let params = params.into_params();
Box::pin(async move {
let prepare = self.send_prepare(query).await?;
self.send_execute(prepare.statement_id, params);
let columns = self.column_definitions().await?;
let capabilities = self.capabilities;
// For each row in the result set we will receive a ResultRow packet.
// We may receive an [OkPacket], [EofPacket], or [ErrPacket] (depending on if EOFs are enabled) to finalize the iteration.
let mut rows = 0u64;
loop {
let packet = self.receive().await?;
if packet[0] == 0xFE && packet.len() < 0xFF_FF_FF {
// NOTE: It's possible for a ResultRow to start with 0xFE (which would normally signify end-of-rows)
// but it's not possible for an Ok/Eof to be larger than 0xFF_FF_FF.
if !capabilities.contains(Capabilities::CLIENT_DEPRECATE_EOF) {
let _eof = EofPacket::decode(packet)?;
} else {
let _ok = OkPacket::decode(packet, capabilities)?;
}
break;
} else if packet[0] == 0xFF {
let err = ErrPacket::decode(packet)?;
panic!("received db err = {:?}", err);
} else {
// Ignore result rows; exec only returns number of affected rows;
let _ = ResultRow::decode(packet, &columns)?;
// For every row we decode we increment counter
rows = rows + 1;
}
}
Ok(rows)
})
}
fn fetch<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>(
&'e mut self,
query: &'q str,
params: I,
) -> BoxStream<'e, crate::Result<T>>
where
I: IntoQueryParameters<Self::Backend> + Send,
T: FromRow<Self::Backend, O> + Send + Unpin,
{
let params = params.into_params();
Box::pin(async_stream::try_stream! {
let prepare = self.send_prepare(query).await?;
self.send_execute(prepare.statement_id, params);
let columns = self.column_definitions().await?;
let capabilities = self.capabilities;
loop {
let packet = self.receive().await?;
if packet[0] == 0xFE && packet.len() < 0xFF_FF_FF {
// NOTE: It's possible for a ResultRow to start with 0xFE (which would normally signify end-of-rows)
// but it's not possible for an Ok/Eof to be larger than 0xFF_FF_FF.
if !capabilities.contains(Capabilities::CLIENT_DEPRECATE_EOF) {
let _eof = EofPacket::decode(packet)?;
} else {
let _ok = OkPacket::decode(packet, capabilities)?;
}
break;
} else if packet[0] == 0xFF {
let _err = ErrPacket::decode(packet)?;
panic!("ErrPacket received");
} else {
let row = ResultRow::decode(packet, &columns)?;
yield FromRow::from_row(row);
}
}
})
}
fn fetch_optional<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>(
&'e mut self,
query: &'q str,
params: I,
) -> BoxFuture<'e, crate::Result<Option<T>>>
where
I: IntoQueryParameters<Self::Backend> + Send,
T: FromRow<Self::Backend, O> + Send,
{
unimplemented!();
}
fn describe<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
) -> BoxFuture<'e, crate::Result<Describe<Self::Backend>>> {
unimplemented!();
}
}

View file

@ -2,6 +2,7 @@ mod backend;
mod connection;
mod error;
mod establish;
mod executor;
mod io;
mod protocol;
mod query;

View file

@ -1,11 +1,12 @@
use crate::{
mariadb::{protocol::ResultRow, MariaDb},
row::RawRow,
row::Row,
};
pub struct MariaDbRow(pub(super) ResultRow);
#[derive(Debug)]
pub struct MariaDbRow(pub(crate) ResultRow);
impl RawRow for MariaDbRow {
impl Row for MariaDbRow {
type Backend = MariaDb;
#[inline]