Have Backend require Executor and de-duplicate some logic

This commit is contained in:
Ryan Leckey 2019-11-27 23:26:20 -08:00
parent d307e3cccf
commit 2227303f20
21 changed files with 361 additions and 340 deletions

View file

@ -1,6 +1,5 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use sqlx::postgres::protocol::{Bind, DataRow, RowDescription};
use sqlx::postgres::protocol::{Decode, Encode};
use sqlx::postgres::protocol::{Bind, DataRow, Decode, Encode, RowDescription};
fn bench(c: &mut Criterion) {
c.bench_function("decode_data_row", |b| {

View file

@ -1,9 +1,6 @@
use sqlx::{FromRow, Pool, Postgres};
use std::env;
use tide::http::StatusCode;
use tide::Request;
use tide::Response;
use tide::ResultExt;
use tide::{http::StatusCode, Request, Response, ResultExt};
#[async_std::main]
async fn main() -> anyhow::Result<()> {

View file

@ -1 +0,0 @@
stable

2
rustfmt.toml Normal file
View file

@ -0,0 +1,2 @@
unstable_features = true
merge_imports = true

View file

@ -18,7 +18,6 @@ mariadb = []
[dependencies]
async-std = { version = "1.1.0", features = ["attributes", "unstable"] }
async-stream = "0.2.0"
async-trait = "0.1.18"
bitflags = "1.2.1"
byteorder = { version = "1.3.2", default-features = false }
bytes = "0.4.12"

View file

@ -1,7 +1,8 @@
use crate::describe::Describe;
use crate::{params::QueryParameters, row::RawRow, types::HasTypeMetadata};
use async_trait::async_trait;
use futures_core::stream::BoxStream;
use crate::{
describe::Describe, executor::Executor, params::QueryParameters, row::Row,
types::HasTypeMetadata,
};
use futures_core::future::BoxFuture;
/// A database backend.
///
@ -9,22 +10,22 @@ use futures_core::stream::BoxStream;
/// important related traits as associated types.
///
/// This trait is not intended to be used directly.
/// Instead [sqlx::Connection] or [sqlx::Pool] should be used instead,
/// which provide concurrent access and typed retrieval of results.
#[async_trait]
pub trait Backend: HasTypeMetadata + Send + Sync + Sized {
/// Instead [sqlx::Connection] or [sqlx::Pool] should be used instead.
pub trait Backend:
Executor<Backend = Self> + HasTypeMetadata + Send + Sync + Sized + 'static
{
/// The concrete `QueryParameters` implementation for this backend.
type QueryParameters: QueryParameters<Backend = Self>;
/// The concrete `Row` implementation for this backend.
type Row: RawRow<Backend = Self>;
type Row: Row<Backend = Self>;
/// The identifier for tables; in Postgres this is an `oid` while
/// in MariaDB/MySQL this is the qualified name of the table.
type TableIdent;
/// Establish a new connection to the database server.
async fn open(url: &str) -> crate::Result<Self>
fn open(url: &str) -> BoxFuture<'static, crate::Result<Self>>
where
Self: Sized;
@ -32,30 +33,5 @@ pub trait Backend: HasTypeMetadata + Send + Sync + Sized {
///
/// This method is not required to be called. A database server will
/// eventually notice and clean up not fully closed connections.
async fn close(mut self) -> crate::Result<()>;
async fn ping(&mut self) -> crate::Result<()> {
// TODO: Does this need to be specialized for any database backends?
let _ = self
.execute("SELECT 1", Self::QueryParameters::new())
.await?;
Ok(())
}
async fn describe(&mut self, query: &str) -> crate::Result<Describe<Self>>;
async fn execute(&mut self, query: &str, params: Self::QueryParameters) -> crate::Result<u64>;
fn fetch(
&mut self,
query: &str,
params: Self::QueryParameters,
) -> BoxStream<'_, crate::Result<Self::Row>>;
async fn fetch_optional(
&mut self,
query: &str,
params: Self::QueryParameters,
) -> crate::Result<Option<Self::Row>>;
fn close(self) -> BoxFuture<'static, crate::Result<()>>;
}

View file

@ -5,12 +5,15 @@ use crate::{
executor::Executor,
params::IntoQueryParameters,
pool::{Live, SharedPool},
row::FromRow,
row::Row,
row::{FromRow, Row},
};
use futures_core::{future::BoxFuture, stream::BoxStream};
use futures_util::stream::StreamExt;
use std::{sync::Arc, time::Instant};
use std::{
ops::{Deref, DerefMut},
sync::Arc,
time::Instant,
};
pub struct Connection<DB>
where
@ -29,21 +32,7 @@ where
}
pub async fn open(url: &str) -> crate::Result<Self> {
let raw = DB::open(url).await?;
Ok(Self::new(Live::unpooled(raw), None))
}
/// Verifies a connection to the database is still alive.
pub async fn ping(&mut self) -> crate::Result<()> {
self.live.ping().await
}
/// Analyze the SQL statement and report the inferred bind parameter types and returned
/// columns.
///
/// Mainly intended for use by sqlx-macros.
pub async fn describe(&mut self, statement: &str) -> crate::Result<Describe<DB>> {
self.live.describe(statement).await
Ok(Self::new(Live::unpooled(DB::open(url).await?), None))
}
}
@ -53,51 +42,45 @@ where
{
type Backend = DB;
fn execute<'c, 'q: 'c, I: 'c>(
&'c mut self,
fn execute<'e, 'q: 'e, I: 'e>(
&'e mut self,
query: &'q str,
params: I,
) -> BoxFuture<'c, Result<u64, Error>>
) -> BoxFuture<'e, crate::Result<u64>>
where
I: IntoQueryParameters<Self::Backend> + Send,
{
Box::pin(async move { self.live.execute(query, params.into_params()).await })
self.live.execute(query, params)
}
fn fetch<'c, 'q: 'c, I: 'c, O: 'c, T: 'c>(
&'c mut self,
fn fetch<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>(
&'e mut self,
query: &'q str,
params: I,
) -> BoxStream<'c, Result<T, Error>>
) -> BoxStream<'e, crate::Result<T>>
where
I: IntoQueryParameters<Self::Backend> + Send,
T: FromRow<Self::Backend, O> + Send + Unpin,
{
Box::pin(async_stream::try_stream! {
let mut s = self.live.fetch(query, params.into_params());
while let Some(row) = s.next().await.transpose()? {
yield T::from_row(Row(row));
}
})
self.live.fetch(query, params)
}
fn fetch_optional<'c, 'q: 'c, I: 'c, O: 'c, T: 'c>(
&'c mut self,
fn fetch_optional<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>(
&'e mut self,
query: &'q str,
params: I,
) -> BoxFuture<'c, Result<Option<T>, Error>>
) -> BoxFuture<'e, crate::Result<Option<T>>>
where
I: IntoQueryParameters<Self::Backend> + Send,
T: FromRow<Self::Backend, O>,
T: FromRow<Self::Backend, O> + Send,
{
Box::pin(async move {
let row = self
.live
.fetch_optional(query, params.into_params())
.await?;
self.live.fetch_optional(query, params)
}
Ok(row.map(Row).map(T::from_row))
})
fn describe<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
) -> BoxFuture<'e, crate::Result<Describe<Self::Backend>>> {
self.live.describe(query)
}
}

View file

@ -1,32 +1,49 @@
use crate::{backend::Backend, error::Error, params::IntoQueryParameters, row::FromRow};
use crate::{
backend::Backend,
describe::Describe,
error::Error,
params::{IntoQueryParameters, QueryParameters},
row::FromRow,
};
use futures_core::{future::BoxFuture, stream::BoxStream};
use futures_util::TryStreamExt;
use futures_util::{TryFutureExt, TryStreamExt};
pub trait Executor: Send {
type Backend: Backend;
fn execute<'c, 'q: 'c, I: 'c>(
&'c mut self,
/// Verifies a connection to the database is still alive.
fn ping<'e>(&'e mut self) -> BoxFuture<'e, crate::Result<()>> {
Box::pin(
self.execute(
"SELECT 1",
<Self::Backend as Backend>::QueryParameters::new(),
)
.map_ok(|_| ()),
)
}
fn execute<'e, 'q: 'e, I: 'e>(
&'e mut self,
query: &'q str,
params: I,
) -> BoxFuture<'c, Result<u64, Error>>
) -> BoxFuture<'e, crate::Result<u64>>
where
I: IntoQueryParameters<Self::Backend> + Send;
fn fetch<'c, 'q: 'c, I: 'c, O: 'c, T: 'c>(
&'c mut self,
fn fetch<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>(
&'e mut self,
query: &'q str,
params: I,
) -> BoxStream<'c, Result<T, Error>>
) -> BoxStream<'e, crate::Result<T>>
where
I: IntoQueryParameters<Self::Backend> + Send,
T: FromRow<Self::Backend, O> + Send + Unpin;
fn fetch_all<'c, 'q: 'c, I: 'c, O: 'c, T: 'c>(
&'c mut self,
fn fetch_all<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>(
&'e mut self,
query: &'q str,
params: I,
) -> BoxFuture<'c, Result<Vec<T>, Error>>
) -> BoxFuture<'e, crate::Result<Vec<T>>>
where
I: IntoQueryParameters<Self::Backend> + Send,
T: FromRow<Self::Backend, O> + Send + Unpin,
@ -34,20 +51,20 @@ pub trait Executor: Send {
Box::pin(self.fetch(query, params).try_collect())
}
fn fetch_optional<'c, 'q: 'c, I: 'c, O: 'c, T: 'c>(
&'c mut self,
fn fetch_optional<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>(
&'e mut self,
query: &'q str,
params: I,
) -> BoxFuture<'c, Result<Option<T>, Error>>
) -> BoxFuture<'e, crate::Result<Option<T>>>
where
I: IntoQueryParameters<Self::Backend> + Send,
T: FromRow<Self::Backend, O> + Send;
fn fetch_one<'c, 'q: 'c, I: 'c, O: 'c, T: 'c>(
&'c mut self,
fn fetch_one<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>(
&'e mut self,
query: &'q str,
params: I,
) -> BoxFuture<'c, Result<T, Error>>
) -> BoxFuture<'e, crate::Result<T>>
where
I: IntoQueryParameters<Self::Backend> + Send,
T: FromRow<Self::Backend, O> + Send,
@ -55,4 +72,11 @@ pub trait Executor: Send {
let fut = self.fetch_optional(query, params);
Box::pin(async move { fut.await?.ok_or(Error::NotFound) })
}
/// Analyze the SQL statement and report the inferred bind parameter types and returned
/// columns.
fn describe<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
) -> BoxFuture<'e, crate::Result<Describe<Self::Backend>>>;
}

