From a9fb26352093676a9d123efd1fa50cc8ef0859a8 Mon Sep 17 00:00:00 2001 From: Austin Bonander Date: Fri, 22 Nov 2019 17:06:32 +0000 Subject: [PATCH] simplify pool implementation, run rustfmt --- sqlx-core/Cargo.toml | 4 +- sqlx-core/src/connection.rs | 50 +------ sqlx-core/src/lib.rs | 2 +- sqlx-core/src/mariadb/query.rs | 2 +- sqlx-core/src/mariadb/types/binary.rs | 6 +- sqlx-core/src/mariadb/types/boolean.rs | 2 +- sqlx-core/src/mariadb/types/character.rs | 4 +- sqlx-core/src/mariadb/types/numeric.rs | 10 +- sqlx-core/src/pool.rs | 171 +++++++++++++--------- sqlx-core/src/postgres/query.rs | 4 +- sqlx-core/src/postgres/types/binary.rs | 6 +- sqlx-core/src/postgres/types/boolean.rs | 2 +- sqlx-core/src/postgres/types/character.rs | 4 +- sqlx-core/src/postgres/types/numeric.rs | 10 +- sqlx-core/src/postgres/types/uuid.rs | 2 +- sqlx-core/src/sql.rs | 4 +- tests/sql-macro-test.rs | 3 +- 17 files changed, 139 insertions(+), 147 deletions(-) diff --git a/sqlx-core/Cargo.toml b/sqlx-core/Cargo.toml index 32e604ba..9f695b6d 100644 --- a/sqlx-core/Cargo.toml +++ b/sqlx-core/Cargo.toml @@ -16,14 +16,12 @@ postgres = [] mariadb = [] [dependencies] -async-std = { version = "1.1.0", features = ["attributes"] } +async-std = { version = "1.1.0", features = ["attributes", "unstable"] } async-stream = "0.2.0" async-trait = "0.1.18" bitflags = "1.2.1" byteorder = { version = "1.3.2", default-features = false } bytes = "0.4.12" -crossbeam-queue = "0.2.0" -crossbeam-utils = { version = "0.7.0", default-features = false } futures-channel = "0.3.1" futures-core = "0.3.1" futures-util = "0.3.1" diff --git a/sqlx-core/src/connection.rs b/sqlx-core/src/connection.rs index bbf6ecd7..28aaaa68 100644 --- a/sqlx-core/src/connection.rs +++ b/sqlx-core/src/connection.rs @@ -15,7 +15,7 @@ pub struct Connection where DB: Backend, { - live: Option>, + live: Live, pool: Option>>, } @@ -24,25 +24,17 @@ where DB: Backend, { pub(crate) fn new(live: Live, pool: Option>>) -> Self { - Self { - live: Some(live), - pool, - } + Self { live, pool } } pub async fn open(url: &str) -> crate::Result { let raw = DB::open(url).await?; - let live = Live { - raw, - since: Instant::now(), - }; - - Ok(Self::new(live, None)) + Ok(Self::new(Live::unpooled(raw), None)) } /// Verifies a connection to the database is still alive. pub async fn ping(&mut self) -> crate::Result<()> { - self.live.as_mut().expect("released").raw.ping().await + self.live.ping().await } /// Analyze the SQL statement and report the inferred bind parameter types and returned @@ -50,12 +42,7 @@ where /// /// Mainly intended for use by sqlx-macros. pub async fn describe(&mut self, statement: &str) -> crate::Result> { - self.live - .as_mut() - .expect("released") - .raw - .describe(statement) - .await + self.live.describe(statement).await } } @@ -73,14 +60,7 @@ where where A: IntoQueryParameters + Send, { - Box::pin(async move { - self.live - .as_mut() - .expect("released") - .raw - .execute(query, params.into_params()) - .await - }) + Box::pin(async move { self.live.execute(query, params.into_params()).await }) } fn fetch<'c, 'q: 'c, T: 'c, A: 'c>( @@ -93,7 +73,7 @@ where T: FromSqlRow + Send + Unpin, { Box::pin(async_stream::try_stream! { - let mut s = self.live.as_mut().expect("released").raw.fetch(query, params.into_params()); + let mut s = self.live.fetch(query, params.into_params()); while let Some(row) = s.next().await.transpose()? { yield T::from_row(row); @@ -113,9 +93,6 @@ where Box::pin(async move { let row = self .live - .as_mut() - .expect("released") - .raw .fetch_optional(query, params.into_params()) .await?; @@ -123,16 +100,3 @@ where }) } } - -impl Drop for Connection -where - DB: Backend, -{ - fn drop(&mut self) { - if let Some(pool) = &self.pool { - if let Some(live) = self.live.take() { - pool.release(live); - } - } - } -} diff --git a/sqlx-core/src/lib.rs b/sqlx-core/src/lib.rs index 734f81d9..7293c72c 100644 --- a/sqlx-core/src/lib.rs +++ b/sqlx-core/src/lib.rs @@ -38,11 +38,11 @@ pub use self::{ compiled::CompiledSql, connection::Connection, decode::Decode, + encode::Encode, error::{Error, Result}, executor::Executor, pool::Pool, row::{FromSqlRow, Row}, - encode::Encode, sql::{query, SqlQuery}, types::HasSqlType, }; diff --git a/sqlx-core/src/mariadb/query.rs b/sqlx-core/src/mariadb/query.rs index 610005bc..fc3e9832 100644 --- a/sqlx-core/src/mariadb/query.rs +++ b/sqlx-core/src/mariadb/query.rs @@ -1,8 +1,8 @@ use super::MariaDb; use crate::{ + encode::{Encode, IsNull}, mariadb::types::MariaDbTypeMetadata, query::QueryParameters, - encode::{IsNull, Encode}, types::HasSqlType, }; diff --git a/sqlx-core/src/mariadb/types/binary.rs b/sqlx-core/src/mariadb/types/binary.rs index d235f897..4bf736d5 100644 --- a/sqlx-core/src/mariadb/types/binary.rs +++ b/sqlx-core/src/mariadb/types/binary.rs @@ -1,10 +1,10 @@ use crate::{ + encode::IsNull, mariadb::{ protocol::{FieldType, ParameterFlag}, types::MariaDbTypeMetadata, }, - encode::IsNull, - Decode, HasSqlType, MariaDb, Encode, + Decode, Encode, HasSqlType, MariaDb, }; impl HasSqlType<[u8]> for MariaDb { @@ -31,7 +31,7 @@ impl Encode for [u8] { impl Encode for Vec { fn encode(&self, buf: &mut Vec) -> IsNull { - <[u8] as Encode>::to_sql(self, buf) + <[u8] as Encode>::encode(self, buf) } } diff --git a/sqlx-core/src/mariadb/types/boolean.rs b/sqlx-core/src/mariadb/types/boolean.rs index 132b9b21..440413c1 100644 --- a/sqlx-core/src/mariadb/types/boolean.rs +++ b/sqlx-core/src/mariadb/types/boolean.rs @@ -1,8 +1,8 @@ use super::{MariaDb, MariaDbTypeMetadata}; use crate::{ decode::Decode, + encode::{Encode, IsNull}, mariadb::protocol::{FieldType, ParameterFlag}, - encode::{IsNull, Encode}, types::HasSqlType, }; diff --git a/sqlx-core/src/mariadb/types/character.rs b/sqlx-core/src/mariadb/types/character.rs index a0b6154f..6300b193 100644 --- a/sqlx-core/src/mariadb/types/character.rs +++ b/sqlx-core/src/mariadb/types/character.rs @@ -1,8 +1,8 @@ use super::{MariaDb, MariaDbTypeMetadata}; use crate::{ decode::Decode, + encode::{Encode, IsNull}, mariadb::protocol::{FieldType, ParameterFlag}, - encode::{IsNull, Encode}, types::HasSqlType, }; use std::str; @@ -37,7 +37,7 @@ impl Encode for str { impl Encode for String { #[inline] fn encode(&self, buf: &mut Vec) -> IsNull { - >::to_sql(self.as_str(), buf) + >::encode(self.as_str(), buf) } } diff --git a/sqlx-core/src/mariadb/types/numeric.rs b/sqlx-core/src/mariadb/types/numeric.rs index e85e192d..4104a264 100644 --- a/sqlx-core/src/mariadb/types/numeric.rs +++ b/sqlx-core/src/mariadb/types/numeric.rs @@ -1,8 +1,8 @@ use super::{MariaDb, MariaDbTypeMetadata}; use crate::{ decode::Decode, + encode::{Encode, IsNull}, mariadb::protocol::{FieldType, ParameterFlag}, - encode::{IsNull, Encode}, types::HasSqlType, }; use byteorder::{BigEndian, ByteOrder}; @@ -102,14 +102,14 @@ impl HasSqlType for MariaDb { impl Encode for f32 { #[inline] fn encode(&self, buf: &mut Vec) -> IsNull { - >::to_sql(&(self.to_bits() as i32), buf) + >::encode(&(self.to_bits() as i32), buf) } } impl Decode for f32 { #[inline] fn decode(buf: Option<&[u8]>) -> Self { - f32::from_bits(>::from_sql(buf) as u32) + f32::from_bits(>::decode(buf) as u32) } } @@ -127,13 +127,13 @@ impl HasSqlType for MariaDb { impl Encode for f64 { #[inline] fn encode(&self, buf: &mut Vec) -> IsNull { - >::to_sql(&(self.to_bits() as i64), buf) + >::encode(&(self.to_bits() as i64), buf) } } impl Decode for f64 { #[inline] fn decode(buf: Option<&[u8]>) -> Self { - f64::from_bits(>::from_sql(buf) as u64) + f64::from_bits(>::decode(buf) as u64) } } diff --git a/sqlx-core/src/pool.rs b/sqlx-core/src/pool.rs index b409dd8c..12115b09 100644 --- a/sqlx-core/src/pool.rs +++ b/sqlx-core/src/pool.rs @@ -2,12 +2,13 @@ use crate::{ backend::Backend, connection::Connection, error::Error, executor::Executor, query::IntoQueryParameters, row::FromSqlRow, }; -use crossbeam_queue::{ArrayQueue, SegQueue}; use futures_channel::oneshot; use futures_core::{future::BoxFuture, stream::BoxStream}; -use futures_util::stream::StreamExt; +use futures_util::{future::FutureExt, stream::StreamExt}; use std::{ + future::Future, marker::PhantomData, + ops::{Deref, DerefMut}, sync::{ atomic::{AtomicU32, AtomicUsize, Ordering}, Arc, @@ -15,6 +16,9 @@ use std::{ time::{Duration, Instant}, }; +use async_std::sync::{channel, Receiver, Sender}; +use async_std::task; + /// A pool of database connections. pub struct Pool(Arc>) where @@ -68,7 +72,7 @@ where /// Returns the number of idle connections. pub fn idle(&self) -> usize { - self.0.num_idle.load(Ordering::Acquire) + self.0.pool_rx.len() } /// Returns the configured maximum pool size. @@ -170,11 +174,9 @@ where DB: Backend, { url: String, - idle: ArrayQueue>, - waiters: SegQueue>>, + pool_rx: Receiver>, + pool_tx: Sender>, size: AtomicU32, - num_waiters: AtomicUsize, - num_idle: AtomicUsize, options: Options, } @@ -185,26 +187,20 @@ where async fn new(url: &str, options: Options) -> crate::Result { // TODO: Establish [min_idle] connections + let (pool_tx, pool_rx) = channel(options.max_size as usize); + Ok(Self { url: url.to_owned(), - idle: ArrayQueue::new(options.max_size as usize), - waiters: SegQueue::new(), + pool_rx, + pool_tx, size: AtomicU32::new(0), - num_idle: AtomicUsize::new(0), - num_waiters: AtomicUsize::new(0), options, }) } #[inline] fn try_acquire(&self) -> Option> { - if let Ok(idle) = self.idle.pop() { - self.num_idle.fetch_sub(1, Ordering::AcqRel); - - return Some(idle.live); - } - - None + Some(self.pool_rx.recv().now_or_never()??.live(&self.pool_tx)) } async fn acquire(&self) -> crate::Result> { @@ -219,54 +215,23 @@ where // Too many open connections // Wait until one is available - let (sender, receiver) = oneshot::channel(); - - self.waiters.push(sender); - self.num_waiters.fetch_add(1, Ordering::AcqRel); - // Waiters are not dropped unless the pool is dropped // which would drop this future - return Ok(receiver + return Ok(self + .pool_rx + .recv() .await - .expect("waiter dropped without dropping pool")); + .expect("waiter dropped without dropping pool") + .live(&self.pool_tx)); } if self.size.compare_and_swap(size, size + 1, Ordering::AcqRel) == size { // Open a new connection and return directly - let raw = DB::open(&self.url).await?; - let live = Live { - raw, - since: Instant::now(), - }; - - return Ok(live); + return Ok(Live::pooled(raw, &self.pool_tx)); } } } - - pub(crate) fn release(&self, mut live: Live) { - if self.num_waiters.load(Ordering::Acquire) > 0 { - while let Ok(waiter) = self.waiters.pop() { - self.num_waiters.fetch_sub(1, Ordering::AcqRel); - - live = match waiter.send(live) { - Ok(()) => { - return; - } - - Err(live) => live, - }; - } - } - - let _ = self.idle.push(Idle { - live, - since: Instant::now(), - }); - - self.num_idle.fetch_add(1, Ordering::AcqRel); - } } impl Executor for Pool @@ -338,9 +303,7 @@ where { Box::pin(async move { let mut live = self.0.acquire().await?; - let result = live.raw.execute(query, params.into_params()).await; - self.0.release(live); - + let result = live.execute(query, params.into_params()).await; result }) } @@ -356,14 +319,11 @@ where { Box::pin(async_stream::try_stream! { let mut live = self.0.acquire().await?; - let mut s = live.raw.fetch(query, params.into_params()); + let mut s = live.fetch(query, params.into_params()); while let Some(row) = s.next().await.transpose()? { yield T::from_row(row); } - - drop(s); - self.0.release(live); }) } @@ -378,29 +338,100 @@ where { Box::pin(async move { let mut live = self.0.acquire().await?; - let row = live.raw.fetch_optional(query, params.into_params()).await?; - - self.0.release(live); - + let row = live.fetch_optional(query, params.into_params()).await?; Ok(row.map(T::from_row)) }) } } +struct Raw { + pub(crate) inner: DB, + pub(crate) created: Instant, +} + struct Idle where DB: Backend, { - live: Live, + raw: Raw, #[allow(unused)] since: Instant, } +impl Idle { + fn live(self, pool_tx: &Sender>) -> Live { + Live { + raw: Some(self.raw), + pool_tx: Some(pool_tx.clone()), + } + } +} + pub(crate) struct Live where DB: Backend, { - pub(crate) raw: DB, - #[allow(unused)] - pub(crate) since: Instant, + raw: Option>, + pool_tx: Option>>, +} + +impl Live { + pub fn unpooled(raw: DB) -> Self { + Live { + raw: Some(Raw { + inner: raw, + created: Instant::now(), + }), + pool_tx: None, + } + } + + fn pooled(raw: DB, pool_tx: &Sender>) -> Self { + Live { + raw: Some(Raw { + inner: raw, + created: Instant::now(), + }), + pool_tx: Some(pool_tx.clone()), + } + } + + pub fn release(mut self) { + self.release_mut() + } + + fn release_mut(&mut self) { + // `.release_mut()` will be called twice if `.release()` is called + if let (Some(raw), Some(pool_tx)) = (self.raw.take(), self.pool_tx.as_ref()) { + pool_tx + .send(Idle { + raw, + since: Instant::now(), + }) + .now_or_never() + .expect("(bug) connection released into a full pool") + } + } +} + +const DEREF_ERR: &str = "(bug) connection already released to pool"; + +impl Deref for Live { + type Target = DB; + + fn deref(&self) -> &DB { + &self.raw.as_ref().expect(DEREF_ERR).inner + } +} + +impl DerefMut for Live { + fn deref_mut(&mut self) -> &mut DB { + &mut self.raw.as_mut().expect(DEREF_ERR).inner + } +} + +impl Drop for Live { + fn drop(&mut self) { + self.release_mut() + } } diff --git a/sqlx-core/src/postgres/query.rs b/sqlx-core/src/postgres/query.rs index 4ec4e690..3a6f61c1 100644 --- a/sqlx-core/src/postgres/query.rs +++ b/sqlx-core/src/postgres/query.rs @@ -1,8 +1,8 @@ use super::Postgres; use crate::{ + encode::{Encode, IsNull}, io::BufMut, query::QueryParameters, - encode::{IsNull, Encode}, types::HasSqlType, }; use byteorder::{BigEndian, ByteOrder, NetworkEndian}; @@ -44,7 +44,7 @@ impl QueryParameters for PostgresQueryParameters { (self.buf.len() - pos - 4) as i32 } else { // Write a -1 for the len to indicate NULL - // TODO: It is illegal for [to_sql] to write any data if IsSql::No; fail a debug assertion + // TODO: It is illegal for [encode] to write any data if IsSql::No; fail a debug assertion -1 }; diff --git a/sqlx-core/src/postgres/types/binary.rs b/sqlx-core/src/postgres/types/binary.rs index 750a08d1..f58a2143 100644 --- a/sqlx-core/src/postgres/types/binary.rs +++ b/sqlx-core/src/postgres/types/binary.rs @@ -1,7 +1,7 @@ use crate::{ - postgres::types::{PostgresTypeFormat, PostgresTypeMetadata}, encode::IsNull, - Decode, HasSqlType, Postgres, Encode, + postgres::types::{PostgresTypeFormat, PostgresTypeMetadata}, + Decode, Encode, HasSqlType, Postgres, }; impl HasSqlType<[u8]> for Postgres { @@ -29,7 +29,7 @@ impl Encode for [u8] { impl Encode for Vec { fn encode(&self, buf: &mut Vec) -> IsNull { - <[u8] as Encode>::to_sql(self, buf) + <[u8] as Encode>::encode(self, buf) } } diff --git a/sqlx-core/src/postgres/types/boolean.rs b/sqlx-core/src/postgres/types/boolean.rs index d346bd6f..e024ee44 100644 --- a/sqlx-core/src/postgres/types/boolean.rs +++ b/sqlx-core/src/postgres/types/boolean.rs @@ -1,7 +1,7 @@ use super::{Postgres, PostgresTypeFormat, PostgresTypeMetadata}; use crate::{ decode::Decode, - encode::{IsNull, Encode}, + encode::{Encode, IsNull}, types::HasSqlType, }; diff --git a/sqlx-core/src/postgres/types/character.rs b/sqlx-core/src/postgres/types/character.rs index c610ef05..83914cd1 100644 --- a/sqlx-core/src/postgres/types/character.rs +++ b/sqlx-core/src/postgres/types/character.rs @@ -1,7 +1,7 @@ use super::{Postgres, PostgresTypeFormat, PostgresTypeMetadata}; use crate::{ decode::Decode, - encode::{IsNull, Encode}, + encode::{Encode, IsNull}, types::HasSqlType, }; use std::str; @@ -36,7 +36,7 @@ impl Encode for str { impl Encode for String { #[inline] fn encode(&self, buf: &mut Vec) -> IsNull { - >::to_sql(self.as_str(), buf) + >::encode(self.as_str(), buf) } } diff --git a/sqlx-core/src/postgres/types/numeric.rs b/sqlx-core/src/postgres/types/numeric.rs index e8f7d3ed..f727c34d 100644 --- a/sqlx-core/src/postgres/types/numeric.rs +++ b/sqlx-core/src/postgres/types/numeric.rs @@ -1,7 +1,7 @@ use super::{Postgres, PostgresTypeFormat, PostgresTypeMetadata}; use crate::{ decode::Decode, - encode::{IsNull, Encode}, + encode::{Encode, IsNull}, types::HasSqlType, }; use byteorder::{BigEndian, ByteOrder}; @@ -101,14 +101,14 @@ impl HasSqlType for Postgres { impl Encode for f32 { #[inline] fn encode(&self, buf: &mut Vec) -> IsNull { - >::to_sql(&(self.to_bits() as i32), buf) + >::encode(&(self.to_bits() as i32), buf) } } impl Decode for f32 { #[inline] fn decode(buf: Option<&[u8]>) -> Self { - f32::from_bits(>::from_sql(buf) as u32) + f32::from_bits(>::decode(buf) as u32) } } @@ -126,13 +126,13 @@ impl HasSqlType for Postgres { impl Encode for f64 { #[inline] fn encode(&self, buf: &mut Vec) -> IsNull { - >::to_sql(&(self.to_bits() as i64), buf) + >::encode(&(self.to_bits() as i64), buf) } } impl Decode for f64 { #[inline] fn decode(buf: Option<&[u8]>) -> Self { - f64::from_bits(>::from_sql(buf) as u64) + f64::from_bits(>::decode(buf) as u64) } } diff --git a/sqlx-core/src/postgres/types/uuid.rs b/sqlx-core/src/postgres/types/uuid.rs index dc3ebb41..fb6d784f 100644 --- a/sqlx-core/src/postgres/types/uuid.rs +++ b/sqlx-core/src/postgres/types/uuid.rs @@ -3,7 +3,7 @@ use uuid::Uuid; use super::{Postgres, PostgresTypeFormat, PostgresTypeMetadata}; use crate::{ decode::Decode, - encode::{IsNull, Encode}, + encode::{Encode, IsNull}, types::HasSqlType, }; diff --git a/sqlx-core/src/sql.rs b/sqlx-core/src/sql.rs index 139b85be..1c4419f7 100644 --- a/sqlx-core/src/sql.rs +++ b/sqlx-core/src/sql.rs @@ -1,6 +1,6 @@ use crate::{ - backend::Backend, error::Error, executor::Executor, query::QueryParameters, row::FromSqlRow, - encode::Encode, types::HasSqlType, + backend::Backend, encode::Encode, error::Error, executor::Executor, query::QueryParameters, + row::FromSqlRow, types::HasSqlType, }; use futures_core::{future::BoxFuture, stream::BoxStream}; diff --git a/tests/sql-macro-test.rs b/tests/sql-macro-test.rs index ce5b841a..66ef125f 100644 --- a/tests/sql-macro-test.rs +++ b/tests/sql-macro-test.rs @@ -1,8 +1,7 @@ #[async_std::test] async fn test_sqlx_macro() -> sqlx::Result<()> { let mut conn = - sqlx::Connection::::open("postgres://postgres@127.0.0.1/sqlx_test") - .await?; + sqlx::Connection::::open("postgres://postgres@127.0.0.1/sqlx_test").await?; let uuid: sqlx::types::Uuid = "256ba9c8-0048-11ea-b0f0-8f04859d047e".parse().unwrap(); let accounts = sqlx::query!("SELECT * from accounts where id != $1", None) .fetch_optional(&mut conn)