diff --git a/sqlx-core/src/connection.rs b/sqlx-core/src/connection.rs index 080a735d..e1249d0d 100644 --- a/sqlx-core/src/connection.rs +++ b/sqlx-core/src/connection.rs @@ -1,3 +1,5 @@ +use crate::database::Database; +use crate::describe::Describe; use crate::executor::Executor; use crate::url::Url; use futures_core::future::BoxFuture; @@ -8,13 +10,29 @@ use std::convert::TryInto; /// /// Prefer running queries from [Pool] unless there is a specific need for a single, continuous /// connection. -pub trait Connection: Executor + Send + 'static { +pub trait Connection +where + Self: Send + 'static, +{ + type Database: Database; + /// Close this database connection. fn close(self) -> BoxFuture<'static, crate::Result<()>>; /// Verifies a connection to the database is still alive. - fn ping(&mut self) -> BoxFuture> { - Box::pin(self.execute("SELECT 1", Default::default()).map_ok(|_| ())) + fn ping(&mut self) -> BoxFuture> + where + for<'a> &'a mut Self: Executor<'a>, + { + Box::pin((&mut *self).execute("SELECT 1").map_ok(|_| ())) + } + + #[doc(hidden)] + fn describe<'e, 'q: 'e>( + &'e mut self, + query: &'q str, + ) -> BoxFuture<'e, crate::Result>> { + todo!("make this a required function"); } } diff --git a/sqlx-core/src/cursor.rs b/sqlx-core/src/cursor.rs new file mode 100644 index 00000000..048d41ff --- /dev/null +++ b/sqlx-core/src/cursor.rs @@ -0,0 +1,36 @@ +use std::future::Future; + +use futures_core::future::BoxFuture; +use futures_core::stream::BoxStream; + +use crate::database::{Database, HasRow}; + +/// Represents a result set, which is generated by executing a query against the database. +/// +/// A `Cursor` can be created by either [`Executor::execute`](trait.Execute.html) or +/// [`Query::fetch`](struct.Query.html). +/// +/// Initially the `Cursor` is positioned before the first row. The `next` method moves the cursor +/// to the next row, and because it returns `None` when there are no more rows, it can be used +/// in a `while` loop to iterate through all returned rows. +pub trait Cursor<'a> +where + Self: Send, + // `.await`-ing a cursor will return the affected rows from the query + Self: Future>, +{ + type Database: Database; + + /// Fetch the first row in the result. Returns `None` if no row is present. + /// + /// Returns `Error::MoreThanOneRow` if more than one row is in the result. + fn first(self) -> BoxFuture<'a, crate::Result::Row>>>; + + /// Fetch the next row in the result. Returns `None` if there are no more rows. + fn next(&mut self) -> BoxFuture::Row>>>; + + /// Map the `Row`s in this result to a different type, returning a [`Stream`] of the results. + fn map(self, f: F) -> BoxStream<'a, crate::Result> + where + F: Fn(::Row) -> T; +} diff --git a/sqlx-core/src/database.rs b/sqlx-core/src/database.rs index 9c2f854a..ef18f033 100644 --- a/sqlx-core/src/database.rs +++ b/sqlx-core/src/database.rs @@ -2,6 +2,7 @@ use std::fmt::Display; use crate::arguments::Arguments; use crate::connection::Connection; +use crate::cursor::Cursor; use crate::row::Row; use crate::types::TypeInfo; @@ -9,19 +10,38 @@ use crate::types::TypeInfo; /// /// This trait encapsulates a complete driver implementation to a specific /// database (e.g., MySQL, Postgres). -pub trait Database: 'static { +pub trait Database +where + Self: Sized + 'static, + Self: HasRow, + Self: for<'a> HasRawValue<'a>, + Self: for<'a> HasCursor<'a, Database = Self>, +{ /// The concrete `Connection` implementation for this database. type Connection: Connection; /// The concrete `Arguments` implementation for this database. type Arguments: Arguments; - /// The concrete `Row` implementation for this database. - type Row: Row; - /// The concrete `TypeInfo` implementation for this database. type TypeInfo: TypeInfo; /// The Rust type of table identifiers for this database. type TableId: Display + Clone; } + +pub trait HasRawValue<'a> { + type RawValue; +} + +pub trait HasCursor<'a> { + type Database: Database; + + type Cursor: Cursor<'a, Database = Self::Database>; +} + +pub trait HasRow { + type Database: Database; + + type Row: Row; +} diff --git a/sqlx-core/src/executor.rs b/sqlx-core/src/executor.rs index fe403937..43333764 100644 --- a/sqlx-core/src/executor.rs +++ b/sqlx-core/src/executor.rs @@ -1,72 +1,57 @@ -use crate::database::Database; +use crate::database::{Database, HasCursor}; use crate::describe::Describe; use futures_core::future::BoxFuture; use futures_core::stream::BoxStream; use futures_util::TryStreamExt; -/// Encapsulates query execution on the database. +/// A type that contains or can provide a database connection to use for executing queries +/// against the database. /// -/// Implemented primarily by [crate::Pool]. -pub trait Executor { - type Database: Database + ?Sized; +/// No guarantees are provided that successive queries run on the same physical database +/// connection. A [`Connection`](trait.Connection.html) is an `Executor` that guarantees that successive +/// queries are run on the same physical database connection. +/// +/// Implementations are provided for [`&Pool`](struct.Pool.html), +/// [`&mut PoolConnection`](struct.PoolConnection.html), +/// and [`&mut Connection`](trait.Connection.html). +pub trait Executor<'a> +where + Self: Send, +{ + /// The specific database that this type is implemented for. + type Database: Database; - /// Send a raw SQL command to the database. - /// - /// This is intended for queries that cannot or should not be prepared (ex. `BEGIN`). - /// - /// Does not support fetching results. - fn send<'e, 'q: 'e>(&'e mut self, command: &'q str) -> BoxFuture<'e, crate::Result<()>>; - - /// Execute the query, returning the number of rows affected. - fn execute<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - args: ::Arguments, - ) -> BoxFuture<'e, crate::Result>; - - /// Executes the query and returns a [Stream] of [Row]. - fn fetch<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - args: ::Arguments, - ) -> BoxStream<'e, crate::Result<::Row>>; - - /// Executes the query and returns up to resulting record. - /// - /// * [crate::Error::FoundMoreThanOne] will be returned if the query produced more than 1 row. - fn fetch_optional<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - args: ::Arguments, - ) -> BoxFuture<'e, crate::Result::Row>>> { - let mut s = self.fetch(query, args); - Box::pin(async move { - match s.try_next().await? { - Some(val) => { - if s.try_next().await?.is_some() { - Err(crate::Error::FoundMoreThanOne) - } else { - Ok(Some(val)) - } - } - None => Ok(None), - } - }) - } - - /// Execute the query and return at most one resulting record. - fn fetch_one<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - args: ::Arguments, - ) -> BoxFuture<'e, crate::Result<::Row>> { - let mut s = self.fetch(query, args); - Box::pin(async move { s.try_next().await?.ok_or(crate::Error::NotFound) }) - } + /// Executes a query that may or may not return a result set. + fn execute<'b, E>(self, query: E) -> >::Cursor + where + E: Execute<'b, Self::Database>; #[doc(hidden)] - fn describe<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - ) -> BoxFuture<'e, crate::Result>>; + fn execute_by_ref<'b, E>(&mut self, query: E) -> >::Cursor + where + E: Execute<'b, Self::Database>; +} + +/// A type that may be executed against a database connection. +pub trait Execute<'a, DB> +where + DB: Database, +{ + /// Returns the query to be executed and the arguments to bind against the query, if any. + /// + /// Returning `None` for `Arguments` indicates to use a "simple" query protocol and to not + /// prepare the query. Returning `Some(Default::default())` is an empty arguments object that + /// will be prepared (and cached) before execution. + #[doc(hidden)] + fn into_parts(self) -> (&'a str, Option); +} + +impl<'a, DB> Execute<'a, DB> for &'a str +where + DB: Database, +{ + #[inline] + fn into_parts(self) -> (&'a str, Option) { + (self, None) + } } diff --git a/sqlx-core/src/lib.rs b/sqlx-core/src/lib.rs index 8eaffa5e..03dcc669 100644 --- a/sqlx-core/src/lib.rs +++ b/sqlx-core/src/lib.rs @@ -13,10 +13,10 @@ mod io; mod cache; mod connection; +mod cursor; mod database; mod executor; mod query; -mod query_as; mod transaction; mod url; @@ -51,14 +51,11 @@ pub use database::Database; pub use error::{Error, Result}; pub use connection::{Connect, Connection}; +pub use cursor::Cursor; pub use executor::Executor; pub use query::{query, Query}; -pub use query_as::{query_as, QueryAs}; pub use transaction::Transaction; -#[doc(hidden)] -pub use query_as::query_as_mapped; - #[doc(inline)] pub use pool::Pool; diff --git a/sqlx-core/src/mysql/database.rs b/sqlx-core/src/mysql/database.rs index ee1f540a..cd4bb427 100644 --- a/sqlx-core/src/mysql/database.rs +++ b/sqlx-core/src/mysql/database.rs @@ -1,4 +1,4 @@ -use crate::Database; +use crate::database::{Database, HasCursor, HasRawValue, HasRow}; /// **MySQL** database driver. pub struct MySql; @@ -8,9 +8,23 @@ impl Database for MySql { type Arguments = super::MySqlArguments; - type Row = super::MySqlRow; - type TypeInfo = super::MySqlTypeInfo; type TableId = Box; } + +impl HasRow for MySql { + type Database = MySql; + + type Row = super::MySqlRow; +} + +impl<'a> HasCursor<'a> for MySql { + type Database = MySql; + + type Cursor = super::MySqlCursor<'a>; +} + +impl<'a> HasRawValue<'a> for MySql { + type RawValue = Option<&'a [u8]>; +} diff --git a/sqlx-core/src/pool/conn.rs b/sqlx-core/src/pool/conn.rs index 8e5adc75..f99a02c3 100644 --- a/sqlx-core/src/pool/conn.rs +++ b/sqlx-core/src/pool/conn.rs @@ -59,6 +59,8 @@ impl Connection for PoolConnection where C: Connect, { + type Database = C::Database; + /// Detach the connection from the pool and close it nicely. fn close(mut self) -> BoxFuture<'static, crate::Result<()>> { Box::pin(async move { @@ -166,7 +168,8 @@ impl<'s, C> Floating<'s, Idle> { where C: Connection, { - self.live.raw.ping().await + // TODO self.live.raw.ping().await + todo!() } pub fn into_live(self) -> Floating<'s, Live> { diff --git a/sqlx-core/src/pool/executor.rs b/sqlx-core/src/pool/executor.rs index c84f36f5..b0afa616 100644 --- a/sqlx-core/src/pool/executor.rs +++ b/sqlx-core/src/pool/executor.rs @@ -3,148 +3,89 @@ use std::ops::DerefMut; use futures_core::{future::BoxFuture, stream::BoxStream}; use futures_util::StreamExt; -use crate::{connection::Connect, describe::Describe, executor::Executor, pool::Pool, Database}; +use crate::{ + connection::{Connect, Connection}, + describe::Describe, + executor::Executor, + pool::Pool, + Database, +}; use super::PoolConnection; +use crate::database::HasCursor; +use crate::executor::Execute; -impl Executor for Pool +impl<'p, C> Executor<'p> for &'p Pool where C: Connect, + for<'con> &'con mut C: Executor<'con>, { - type Database = ::Database; + type Database = C::Database; - fn send<'e, 'q: 'e>(&'e mut self, commands: &'q str) -> BoxFuture<'e, crate::Result<()>> { - Box::pin(async move { <&Pool as Executor>::send(&mut &*self, commands).await }) + fn execute<'q, E>(self, query: E) -> >::Cursor + where + E: Execute<'q, Self::Database>, + { + todo!() } - fn execute<'e, 'q: 'e>( + fn execute_by_ref<'q, 'e, E>( &'e mut self, - query: &'q str, - args: <::Database as Database>::Arguments, - ) -> BoxFuture<'e, crate::Result> { - Box::pin(async move { <&Pool as Executor>::execute(&mut &*self, query, args).await }) - } - - fn fetch<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - args: <::Database as Database>::Arguments, - ) -> BoxStream<'e, crate::Result<<::Database as Database>::Row>> { - Box::pin(async_stream::try_stream! { - let mut self_ = &*self; - let mut s = <&Pool as Executor>::fetch(&mut self_, query, args); - - while let Some(row) = s.next().await.transpose()? { - yield row; - } - }) - } - - fn fetch_optional<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - args: <::Database as Database>::Arguments, - ) -> BoxFuture<'e, crate::Result::Database as Database>::Row>>> { - Box::pin( - async move { <&Pool as Executor>::fetch_optional(&mut &*self, query, args).await }, - ) - } - - fn describe<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - ) -> BoxFuture<'e, crate::Result>> { - Box::pin(async move { <&Pool as Executor>::describe(&mut &*self, query).await }) + query: E, + ) -> >::Cursor + where + E: Execute<'q, Self::Database>, + { + todo!() } } -impl Executor for &'_ Pool +impl<'c, C> Executor<'c> for &'c mut PoolConnection where C: Connect, + for<'con> &'con mut C: Executor<'con>, { - type Database = ::Database; + type Database = C::Database; - fn send<'e, 'q: 'e>(&'e mut self, commands: &'q str) -> BoxFuture<'e, crate::Result<()>> { - Box::pin(async move { self.acquire().await?.send(commands).await }) + fn execute<'q, E>(self, query: E) -> >::Cursor + where + E: Execute<'q, Self::Database>, + { + todo!() } - fn execute<'e, 'q: 'e>( + fn execute_by_ref<'q, 'e, E>( &'e mut self, - query: &'q str, - args: <::Database as Database>::Arguments, - ) -> BoxFuture<'e, crate::Result> { - Box::pin(async move { self.acquire().await?.execute(query, args).await }) - } - - fn fetch<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - args: <::Database as Database>::Arguments, - ) -> BoxStream<'e, crate::Result<<::Database as Database>::Row>> { - Box::pin(async_stream::try_stream! { - let mut live = self.acquire().await?; - let mut s = live.fetch(query, args); - - while let Some(row) = s.next().await.transpose()? { - yield row; - } - }) - } - - fn fetch_optional<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - args: <::Database as Database>::Arguments, - ) -> BoxFuture<'e, crate::Result::Database as Database>::Row>>> { - Box::pin(async move { self.acquire().await?.fetch_optional(query, args).await }) - } - - fn describe<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - ) -> BoxFuture<'e, crate::Result>> { - Box::pin(async move { self.acquire().await?.describe(query).await }) + query: E, + ) -> >::Cursor + where + E: Execute<'q, Self::Database>, + { + todo!() } } -impl Executor for PoolConnection +impl Executor<'static> for PoolConnection where C: Connect, + // for<'con> &'con mut C: Executor<'con>, { - type Database = ::Database; + type Database = C::Database; - fn send<'e, 'q: 'e>(&'e mut self, commands: &'q str) -> BoxFuture<'e, crate::Result<()>> { - self.deref_mut().send(commands) + fn execute<'q, E>(self, query: E) -> >::Cursor + where + E: Execute<'q, Self::Database>, + { + unimplemented!() } - fn execute<'e, 'q: 'e>( + fn execute_by_ref<'q, 'e, E>( &'e mut self, - query: &'q str, - args: <::Database as Database>::Arguments, - ) -> BoxFuture<'e, crate::Result> { - self.deref_mut().execute(query, args) - } - - fn fetch<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - args: <::Database as Database>::Arguments, - ) -> BoxStream<'e, crate::Result<<::Database as Database>::Row>> { - self.deref_mut().fetch(query, args) - } - - fn fetch_optional<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - args: <::Database as Database>::Arguments, - ) -> BoxFuture<'e, crate::Result::Database as Database>::Row>>> { - self.deref_mut().fetch_optional(query, args) - } - - fn describe<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - ) -> BoxFuture<'e, crate::Result>> { - self.deref_mut().describe(query) + query: E, + ) -> >::Cursor + where + E: Execute<'q, Self::Database>, + { + todo!() } } diff --git a/sqlx-core/src/postgres/connection.rs b/sqlx-core/src/postgres/connection.rs index 7c0e3411..b88f05b6 100644 --- a/sqlx-core/src/postgres/connection.rs +++ b/sqlx-core/src/postgres/connection.rs @@ -16,7 +16,7 @@ use crate::postgres::protocol::{ }; use crate::postgres::PgError; use crate::url::Url; -use crate::Result; +use crate::{Postgres, Result}; /// An asynchronous connection to a [Postgres][super::Postgres] database. /// @@ -397,17 +397,6 @@ impl PgConnection { } } -impl PgConnection { - #[deprecated(note = "please use 'connect' instead")] - pub fn open(url: T) -> BoxFuture<'static, Result> - where - T: TryInto, - Self: Sized, - { - Box::pin(PgConnection::establish(url.try_into())) - } -} - impl Connect for PgConnection { fn connect(url: T) -> BoxFuture<'static, Result> where @@ -419,6 +408,8 @@ impl Connect for PgConnection { } impl Connection for PgConnection { + type Database = Postgres; + fn close(self) -> BoxFuture<'static, Result<()>> { Box::pin(self.terminate()) } diff --git a/sqlx-core/src/postgres/cursor.rs b/sqlx-core/src/postgres/cursor.rs new file mode 100644 index 00000000..ff5fa2ea --- /dev/null +++ b/sqlx-core/src/postgres/cursor.rs @@ -0,0 +1,56 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures_core::future::BoxFuture; +use futures_core::stream::BoxStream; + +use crate::cursor::Cursor; +use crate::database::HasRow; +use crate::postgres::protocol::StatementId; +use crate::postgres::PgConnection; +use crate::Postgres; + +pub struct PgCursor<'a> { + statement: StatementId, + connection: &'a mut PgConnection, +} + +impl<'a> PgCursor<'a> { + pub(super) fn from_connection( + connection: &'a mut PgConnection, + statement: StatementId, + ) -> Self { + Self { + connection, + statement, + } + } +} + +impl<'a> Cursor<'a> for PgCursor<'a> { + type Database = Postgres; + + fn first(self) -> BoxFuture<'a, crate::Result::Row>>> { + todo!() + } + + fn next(&mut self) -> BoxFuture::Row>>> { + todo!() + } + + fn map(self, f: F) -> BoxStream<'a, crate::Result> + where + F: Fn(::Row) -> T, + { + todo!() + } +} + +impl<'a> Future for PgCursor<'a> { + type Output = crate::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + todo!() + } +} diff --git a/sqlx-core/src/postgres/database.rs b/sqlx-core/src/postgres/database.rs index 8b76759e..0bfc0e6e 100644 --- a/sqlx-core/src/postgres/database.rs +++ b/sqlx-core/src/postgres/database.rs @@ -1,4 +1,4 @@ -use crate::database::Database; +use crate::database::{Database, HasCursor, HasRawValue, HasRow}; /// **Postgres** database driver. pub struct Postgres; @@ -8,9 +8,25 @@ impl Database for Postgres { type Arguments = super::PgArguments; - type Row = super::PgRow; - type TypeInfo = super::PgTypeInfo; type TableId = u32; } + +impl HasRow for Postgres { + // TODO: Can we drop the `type Database = _` + type Database = Postgres; + + type Row = super::PgRow; +} + +impl<'a> HasCursor<'a> for Postgres { + // TODO: Can we drop the `type Database = _` + type Database = Postgres; + + type Cursor = super::PgCursor<'a>; +} + +impl<'a> HasRawValue<'a> for Postgres { + type RawValue = Option<&'a [u8]>; +} diff --git a/sqlx-core/src/postgres/executor.rs b/sqlx-core/src/postgres/executor.rs index 67be5c61..ad5b7900 100644 --- a/sqlx-core/src/postgres/executor.rs +++ b/sqlx-core/src/postgres/executor.rs @@ -6,17 +6,9 @@ use futures_core::future::BoxFuture; use futures_core::stream::BoxStream; use crate::describe::{Column, Describe}; +use crate::executor::{Execute, Executor}; use crate::postgres::protocol::{self, Encode, Message, StatementId, TypeFormat}; -use crate::postgres::{PgArguments, PgRow, PgTypeInfo, Postgres}; - -#[derive(Debug)] -enum Step { - Command(u64), - NoData, - Row(protocol::DataRow), - ParamDesc(Box), - RowDesc(Box), -} +use crate::postgres::{PgArguments, PgCursor, PgRow, PgTypeInfo, Postgres}; impl super::PgConnection { fn write_prepare(&mut self, query: &str, args: &PgArguments) -> StatementId { @@ -63,274 +55,51 @@ impl super::PgConnection { fn write_sync(&mut self) { protocol::Sync.encode(self.stream.buffer_mut()); } - - async fn wait_until_ready(&mut self) -> crate::Result<()> { - if !self.ready { - while let Some(message) = self.receive().await? { - match message { - Message::ReadyForQuery(_) => { - self.ready = true; - break; - } - - _ => { - // Drain the stream - } - } - } - } - - Ok(()) - } - - async fn step(&mut self) -> crate::Result> { - while let Some(message) = self.receive().await? { - match message { - Message::BindComplete - | Message::ParseComplete - | Message::PortalSuspended - | Message::CloseComplete => {} - - Message::CommandComplete(body) => { - return Ok(Some(Step::Command(body.affected_rows))); - } - - Message::NoData => { - return Ok(Some(Step::NoData)); - } - - Message::DataRow(body) => { - return Ok(Some(Step::Row(body))); - } - - Message::ReadyForQuery(_) => { - self.ready = true; - - return Ok(None); - } - - Message::ParameterDescription(desc) => { - return Ok(Some(Step::ParamDesc(desc))); - } - - Message::RowDescription(desc) => { - return Ok(Some(Step::RowDesc(desc))); - } - - message => { - return Err(protocol_err!("received unexpected message: {:?}", message).into()); - } - } - } - - // Connection was (unexpectedly) closed - Err(io::Error::from(io::ErrorKind::ConnectionAborted).into()) - } } -impl super::PgConnection { - async fn send<'e, 'q: 'e>(&'e mut self, command: &'q str) -> crate::Result<()> { - protocol::Query(command).encode(self.stream.buffer_mut()); +impl<'e> Executor<'e> for &'e mut super::PgConnection { + type Database = Postgres; - self.wait_until_ready().await?; + fn execute<'q, E>(self, query: E) -> PgCursor<'e> + where + E: Execute<'q, Self::Database>, + { + let (query, arguments) = query.into_parts(); - self.stream.flush().await?; - self.ready = false; + // TODO: Handle [arguments] being None. This should be a SIMPLE query. + let arguments = arguments.unwrap(); - while let Some(_step) = self.step().await? { - // Drain the stream until ReadyForQuery - } + // Check the statement cache for a statement ID that matches the given query + // If it doesn't exist, we generate a new statement ID and write out [Parse] to the + // connection command buffer + let statement = self.write_prepare(query, &arguments); - Ok(()) - } - - async fn execute<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - args: PgArguments, - ) -> crate::Result { - let statement = self.write_prepare(query, &args); - - self.write_bind("", statement, &args); - self.write_execute("", 1); - self.write_sync(); - - self.wait_until_ready().await?; - - self.stream.flush().await?; - self.ready = false; - - let mut affected = 0; - - while let Some(step) = self.step().await? { - if let Step::Command(cnt) = step { - affected = cnt; - } - } - - Ok(affected) - } - - // Initial part of [fetch]; write message to stream - fn write_fetch(&mut self, query: &str, args: &PgArguments) -> StatementId { - let statement = self.write_prepare(query, &args); - - self.write_bind("", statement, &args); + // Next, [Bind] attaches the arguments to the statement and creates a named portal + self.write_bind("", statement, &arguments); + // Next, [Describe] will return the expected result columns and types + // Conditionally run [Describe] only if the results have not been cached if !self.statement_cache.has_columns(statement) { self.write_describe(protocol::Describe::Portal("")); } + // Next, [Execute] then executes the named portal self.write_execute("", 0); + + // Finally, [Sync] asks postgres to process the messages that we sent and respond with + // a [ReadyForQuery] message when it's completely done. Theoretically, we could send + // dozens of queries before a [Sync] and postgres can handle that. Execution on the server + // is still serial but it would reduce round-trips. Some kind of builder pattern that is + // termed batching might suit this. self.write_sync(); - statement + PgCursor::from_connection(self, statement) } - async fn get_columns( - &mut self, - statement: StatementId, - ) -> crate::Result, usize>>> { - if !self.statement_cache.has_columns(statement) { - let desc: Option<_> = 'outer: loop { - while let Some(step) = self.step().await? { - match step { - Step::RowDesc(desc) => break 'outer Some(desc), - - Step::NoData => break 'outer None, - - _ => {} - } - } - - unreachable!(); - }; - - let mut columns = HashMap::new(); - - if let Some(desc) = desc { - columns.reserve(desc.fields.len()); - - for (index, field) in desc.fields.iter().enumerate() { - if let Some(name) = &field.name { - columns.insert(name.clone(), index); - } - } - } - - self.statement_cache.put_columns(statement, columns); - } - - Ok(self.statement_cache.get_columns(statement)) - } - - fn fetch<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - args: PgArguments, - ) -> BoxStream<'e, crate::Result> { - Box::pin(async_stream::try_stream! { - let statement = self.write_fetch(query, &args); - - self.wait_until_ready().await?; - - self.stream.flush().await?; - self.ready = false; - - let columns = self.get_columns(statement).await?; - - while let Some(step) = self.step().await? { - if let Step::Row(data) = step { - yield PgRow { data, columns: Arc::clone(&columns) }; - } - } - - // No more rows in the result set - }) - } - - async fn describe<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - ) -> crate::Result> { - let statement = self.write_prepare(query, &Default::default()); - - self.write_describe(protocol::Describe::Statement(statement)); - self.write_sync(); - - self.stream.flush().await?; - self.wait_until_ready().await?; - - let params = match self.step().await? { - Some(Step::ParamDesc(desc)) => desc, - - step => { - return Err( - protocol_err!("expected ParameterDescription; received {:?}", step).into(), - ); - } - }; - - let result = match self.step().await? { - Some(Step::RowDesc(desc)) => Some(desc), - Some(Step::NoData) => None, - - step => { - return Err(protocol_err!("expected RowDescription; received {:?}", step).into()); - } - }; - - Ok(Describe { - param_types: params - .ids - .iter() - .map(|id| PgTypeInfo::new(*id)) - .collect::>() - .into_boxed_slice(), - result_columns: result - .map(|r| r.fields) - .unwrap_or_default() - .into_vec() - .into_iter() - // TODO: Should [Column] just wrap [protocol::Field] ? - .map(|field| Column { - name: field.name, - table_id: field.table_id, - type_info: PgTypeInfo::new(field.type_id), - }) - .collect::>() - .into_boxed_slice(), - }) - } -} - -impl crate::Executor for super::PgConnection { - type Database = super::Postgres; - - fn send<'e, 'q: 'e>(&'e mut self, query: &'q str) -> BoxFuture<'e, crate::Result<()>> { - Box::pin(self.send(query)) - } - - fn execute<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - args: PgArguments, - ) -> BoxFuture<'e, crate::Result> { - Box::pin(self.execute(query, args)) - } - - fn fetch<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - args: PgArguments, - ) -> BoxStream<'e, crate::Result> { - self.fetch(query, args) - } - - fn describe<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - ) -> BoxFuture<'e, crate::Result>> { - Box::pin(self.describe(query)) + fn execute_by_ref<'q, E>(&mut self, query: E) -> PgCursor<'_> + where + E: Execute<'q, Self::Database>, + { + self.execute(query) } } diff --git a/sqlx-core/src/postgres/mod.rs b/sqlx-core/src/postgres/mod.rs index 1930f9ac..c5a3f02e 100644 --- a/sqlx-core/src/postgres/mod.rs +++ b/sqlx-core/src/postgres/mod.rs @@ -2,6 +2,7 @@ pub use arguments::PgArguments; pub use connection::PgConnection; +pub use cursor::PgCursor; pub use database::Postgres; pub use error::PgError; pub use row::PgRow; @@ -9,6 +10,7 @@ pub use types::PgTypeInfo; mod arguments; mod connection; +mod cursor; mod database; mod error; mod executor; diff --git a/sqlx-core/src/query.rs b/sqlx-core/src/query.rs index b14d4c94..3d9ef8cc 100644 --- a/sqlx-core/src/query.rs +++ b/sqlx-core/src/query.rs @@ -1,88 +1,77 @@ use crate::arguments::Arguments; use crate::arguments::IntoArguments; -use crate::database::Database; +use crate::cursor::Cursor; +use crate::database::{Database, HasCursor, HasRow}; use crate::encode::Encode; -use crate::executor::Executor; +use crate::executor::{Execute, Executor}; use crate::types::HasSqlType; use futures_core::stream::BoxStream; +use futures_util::future::ready; +use futures_util::TryFutureExt; use futures_util::TryStreamExt; +use std::future::Future; use std::marker::PhantomData; -/// Dynamic SQL query with bind parameters. Returned by [query]. -/// -/// The methods on this struct should be passed a reference to [crate::Pool] or one of -/// the connection types. -pub struct Query<'q, DB, T = ::Arguments> +/// Raw SQL query with bind parameters. Returned by [`query`]. +pub struct Query<'a, DB, T = ::Arguments> where DB: Database, { - query: &'q str, + query: &'a str, arguments: T, database: PhantomData, } -impl<'q, DB, P> Query<'q, DB, P> +impl<'a, DB, P> Execute<'a, DB> for Query<'a, DB, P> where DB: Database, P: IntoArguments + Send, { - /// Execute the query for its side-effects. - /// - /// Returns the number of rows affected, or 0 if not applicable. - pub async fn execute(self, executor: &mut E) -> crate::Result + fn into_parts(self) -> (&'a str, Option<::Arguments>) { + (self.query, Some(self.arguments.into_arguments())) + } +} + +impl<'a, DB, P> Query<'a, DB, P> +where + DB: Database, + P: IntoArguments + Send, +{ + pub fn execute<'b, E>(self, executor: E) -> impl Future> + 'b where - E: Executor, + E: Executor<'b, Database = DB>, + 'a: 'b, { - executor - .execute(self.query, self.arguments.into_arguments()) - .await + executor.execute(self) } - /// Execute the query, returning the rows as a futures `Stream`. - /// - /// Use [fetch_all] if you want a `Vec` instead. - pub fn fetch<'e, E>(self, executor: &'e mut E) -> BoxStream<'e, crate::Result> + pub fn fetch<'b, E>(self, executor: E) -> >::Cursor where - E: Executor, - 'q: 'e, + E: Executor<'b, Database = DB>, + 'a: 'b, { - executor.fetch(self.query, self.arguments.into_arguments()) + executor.execute(self) } - /// Execute the query and get all rows from the result as a `Vec`. - pub async fn fetch_all(self, executor: &mut E) -> crate::Result> + pub async fn fetch_optional<'b, E>( + self, + executor: E, + ) -> crate::Result::Row>> where - E: Executor, + E: Executor<'b, Database = DB>, { - executor - .fetch(self.query, self.arguments.into_arguments()) - .try_collect() - .await + executor.execute(self).first().await } - /// Execute a query which should return either 0 or 1 rows. - /// - /// Returns [crate::Error::FoundMoreThanOne] if more than 1 row is returned. - /// Use `.fetch().try_next()` if you just want one row. - pub async fn fetch_optional(self, executor: &mut E) -> crate::Result> + pub async fn fetch_one<'b, E>(self, executor: E) -> crate::Result<::Row> where - E: Executor, + E: Executor<'b, Database = DB>, { - executor - .fetch_optional(self.query, self.arguments.into_arguments()) - .await - } - - /// Execute a query which should return exactly 1 row. - /// - /// * Returns [crate::Error::NotFound] if 0 rows are returned. - /// * Returns [crate::Error::FoundMoreThanOne] if more than one row is returned. - pub async fn fetch_one(self, executor: &mut E) -> crate::Result - where - E: Executor, - { - executor - .fetch_one(self.query, self.arguments.into_arguments()) + self.fetch_optional(executor) + .and_then(|row| match row { + Some(row) => ready(Ok(row)), + None => ready(Err(crate::Error::NotFound)), + }) .await } } @@ -92,12 +81,6 @@ where DB: Database, { /// Bind a value for use with this SQL query. - /// - /// # Logic Safety - /// - /// This function should be used with care, as SQLx cannot validate - /// that the value is of the right type nor can it validate that you have - /// passed the correct number of parameters. pub fn bind(mut self, value: T) -> Self where DB: HasSqlType, @@ -108,17 +91,7 @@ where } } -/// Construct a full SQL query that can be chained to bind parameters and executed. -/// -/// # Examples -/// -/// ```ignore -/// let names: Vec = sqlx::query("SELECT name FROM users WHERE active = ?") -/// .bind(false) // [active = ?] -/// .fetch(&mut connection) // -> Stream -/// .map_ok(|row| row.name("name")) // -> Stream -/// .try_collect().await?; // -> Vec -/// ``` +/// Construct a raw SQL query that can be chained to bind parameters and executed. pub fn query(sql: &str) -> Query where DB: Database, diff --git a/sqlx-core/src/query_as.rs b/sqlx-core/src/query_as.rs deleted file mode 100644 index 31d57df8..00000000 --- a/sqlx-core/src/query_as.rs +++ /dev/null @@ -1,167 +0,0 @@ -use futures_core::Stream; -use futures_util::{future, TryStreamExt}; - -use crate::arguments::{Arguments, ImmutableArguments}; -use crate::{ - arguments::IntoArguments, database::Database, encode::Encode, executor::Executor, row::FromRow, - types::HasSqlType, -}; - -/// SQL query with bind parameters, which maps rows to an explicit output type. -/// -/// Returned by [query_as] and [query!] *et al*. -/// -/// The methods on this struct should be passed a reference to [crate::Pool] or one of -/// the connection types. -pub struct QueryAs<'q, DB, R, P = ::Arguments> -where - DB: Database, -{ - query: &'q str, - args: P, - map_row: fn(DB::Row) -> crate::Result, -} - -/// The result of [query!] for SQL queries that does not return output. -impl QueryAs<'_, DB, (), P> -where - DB: Database, - P: IntoArguments + Send, -{ - /// Execute the query for its side-effects. - /// - /// Returns the number of rows affected, or 0 if not applicable. - pub async fn execute(self, executor: &mut E) -> crate::Result - where - E: Executor, - { - executor - .execute(self.query, self.args.into_arguments()) - .await - } -} - -impl<'q, DB, R, P> QueryAs<'q, DB, R, P> -where - DB: Database, - P: IntoArguments + Send, - R: Send + 'q, -{ - /// Execute the query, returning the rows as a futures `Stream`. - /// - /// Use [fetch_all] if you want a `Vec` instead. - pub fn fetch<'e, E>(self, executor: &'e mut E) -> impl Stream> + 'e - where - E: Executor, - 'q: 'e, - { - let Self { - query, - args, - map_row, - .. - } = self; - executor - .fetch(query, args.into_arguments()) - .and_then(move |row| future::ready(map_row(row))) - } - - /// Execute the query and get all rows from the result as a `Vec`. - pub async fn fetch_all(self, executor: &mut E) -> crate::Result> - where - E: Executor, - { - self.fetch(executor).try_collect().await - } - - /// Execute a query which should return either 0 or 1 rows. - /// - /// Returns [crate::Error::FoundMoreThanOne] if more than 1 row is returned. - /// Use `.fetch().try_next()` if you just want one row. - pub async fn fetch_optional(self, executor: &mut E) -> crate::Result> - where - E: Executor, - { - executor - .fetch_optional(self.query, self.args.into_arguments()) - .await? - .map(self.map_row) - .transpose() - } - - /// Execute a query which should return exactly 1 row. - /// - /// * Returns [crate::Error::NotFound] if 0 rows are returned. - /// * Returns [crate::Error::FoundMoreThanOne] if more than one row is returned. - pub async fn fetch_one(self, executor: &mut E) -> crate::Result - where - E: Executor, - { - (self.map_row)( - executor - .fetch_one(self.query, self.args.into_arguments()) - .await?, - ) - } -} - -impl<'q, DB, R> QueryAs<'q, DB, R> -where - DB: Database, - DB::Arguments: Arguments, -{ - /// Bind a value for use with this SQL query. - /// - /// # Logic Safety - /// - /// This function should be used with care, as SQLx cannot validate - /// that the value is of the right type nor can it validate that you have - /// passed the correct number of parameters. - pub fn bind(mut self, value: T) -> Self - where - DB: HasSqlType, - T: Encode, - { - self.args.add(value); - self - } - - // used by query!() and friends - #[doc(hidden)] - pub fn bind_all(self, values: DB::Arguments) -> QueryAs<'q, DB, R, ImmutableArguments> { - QueryAs { - query: self.query, - args: ImmutableArguments(values), - map_row: self.map_row, - } - } -} - -/// Construct a dynamic SQL query with an explicit output type implementing [FromRow]. -#[inline] -pub fn query_as(query: &str) -> QueryAs -where - DB: Database, - T: FromRow, -{ - QueryAs { - query, - args: Default::default(), - map_row: |row| Ok(T::from_row(row)), - } -} - -#[doc(hidden)] -pub fn query_as_mapped( - query: &str, - map_row: fn(DB::Row) -> crate::Result, -) -> QueryAs -where - DB: Database, -{ - QueryAs { - query, - args: Default::default(), - map_row, - } -} diff --git a/sqlx-core/src/transaction.rs b/sqlx-core/src/transaction.rs index 7adcdd16..1ce2a5da 100644 --- a/sqlx-core/src/transaction.rs +++ b/sqlx-core/src/transaction.rs @@ -1,17 +1,19 @@ use std::ops::{Deref, DerefMut}; use futures_core::future::BoxFuture; -use futures_core::stream::BoxStream; use crate::connection::Connection; -use crate::database::Database; -use crate::describe::Describe; -use crate::executor::Executor; +use crate::database::HasCursor; +use crate::executor::{Execute, Executor}; use crate::runtime::spawn; +use crate::Database; +// Transaction> +// Transaction pub struct Transaction where - T: Connection + Send + 'static, + T: Connection, + T: Executor<'static>, { inner: Option, depth: u32, @@ -19,15 +21,16 @@ where impl Transaction where - T: Connection + Send + 'static, + T: Connection, + T: Executor<'static>, { pub(crate) async fn new(depth: u32, mut inner: T) -> crate::Result { if depth == 0 { - inner.send("BEGIN").await?; + inner.execute_by_ref("BEGIN").await?; } else { let stmt = format!("SAVEPOINT _sqlx_savepoint_{}", depth); - inner.send(&stmt).await?; + inner.execute_by_ref(&*stmt).await?; } Ok(Self { @@ -45,11 +48,11 @@ where let depth = self.depth; if depth == 1 { - inner.send("COMMIT").await?; + inner.execute_by_ref("COMMIT").await?; } else { let stmt = format!("RELEASE SAVEPOINT _sqlx_savepoint_{}", depth - 1); - inner.send(&stmt).await?; + inner.execute_by_ref(&*stmt).await?; } Ok(inner) @@ -60,11 +63,11 @@ where let depth = self.depth; if depth == 1 { - inner.send("ROLLBACK").await?; + inner.execute_by_ref("ROLLBACK").await?; } else { let stmt = format!("ROLLBACK TO SAVEPOINT _sqlx_savepoint_{}", depth - 1); - inner.send(&stmt).await?; + inner.execute_by_ref(&*stmt).await?; } Ok(inner) @@ -73,20 +76,22 @@ where const ERR_FINALIZED: &str = "(bug) transaction already finalized"; -impl Deref for Transaction +impl Deref for Transaction where - Conn: Connection, + T: Connection, + T: Executor<'static>, { - type Target = Conn; + type Target = T; fn deref(&self) -> &Self::Target { self.inner.as_ref().expect(ERR_FINALIZED) } } -impl DerefMut for Transaction +impl DerefMut for Transaction where - Conn: Connection, + T: Connection, + T: Executor<'static>, { fn deref_mut(&mut self) -> &mut Self::Target { self.inner.as_mut().expect(ERR_FINALIZED) @@ -96,64 +101,52 @@ where impl Connection for Transaction where T: Connection, + T: Executor<'static>, { + type Database = ::Database; + // Close is equivalent to ROLLBACK followed by CLOSE fn close(self) -> BoxFuture<'static, crate::Result<()>> { Box::pin(async move { self.rollback().await?.close().await }) } } -impl Executor for Transaction +impl<'a, DB, T> Executor<'a> for &'a mut Transaction where - T: Connection, + DB: Database, + T: Connection, + T: Executor<'static, Database = DB>, { - type Database = T::Database; + type Database = ::Database; - fn send<'e, 'q: 'e>(&'e mut self, commands: &'q str) -> BoxFuture<'e, crate::Result<()>> { - self.deref_mut().send(commands) + fn execute<'b, E>(self, query: E) -> <::Database as HasCursor<'a>>::Cursor + where + E: Execute<'b, Self::Database>, + { + (**self).execute_by_ref(query) } - fn execute<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - args: ::Arguments, - ) -> BoxFuture<'e, crate::Result> { - self.deref_mut().execute(query, args) - } - - fn fetch<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - args: ::Arguments, - ) -> BoxStream<'e, crate::Result<::Row>> { - self.deref_mut().fetch(query, args) - } - - fn fetch_optional<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - args: ::Arguments, - ) -> BoxFuture<'e, crate::Result::Row>>> { - self.deref_mut().fetch_optional(query, args) - } - - fn describe<'e, 'q: 'e>( - &'e mut self, - query: &'q str, - ) -> BoxFuture<'e, crate::Result>> { - self.deref_mut().describe(query) + fn execute_by_ref<'b, 'c, E>( + &'c mut self, + query: E, + ) -> >::Cursor + where + E: Execute<'b, Self::Database>, + { + (**self).execute_by_ref(query) } } -impl Drop for Transaction +impl Drop for Transaction where - Conn: Connection, + T: Connection, + T: Executor<'static>, { fn drop(&mut self) { if self.depth > 0 { if let Some(mut inner) = self.inner.take() { spawn(async move { - let res = inner.send("ROLLBACK").await; + let res = inner.execute_by_ref("ROLLBACK").await; // If the rollback failed we need to close the inner connection if res.is_err() { diff --git a/src/lib.rs b/src/lib.rs index 8fc38ca3..2f982871 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,10 +17,7 @@ pub use sqlx_core::{ }; // Functions -pub use sqlx_core::{query, query_as}; - -#[doc(hidden)] -pub use sqlx_core::query_as_mapped; +pub use sqlx_core::query; #[cfg(feature = "mysql")] #[cfg_attr(docsrs, doc(cfg(feature = "mysql")))]