View file

@ -1,11 +1,14 @@
use super::{MariaDb, MariaDbQueryParameters, MariaDbRow};
use crate::backend::Backend;
use crate::describe::{Describe, ResultField};
use crate::mariadb::protocol::{StmtExecFlag, ComStmtExecute, ResultRow, Capabilities, OkPacket, EofPacket, ErrPacket, ColumnDefinitionPacket, ColumnCountPacket};
use async_trait::async_trait;
use crate::{
backend::Backend,
describe::{Describe, ResultField},
mariadb::protocol::{
Capabilities, ColumnCountPacket, ColumnDefinitionPacket, ComStmtExecute, EofPacket,
ErrPacket, OkPacket, ResultRow, StmtExecFlag,
},
};
use futures_core::stream::BoxStream;
#[async_trait]
impl Backend for MariaDb {
type QueryParameters = MariaDbQueryParameters;
type Row = MariaDbRow;

View file

@ -17,8 +17,7 @@ use std::{
io,
net::{IpAddr, SocketAddr},
};
use url::quirks::protocol;
use url::Url;
use url::{quirks::protocol, Url};
pub struct MariaDb {
pub(crate) stream: BufStream<TcpStream>,
@ -192,13 +191,20 @@ impl MariaDb {
ComStmtPrepareOk::decode(packet).map_err(Into::into)
}
pub(super) async fn step(&mut self, columns: &Vec<ColumnDefinitionPacket>, packet: &[u8]) -> Result<Option<ResultRow>> {
pub(super) async fn step(
&mut self,
columns: &Vec<ColumnDefinitionPacket>,
packet: &[u8],
) -> Result<Option<ResultRow>> {
// For each row in the result set we will receive a ResultRow packet.
// We may receive an [OkPacket], [EofPacket], or [ErrPacket] (depending on if EOFs are enabled) to finalize the iteration.
if packet[0] == 0xFE && packet.len() < 0xFF_FF_FF {
// NOTE: It's possible for a ResultRow to start with 0xFE (which would normally signify end-of-rows)
// but it's not possible for an Ok/Eof to be larger than 0xFF_FF_FF.
if !self.capabilities.contains(Capabilities::CLIENT_DEPRECATE_EOF) {
if !self
.capabilities
.contains(Capabilities::CLIENT_DEPRECATE_EOF)
{
let _eof = EofPacket::decode(packet)?;
Ok(None)
} else {
@ -214,7 +220,10 @@ impl MariaDb {
}
}
pub(super) async fn column_definitions(&mut self, packet: &[u8]) -> Result<Vec<ColumnDefinitionPacket>> {
pub(super) async fn column_definitions(
&mut self,
packet: &[u8],
) -> Result<Vec<ColumnDefinitionPacket>> {
// A Resultset starts with a [ColumnCountPacket] which is a single field that encodes
// how many columns we can expect when fetching rows from this statement
let column_count: u64 = ColumnCountPacket::decode(packet)?.columns;
@ -229,7 +238,10 @@ impl MariaDb {
// When (legacy) EOFs are enabled, the fixed number column definitions are further terminated by
// an EOF packet
if !self.capabilities.contains(Capabilities::CLIENT_DEPRECATE_EOF) {
if !self
.capabilities
.contains(Capabilities::CLIENT_DEPRECATE_EOF)
{
let _eof = EofPacket::decode(self.receive().await?)?;
}

View file

@ -1,5 +1,7 @@
use crate::mariadb::{protocol::ResultRow, MariaDb};
use crate::row::RawRow;
use crate::{
mariadb::{protocol::ResultRow, MariaDb},
row::RawRow,
};
pub struct MariaDbRow(pub(super) ResultRow);

View file

@ -1,11 +1,16 @@
use crate::{
backend::Backend,
connection::Connection,
describe::Describe,
error::Error,
executor::Executor,
params::IntoQueryParameters,
row::{FromRow, Row},
};
use async_std::{
sync::{channel, Receiver, Sender},
task,
};
use futures_channel::oneshot;
use futures_core::{future::BoxFuture, stream::BoxStream};
use futures_util::{future::FutureExt, stream::StreamExt};
@ -20,9 +25,6 @@ use std::{
time::{Duration, Instant},
};
use async_std::sync::{channel, Receiver, Sender};
use async_std::task;
/// A pool of database connections.
pub struct Pool<DB>(Arc<SharedPool<DB>>)
where
@ -244,22 +246,22 @@ where
{
type Backend = DB;
fn execute<'c, 'q: 'c, I: 'c>(
&'c mut self,
fn execute<'e, 'q: 'e, I: 'e>(
&'e mut self,
query: &'q str,
params: I,
) -> BoxFuture<'c, Result<u64, Error>>
) -> BoxFuture<'e, crate::Result<u64>>
where
I: IntoQueryParameters<Self::Backend> + Send,
{
Box::pin(async move { <&Pool<DB> as Executor>::execute(&mut &*self, query, params).await })
}
fn fetch<'c, 'q: 'c, I: 'c, O: 'c, T: 'c>(
&'c mut self,
fn fetch<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>(
&'e mut self,
query: &'q str,
params: I,
) -> BoxStream<'c, Result<T, Error>>
) -> BoxStream<'e, crate::Result<T>>
where
I: IntoQueryParameters<Self::Backend> + Send,
T: FromRow<Self::Backend, O> + Send + Unpin,
@ -271,16 +273,14 @@ where
while let Some(row) = s.next().await.transpose()? {
yield row;
}
drop(s);
})
}
fn fetch_optional<'c, 'q: 'c, I: 'c, O: 'c, T: 'c>(
&'c mut self,
fn fetch_optional<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>(
&'e mut self,
query: &'q str,
params: I,
) -> BoxFuture<'c, Result<Option<T>, Error>>
) -> BoxFuture<'e, crate::Result<Option<T>>>
where
I: IntoQueryParameters<Self::Backend> + Send,
T: FromRow<Self::Backend, O> + Send,
@ -289,6 +289,13 @@ where
<&Pool<DB> as Executor>::fetch_optional(&mut &*self, query, params).await
})
}
fn describe<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
) -> BoxFuture<'e, crate::Result<Describe<Self::Backend>>> {
Box::pin(async move { <&Pool<DB> as Executor>::describe(&mut &*self, query).await })
}
}
impl<DB> Executor for &'_ Pool<DB>
@ -297,59 +304,53 @@ where
{
type Backend = DB;
fn execute<'c, 'q: 'c, I: 'c>(
&'c mut self,
fn execute<'e, 'q: 'e, I: 'e>(
&'e mut self,
query: &'q str,
params: I,
) -> BoxFuture<'c, Result<u64, Error>>
) -> BoxFuture<'e, crate::Result<u64>>
where
I: IntoQueryParameters<Self::Backend> + Send,
{
Box::pin(async move {
let mut live = self.0.acquire().await?;
let result = live.execute(query, params.into_params()).await;
result
})
Box::pin(async move { self.0.acquire().await?.execute(query, params).await })
}
fn fetch<'c, 'q: 'c, I: 'c, O: 'c, T: 'c>(
&'c mut self,
fn fetch<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>(
&'e mut self,
query: &'q str,
params: I,
) -> BoxStream<'c, Result<T, Error>>
) -> BoxStream<'e, crate::Result<T>>
where
I: IntoQueryParameters<Self::Backend> + Send,
T: FromRow<Self::Backend, O> + Send + Unpin,
{
Box::pin(async_stream::try_stream! {
let mut live = self.0.acquire().await?;
let mut s = live.fetch(query, params.into_params());
let mut s = live.fetch(query, params);
while let Some(row) = s.next().await.transpose()? {
yield T::from_row(Row(row));
yield row;
}
})
}
fn fetch_optional<'c, 'q: 'c, I: 'c, O: 'c, T: 'c>(
&'c mut self,
fn fetch_optional<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>(
&'e mut self,
query: &'q str,
params: I,
) -> BoxFuture<'c, Result<Option<T>, Error>>
) -> BoxFuture<'e, crate::Result<Option<T>>>
where
I: IntoQueryParameters<Self::Backend> + Send,
T: FromRow<Self::Backend, O> + Send,
{
Box::pin(async move {
Ok(self
.0
.acquire()
.await?
.fetch_optional(query, params.into_params())
.await?
.map(Row)
.map(T::from_row))
})
Box::pin(async move { self.0.acquire().await?.fetch_optional(query, params).await })
}
fn describe<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
) -> BoxFuture<'e, crate::Result<Describe<Self::Backend>>> {
Box::pin(async move { self.0.acquire().await?.describe(query).await })
}
}

