implement Executor::send()

This commit is contained in:
Austin Bonander 2019-12-19 13:08:15 -08:00 committed by Ryan Leckey
parent 56875a8931
commit 5192983093
6 changed files with 65 additions and 14 deletions

View file

@ -73,4 +73,9 @@ pub trait Executor: Send {
&'e mut self,
query: &'q str,
) -> BoxFuture<'e, crate::Result<Describe<Self::Backend>>>;
/// Send a semicolon-delimited series of arbitrary SQL commands to the server.
///
/// Does not support fetching results.
fn send<'e, 'q: 'e>(&'e mut self, commands: &'q str) -> BoxFuture<'e, crate::Result<()>>;
}

View file

@ -19,6 +19,7 @@ use crate::{Describe, Error, io::{Buf, BufMut, BufStream}, mysql::{
use super::establish;
use crate::mysql::MySql;
use crate::mysql::protocol::ComQuery;
pub type StatementId = u32;
@ -306,4 +307,40 @@ impl Connection {
Ok(())
}
async fn expect_eof_or_err(&mut self) -> crate::Result<()> {
let packet = self.receive().await?;
match buf[0] {
0xFE => EofPacket::decode(packet)?,
0xFF => ErrPacket::decode(packet)?.expect_error()?,
_ => return Err(protocol_err!("expected EOF or ERR, got {:02X}", buf[0]).into()),
}
Ok(())
}
pub(super) async fn send_raw(
&mut self,
commands: &str
) -> Result<()> {
self.stream.flush().await?;
self.start_sequence();
// enable multi-statement only for this query
self.write(ComSetOption { option: SetOptionOptions::MySqlOptionMultiStatementsOn });
self.write(ComQuery { sql_statement: commands });
self.write(ComSetOption { option: SetOptionOptions::MySqlOptionMultiStatementsOff });
self.stream.flush().await?;
self.expect_eof_or_err().await?;
let packet = self.receive().await?;
if packet[0] == 0xFF { return ErrPacket::decode(packet)?.expect_error() }
/// otherwise ignore packet
self.expect_eof_or_err().await?;
Ok(())
}
}

View file

@ -152,4 +152,8 @@ impl Executor for Connection {
) -> BoxFuture<'e, crate::Result<Describe<Self::Backend>>> {
Box::pin(self.conn.prepare_describe(query))
}
fn send<'e, 'q: 'e>(&'e mut self, commands: &'q str) -> BoxFuture<'e, crate::Result<()>> {
unimplemented!()
}
}

View file

@ -1,9 +1,7 @@
use crate::{
backend::Backend, describe::Describe, executor::Executor, params::IntoQueryParameters,
pool::Pool, row::FromRow,
};
use futures_core::{future::BoxFuture, stream::BoxStream};
use crate::{backend::Backend, describe::Describe, executor::Executor, params::IntoQueryParameters, pool::Pool, row::FromRow, Error};
use futures_core::{future::BoxFuture, stream::BoxStream, Future};
use futures_util::StreamExt;
use bitflags::_core::pin::Pin;
impl<DB> Executor for Pool<DB>
where
@ -107,4 +105,8 @@ where
) -> BoxFuture<'e, crate::Result<Describe<Self::Backend>>> {
Box::pin(async move { self.acquire().await?.describe(query).await })
}
fn send<'e, 'q: 'e>(&'e mut self, commands: &'q str) -> BoxFuture<'e, crate::Result<()>> {
Box::pin(async move { self.acquire().await?.batch_exec(commands).await })
}
}

View file

@ -180,6 +180,11 @@ impl Connection {
protocol::Execute { portal, limit }.encode(self.stream.buffer_mut());
}
pub(super) async fn send(&mut self, commands: &str) -> crate::Result<()> {
protocol::Query(commands).encode(self.stream.buffer_mut());
self.sync().await
}
pub(super) async fn sync(&mut self) -> crate::Result<()> {
protocol::Sync.encode(self.stream.buffer_mut());

View file

@ -1,14 +1,8 @@
use super::{connection::Step, Connection, Postgres};
use crate::{
backend::Backend,
describe::{Describe, ResultField},
executor::Executor,
params::{IntoQueryParameters, QueryParameters},
row::FromRow,
url::Url,
};
use futures_core::{future::BoxFuture, stream::BoxStream};
use crate::{backend::Backend, describe::{Describe, ResultField}, executor::Executor, params::{IntoQueryParameters, QueryParameters}, row::FromRow, url::Url, Error};
use futures_core::{future::BoxFuture, stream::BoxStream, Future};
use crate::postgres::query::PostgresQueryParameters;
use bitflags::_core::pin::Pin;
impl Connection {
async fn prepare_cached(&mut self, query: &str, params: &PostgresQueryParameters) -> crate::Result<String> {
@ -159,4 +153,8 @@ impl Executor for Connection {
})
})
}
fn send<'e, 'q: 'e>(&'e mut self, commands: &'q str) -> BoxFuture<'e, crate::Result<()>> {
Box::pin(self.conn.send(commands))
}
}