View file

@ -1,15 +1,12 @@
use super::connection::Step;
use super::Postgres;
use super::PostgresQueryParameters;
use super::PostgresRow;
use crate::backend::Backend;
use crate::describe::{Describe, ResultField};
use crate::params::QueryParameters;
use crate::url::Url;
use async_trait::async_trait;
use futures_core::stream::BoxStream;
use super::{connection::Step, Postgres, PostgresQueryParameters, PostgresRow};
use crate::{
backend::Backend,
describe::{Describe, ResultField},
params::QueryParameters,
url::Url,
};
use futures_core::{future::BoxFuture, stream::BoxStream};
#[async_trait]
impl Backend for Postgres {
type QueryParameters = PostgresQueryParameters;
@ -17,132 +14,27 @@ impl Backend for Postgres {
type TableIdent = u32;
async fn open(url: &str) -> crate::Result<Self> {
let url = Url::parse(url)?;
let address = url.resolve(5432);
let mut conn = Self::new(address).await?;
fn open(url: &str) -> BoxFuture<'static, crate::Result<Self>> {
let url = Url::parse(url);
conn.startup(
url.username(),
url.password().unwrap_or_default(),
url.database(),
)
.await?;
Box::pin(async move {
let url = url?;
let address = url.resolve(5432);
let mut conn = Self::new(address).await?;
Ok(conn)
}
conn.startup(
url.username(),
url.password().unwrap_or_default(),
url.database(),
)
.await?;
#[inline]
async fn close(mut self) -> crate::Result<()> {
self.terminate().await
}
async fn execute(
&mut self,
query: &str,
params: PostgresQueryParameters,
) -> crate::Result<u64> {
self.parse("", query, &params);
self.bind("", "", &params);
self.execute("", 1);
self.sync().await?;
let mut affected = 0;
while let Some(step) = self.step().await? {
if let Step::Command(cnt) = step {
affected = cnt;
}
}
Ok(affected)
}
fn fetch(
&mut self,
query: &str,
params: PostgresQueryParameters,
) -> BoxStream<'_, crate::Result<PostgresRow>> {
self.parse("", query, &params);
self.bind("", "", &params);
self.execute("", 0);
Box::pin(async_stream::try_stream! {
self.sync().await?;
while let Some(step) = self.step().await? {
if let Step::Row(row) = step {
yield row;
}
}
Ok(conn)
})
}
async fn fetch_optional(
&mut self,
query: &str,
params: PostgresQueryParameters,
) -> crate::Result<Option<PostgresRow>> {
self.parse("", query, &params);
self.bind("", "", &params);
self.execute("", 2);
self.sync().await?;
let mut row: Option<PostgresRow> = None;
while let Some(step) = self.step().await? {
if let Step::Row(r) = step {
if row.is_some() {
return Err(crate::Error::FoundMoreThanOne);
}
row = Some(r);
}
}
Ok(row)
}
async fn describe(&mut self, body: &str) -> crate::Result<Describe<Postgres>> {
self.parse("", body, &PostgresQueryParameters::new());
self.describe("");
self.sync().await?;
let param_desc = loop {
let step = self
.step()
.await?
.ok_or(protocol_err!("did not receive ParameterDescription"));
if let Step::ParamDesc(desc) = step? {
break desc;
}
};
let row_desc = loop {
let step = self
.step()
.await?
.ok_or(protocol_err!("did not receive RowDescription"));
if let Step::RowDesc(desc) = step? {
break desc;
}
};
Ok(Describe {
param_types: param_desc.ids.into_vec(),
result_fields: row_desc
.fields
.into_vec()
.into_iter()
.map(|field| ResultField {
name: Some(field.name),
table_id: Some(field.table_id),
type_id: field.type_id,
})
.collect(),
})
fn close(mut self) -> BoxFuture<'static, crate::Result<()>> {
Box::pin(self.terminate())
}
}

View file

@ -7,8 +7,10 @@ use crate::{
};
use async_std::net::TcpStream;
use byteorder::NetworkEndian;
use std::net::Shutdown;
use std::{io, net::SocketAddr};
use std::{
io,
net::{Shutdown, SocketAddr},
};
pub struct Postgres {
stream: BufStream<TcpStream>,

View file

@ -0,0 +1,148 @@
use super::{connection::Step, Postgres, PostgresQueryParameters, PostgresRow};
use crate::{
backend::Backend,
describe::{Describe, ResultField},
executor::Executor,
params::{IntoQueryParameters, QueryParameters},
row::FromRow,
url::Url,
};
use futures_core::{future::BoxFuture, stream::BoxStream};
impl Executor for Postgres {
type Backend = Self;
fn execute<'e, 'q: 'e, I: 'e>(
&'e mut self,
query: &'q str,
params: I,
) -> BoxFuture<'e, crate::Result<u64>>
where
I: IntoQueryParameters<Self::Backend> + Send,
{
Box::pin(async move {
let params = params.into_params();
self.parse("", query, &params);
self.bind("", "", &params);
self.execute("", 1);
self.sync().await?;
let mut affected = 0;
while let Some(step) = self.step().await? {
if let Step::Command(cnt) = step {
affected = cnt;
}
}
Ok(affected)
})
}
fn fetch<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>(
&'e mut self,
query: &'q str,
params: I,
) -> BoxStream<'e, crate::Result<T>>
where
I: IntoQueryParameters<Self::Backend> + Send,
T: FromRow<Self::Backend, O> + Send + Unpin,
{
let params = params.into_params();
self.parse("", query, &params);
self.bind("", "", &params);
self.execute("", 0);
Box::pin(async_stream::try_stream! {
self.sync().await?;
while let Some(step) = self.step().await? {
if let Step::Row(row) = step {
yield FromRow::from_row(row);
}
}
})
}
fn fetch_optional<'e, 'q: 'e, I: 'e, O: 'e, T: 'e>(
&'e mut self,
query: &'q str,
params: I,
) -> BoxFuture<'e, crate::Result<Option<T>>>
where
I: IntoQueryParameters<Self::Backend> + Send,
T: FromRow<Self::Backend, O> + Send,
{
Box::pin(async move {
let params = params.into_params();
self.parse("", query, &params);
self.bind("", "", &params);
self.execute("", 2);
self.sync().await?;
let mut row: Option<_> = None;
while let Some(step) = self.step().await? {
if let Step::Row(r) = step {
if row.is_some() {
return Err(crate::Error::FoundMoreThanOne);
}
row = Some(FromRow::from_row(r));
}
}
Ok(row)
})
}
fn describe<'e, 'q: 'e>(
&'e mut self,
query: &'q str,
) -> BoxFuture<'e, crate::Result<Describe<Self::Backend>>> {
Box::pin(async move {
self.parse("", query, &PostgresQueryParameters::new());
self.describe("");
self.sync().await?;
let param_desc = loop {
let step = self
.step()
.await?
.ok_or(protocol_err!("did not receive ParameterDescription"));
if let Step::ParamDesc(desc) = step? {
break desc;
}
};
let row_desc = loop {
let step = self
.step()
.await?
.ok_or(protocol_err!("did not receive RowDescription"));
if let Step::RowDesc(desc) = step? {
break desc;
}
};
Ok(Describe {
param_types: param_desc.ids.into_vec(),
result_fields: row_desc
.fields
.into_vec()
.into_iter()
.map(|field| ResultField {
name: Some(field.name),
table_id: Some(field.table_id),
type_id: field.type_id,
})
.collect(),
})
})
}
}

View file

@ -1,6 +1,7 @@
mod backend;
mod connection;
mod error;
mod executor;
mod query;
mod row;

View file

@ -1,10 +1,10 @@
use super::{protocol::DataRow, Postgres};
use crate::row::RawRow;
use crate::row::Row;
#[derive(Debug)]
pub struct PostgresRow(pub(crate) DataRow);
impl RawRow for PostgresRow {
impl Row for PostgresRow {
type Backend = Postgres;
#[inline]

View file

@ -1,12 +1,17 @@
use crate::params::IntoQueryParameters;
use crate::{
backend::Backend, encode::Encode, error::Error, executor::Executor, params::QueryParameters,
row::FromRow, types::HasSqlType, Row,
backend::Backend,
encode::Encode,
error::Error,
executor::Executor,
params::{IntoQueryParameters, QueryParameters},
row::FromRow,
types::HasSqlType,
Row,
};
use bitflags::_core::marker::PhantomData;
use futures_core::{future::BoxFuture, stream::BoxStream};
pub struct Query<'q, DB, I = <DB as Backend>::QueryParameters, O = Row<DB>, T = O>
pub struct Query<'q, DB, I = <DB as Backend>::QueryParameters, O = <DB as Backend>::Row>
where
DB: Backend,
{
@ -19,9 +24,6 @@ where
#[doc(hidden)]
pub output: PhantomData<O>,
#[doc(hidden)]
pub target: PhantomData<T>,
#[doc(hidden)]
pub backend: PhantomData<DB>,
}
@ -31,6 +33,7 @@ where
DB: Backend,
DB::QueryParameters: 'q,
I: IntoQueryParameters<DB> + Send,
O: FromRow<DB, O> + Send + Unpin,
{
#[inline]
pub fn execute<E>(self, executor: &'q mut E) -> BoxFuture<'q, crate::Result<u64>>
@ -39,37 +42,29 @@ where
{
executor.execute(self.query, self.input)
}
}
impl<'q, DB, I: 'q, O: 'q, T: 'q> Query<'q, DB, I, O, T>
where
DB: Backend,
DB::QueryParameters: 'q,
I: IntoQueryParameters<DB> + Send,
T: FromRow<DB, O> + Send + Unpin,
{
pub fn fetch<E>(self, executor: &'q mut E) -> BoxStream<'q, crate::Result<T>>
pub fn fetch<E>(self, executor: &'q mut E) -> BoxStream<'q, crate::Result<O>>
where
E: Executor<Backend = DB>,
{
executor.fetch(self.query, self.input)
}
pub fn fetch_all<E>(self, executor: &'q mut E) -> BoxFuture<'q, crate::Result<Vec<T>>>
pub fn fetch_all<E>(self, executor: &'q mut E) -> BoxFuture<'q, crate::Result<Vec<O>>>
where
E: Executor<Backend = DB>,
{
executor.fetch_all(self.query, self.input)
}
pub fn fetch_optional<E>(self, executor: &'q mut E) -> BoxFuture<'q, Result<Option<T>, Error>>
pub fn fetch_optional<E>(self, executor: &'q mut E) -> BoxFuture<'q, crate::Result<Option<O>>>
where
E: Executor<Backend = DB>,
{
executor.fetch_optional(self.query, self.input)
}
pub fn fetch_one<E>(self, executor: &'q mut E) -> BoxFuture<'q, crate::Result<T>>
pub fn fetch_one<E>(self, executor: &'q mut E) -> BoxFuture<'q, crate::Result<O>>
where
E: Executor<Backend = DB>,
{
@ -77,7 +72,7 @@ where
}
}
impl<DB, Target> Query<'_, DB, <DB as Backend>::QueryParameters, Row<DB>, Target>
impl<DB> Query<'_, DB, <DB as Backend>::QueryParameters>
where
DB: Backend,
{
@ -100,7 +95,7 @@ where
/// Construct a full SQL query using raw SQL.
#[inline]
pub fn query<DB, T>(query: &str) -> Query<'_, DB, DB::QueryParameters, Row<DB>, T>
pub fn query<DB>(query: &str) -> Query<'_, DB>
where
DB: Backend,
{
@ -109,6 +104,5 @@ where
input: DB::QueryParameters::new(),
output: PhantomData,
backend: PhantomData,
target: PhantomData,
}
}

View file

@ -1,6 +1,6 @@
use crate::{backend::Backend, decode::Decode, types::HasSqlType};
pub trait RawRow: Send {
pub trait Row: Send {
type Backend: Backend;
fn len(&self) -> usize;
@ -16,25 +16,8 @@ pub trait RawRow: Send {
}
}
pub struct Row<DB>(pub(crate) DB::Row)
where
DB: Backend;
impl<DB> Row<DB>
where
DB: Backend,
{
pub fn get<T>(&self, index: usize) -> T
where
DB: HasSqlType<T>,
T: Decode<DB>,
{
self.0.get(index)
}
}
pub trait FromRow<DB: Backend, O = Row<DB>> {
fn from_row(row: Row<DB>) -> Self;
pub trait FromRow<DB: Backend, O = <DB as Backend>::Row> {
fn from_row(row: <DB as Backend>::Row) -> Self;
}
#[allow(unused)]
@ -47,7 +30,9 @@ macro_rules! impl_from_row {
$($T: crate::decode::Decode<$B>,)+
{
#[inline]
fn from_row(row: crate::row::Row<$B>) -> Self {
fn from_row(row: <$B as crate::backend::Backend>::Row) -> Self {
use crate::row::Row;
($(row.get($idx),)+)
}
}
@ -59,7 +44,9 @@ macro_rules! impl_from_row {
$($T: crate::decode::Decode<$B>,)+
{
#[inline]
fn from_row(row: crate::row::Row<$B>) -> Self {
fn from_row(row: <$B as crate::backend::Backend>::Row) -> Self {
use crate::row::Row;
($(row.get($idx),)+)
}
}
@ -69,9 +56,9 @@ macro_rules! impl_from_row {
#[allow(unused)]
macro_rules! impl_from_row_for_backend {
($B:ident) => {
impl crate::row::FromRow<$B> for crate::row::Row<$B> where $B: crate::Backend {
impl crate::row::FromRow<$B> for <$B as crate::backend::Backend>::Row where $B: crate::Backend {
#[inline]
fn from_row(row: crate::row::Row<$B>) -> Self {
fn from_row(row: <$B as crate::backend::Backend>::Row) -> Self {
row
}
}

View file

@ -16,7 +16,7 @@ use syn::{
Expr, ExprLit, Lit, Token,
};
use sqlx::HasTypeMetadata;
use sqlx::{Executor, HasTypeMetadata};
use async_std::task;

View file

@ -1,4 +1,4 @@
use sqlx::{Connection, Postgres};
use sqlx::{Connection, Postgres, Row};
use std::env;
macro_rules! test {