fix: pool internals improvements

* fix `DecrementSizeGuard::drop()` only waking one `Waiter` regardless of whether that waiter was already woken, which could effectively lose the wakeup (see the sketch after the change summary below)
* fix the connect-backoff loop giving up its size guard between retries
* don't cut in line to open a new connection
* have tasks waiting on `acquire()` wake periodically to check if there's a connection in the queue

Signed-off-by: Austin Bonander <austin@launchbadge.com>
Authored by Austin Bonander on 2021-04-01 19:18:01 -07:00; committed by Ryan Leckey
parent 64e872ffd8
commit 5295ff10a5
4 changed files with 148 additions and 138 deletions
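The core of the fix is the waiter-waking contract: a waiter may only be counted as woken once, and a wakeup skips over waiters that were already woken. Below is a minimal, self-contained sketch of that contract distilled from the diff that follows; the `Waiter` fields and `wake_one` mirror the pool internals, while the `main` function and the no-op waker are purely illustrative.

```rust
// Sketch of the waiter-waking contract introduced by this commit.
// `Waiter` and `wake_one` mirror the definitions in the pool module below;
// the `main` is only illustrative and uses a no-op waker.
use crossbeam_queue::SegQueue;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
use std::task::Waker;

struct Waiter {
    woken: AtomicBool,
    waker: Waker,
}

impl Waiter {
    /// Wake this waiter if it has not been woken before.
    /// Returns `true` only for the call that actually flipped the flag.
    fn wake(&self) -> bool {
        if self
            .woken
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
        {
            self.waker.wake_by_ref();
            return true;
        }
        false
    }
}

/// Pop waiters until one is both still alive (`upgrade()` succeeds) and not yet
/// woken; previously a single already-woken waiter could swallow the wakeup.
fn wake_one(waiters: &SegQueue<Weak<Waiter>>) {
    while let Some(weak) = waiters.pop() {
        if let Some(waiter) = weak.upgrade() {
            if waiter.wake() {
                return;
            }
        }
    }
}

fn main() {
    let waiters: SegQueue<Weak<Waiter>> = SegQueue::new();
    let already_woken = Arc::new(Waiter {
        woken: AtomicBool::new(true), // simulates a waiter woken by an earlier release
        waker: futures::task::noop_waker(),
    });
    let waiting = Arc::new(Waiter {
        woken: AtomicBool::new(false),
        waker: futures::task::noop_waker(),
    });
    waiters.push(Arc::downgrade(&already_woken));
    waiters.push(Arc::downgrade(&waiting));

    // With the fix, the already-woken waiter is skipped and the live one is woken.
    wake_one(&waiters);
    assert!(waiting.woken.load(Ordering::Acquire));
}
```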


@@ -69,7 +69,7 @@ impl<DB: Database> PoolConnection<DB> {
/// Returns the connection to the [`Pool`][crate::pool::Pool] it was checked-out from.
impl<DB: Database> Drop for PoolConnection<DB> {
fn drop(&mut self) {
if let Some(mut live) = self.live.take() {
if let Some(live) = self.live.take() {
let pool = self.pool.clone();
spawn(async move {
let mut floating = live.float(&pool);


@@ -7,14 +7,17 @@ use crate::pool::{deadline_as_timeout, PoolOptions};
use crossbeam_queue::{ArrayQueue, SegQueue};
use futures_core::task::{Poll, Waker};
use futures_util::future;
use sqlx_rt::{sleep, spawn, timeout};
use std::cmp;
use std::mem;
use std::ptr;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, Weak};
use std::task::Context;
use std::time::Instant;
use std::time::{Duration, Instant};
/// Waiters should wake at least this often to check whether a connection has become
/// available since they went to sleep.
const MIN_WAKE_PERIOD: Duration = Duration::from_millis(500);
pub(crate) struct SharedPool<DB: Database> {
pub(super) connect_options: <DB::Connection as Connection>::Options,
@@ -26,6 +29,26 @@ pub(crate) struct SharedPool<DB: Database> {
}
impl<DB: Database> SharedPool<DB> {
pub(super) fn new_arc(
options: PoolOptions<DB>,
connect_options: <DB::Connection as Connection>::Options,
) -> Arc<Self> {
let pool = Self {
connect_options,
idle_conns: ArrayQueue::new(options.max_connections as usize),
waiters: SegQueue::new(),
size: AtomicU32::new(0),
is_closed: AtomicBool::new(false),
options,
};
let pool = Arc::new(pool);
spawn_reaper(&pool);
pool
}
pub(super) fn size(&self) -> u32 {
self.size.load(Ordering::Acquire)
}
@@ -94,12 +117,7 @@ impl<DB: Database> SharedPool<DB> {
panic!("BUG: connection queue overflow in release()");
}
while let Some(waker) = self.waiters.pop() {
if let Some(waker) = waker.upgrade() {
waker.wake();
break;
}
}
wake_one(&self.waiters);
}
/// Try to atomically increment the pool size for a new connection.
@@ -125,68 +143,20 @@ impl<DB: Database> SharedPool<DB> {
None
}
/// Wait until either `size` drops below `max_connections`, so we can open a new
/// connection, or an idle connection is returned to the pool.
///
/// Returns an error if `deadline` elapses before we are woken.
async fn wait_for_conn(&self, deadline: Instant) -> Result<(), Error> {
if self.is_closed() {
return Err(Error::PoolClosed);
}
let mut waiter = None;
timeout(
deadline_as_timeout::<DB>(deadline)?,
// `poll_fn` gets us easy access to a `Waker` that we can push to our queue
future::poll_fn(|cx| -> Poll<()> {
let waiter = waiter.get_or_insert_with(|| {
let waiter = Waiter::new(cx);
self.waiters.push(Arc::downgrade(&waiter));
waiter
});
if waiter.is_woken() {
Poll::Ready(())
} else {
Poll::Pending
}
}),
)
.await
.map_err(|_| Error::PoolTimedOut)
}
pub(super) fn new_arc(
options: PoolOptions<DB>,
connect_options: <DB::Connection as Connection>::Options,
) -> Arc<Self> {
let pool = Self {
connect_options,
idle_conns: ArrayQueue::new(options.max_connections as usize),
waiters: SegQueue::new(),
size: AtomicU32::new(0),
is_closed: AtomicBool::new(false),
options,
};
let pool = Arc::new(pool);
spawn_reaper(&pool);
pool
}
#[allow(clippy::needless_lifetimes)]
pub(super) async fn acquire<'s>(&'s self) -> Result<Floating<'s, Live<DB>>, Error> {
let start = Instant::now();
let deadline = start + self.options.connect_timeout;
let mut waited = !self.options.fair;
let mut backoff = 0.01;
// the strong ref of the `Weak<Waiter>` that we push to the queue
// initialized during the `timeout()` call below
// as long as we own this, we keep our place in line
let mut waiter = None;
// Unless the pool has been closed ...
while !self.is_closed() {
// Don't cut in line
// Don't cut in line unless no one is waiting
if waited || self.waiters.is_empty() {
// Attempt to immediately acquire a connection. This will return Some
// if there is an idle connection in our channel.
@@ -195,28 +165,40 @@ impl<DB: Database> SharedPool<DB> {
return Ok(live);
}
}
}
if let Some(guard) = self.try_increment_size() {
// pool has slots available; open a new connection
match self.connection(deadline, guard).await {
Ok(Some(conn)) => return Ok(conn),
// [size] is internally decremented on _retry_ and _error_
Ok(None) => {
// If the connection is refused, wait in exponentially
// increasing steps for the server to come up, capped by
// two seconds.
sqlx_rt::sleep(std::time::Duration::from_secs_f64(backoff)).await;
backoff = f64::min(backoff * 2.0, 2.0);
continue;
}
Err(e) => return Err(e),
// check if we can open a new connection
if let Some(guard) = self.try_increment_size() {
// pool has slots available; open a new connection
return self.connection(deadline, guard).await;
}
}
// Wait for a connection to become available (or we are allowed to open a new one)
// Returns an error if `deadline` passes
self.wait_for_conn(deadline).await?;
let timeout_duration = cmp::min(
// Returns an error if `deadline` passes
deadline_as_timeout::<DB>(deadline)?,
MIN_WAKE_PERIOD,
);
sqlx_rt::timeout(
timeout_duration,
// `poll_fn` gets us easy access to a `Waker` that we can push to our queue
future::poll_fn(|cx| -> Poll<()> {
let waiter = waiter.get_or_insert_with(|| {
let waiter = Waiter::new(cx);
self.waiters.push(Arc::downgrade(&waiter));
waiter
});
if waiter.is_woken() {
Poll::Ready(())
} else {
Poll::Pending
}
}),
)
.await
.ok(); // timeout is no longer fatal here; we check if the deadline expired above
waited = true;
}
@@ -228,39 +210,51 @@ impl<DB: Database> SharedPool<DB> {
&'s self,
deadline: Instant,
guard: DecrementSizeGuard<'s>,
) -> Result<Option<Floating<'s, Live<DB>>>, Error> {
) -> Result<Floating<'s, Live<DB>>, Error> {
if self.is_closed() {
return Err(Error::PoolClosed);
}
let timeout = super::deadline_as_timeout::<DB>(deadline)?;
let mut backoff = Duration::from_millis(10);
let max_backoff = deadline_as_timeout::<DB>(deadline)? / 5;
// result here is `Result<Result<C, Error>, TimeoutError>`
match sqlx_rt::timeout(timeout, self.connect_options.connect()).await {
// successfully established connection
Ok(Ok(mut raw)) => {
if let Some(callback) = &self.options.after_connect {
callback(&mut raw).await?;
loop {
let timeout = deadline_as_timeout::<DB>(deadline)?;
// result here is `Result<Result<C, Error>, TimeoutError>`
// if this block does not return, sleep for the backoff timeout and try again
match sqlx_rt::timeout(timeout, self.connect_options.connect()).await {
// successfully established connection
Ok(Ok(mut raw)) => {
if let Some(callback) = &self.options.after_connect {
callback(&mut raw).await?;
}
return Ok(Floating::new_live(raw, guard));
}
Ok(Some(Floating::new_live(raw, guard)))
// an IO error while connecting is assumed to be the system starting up
Ok(Err(Error::Io(e))) if e.kind() == std::io::ErrorKind::ConnectionRefused => (),
// TODO: Handle other database "boot period"s
// [postgres] the database system is starting up
// TODO: Make this check actually check if this is postgres
Ok(Err(Error::Database(error))) if error.code().as_deref() == Some("57P03") => (),
// Any other error while connecting should immediately
// terminate and bubble the error up
Ok(Err(e)) => return Err(e),
// timed out
Err(_) => return Err(Error::PoolTimedOut),
}
// an IO error while connecting is assumed to be the system starting up
Ok(Err(Error::Io(e))) if e.kind() == std::io::ErrorKind::ConnectionRefused => Ok(None),
// TODO: Handle other database "boot period"s
// [postgres] the database system is starting up
// TODO: Make this check actually check if this is postgres
Ok(Err(Error::Database(error))) if error.code().as_deref() == Some("57P03") => Ok(None),
// Any other error while connecting should immediately
// terminate and bubble the error up
Ok(Err(e)) => Err(e),
// timed out
Err(_) => Err(Error::PoolTimedOut),
// If the connection is refused, wait in exponentially
// increasing steps for the server to come up,
// capped by a factor of the remaining time until the deadline
sqlx_rt::sleep(backoff).await;
backoff = cmp::min(backoff * 2, max_backoff);
}
}
}
@@ -334,7 +328,7 @@ fn spawn_reaper<DB: Database>(pool: &Arc<SharedPool<DB>>) {
let pool = Arc::clone(&pool);
spawn(async move {
sqlx_rt::spawn(async move {
while !pool.is_closed.load(Ordering::Acquire) {
// reap at most the current size minus the minimum idle
let max_reaped = pool.size().saturating_sub(pool.options.min_connections);
@@ -360,11 +354,21 @@ fn spawn_reaper<DB: Database>(pool: &Arc<SharedPool<DB>>) {
let _ = conn.close().await;
}
sleep(period).await;
sqlx_rt::sleep(period).await;
}
});
}
fn wake_one(waiters: &SegQueue<Weak<Waiter>>) {
while let Some(weak) = waiters.pop() {
if let Some(waiter) = weak.upgrade() {
if waiter.wake() {
return;
}
}
}
}
/// RAII guard returned by `Pool::try_increment_size()` and others.
///
/// Will decrement the pool size if dropped, to avoid semantically "leaking" connections
@@ -399,11 +403,7 @@ impl Drop for DecrementSizeGuard<'_> {
assert!(!self.dropped, "double-dropped!");
self.dropped = true;
self.size.fetch_sub(1, Ordering::SeqCst);
if let Some(waker) = self.waiters.pop() {
if let Some(waker) = waker.upgrade() {
waker.wake();
}
}
wake_one(&self.waiters);
}
}
@@ -420,9 +420,20 @@ impl Waiter {
})
}
fn wake(&self) {
self.woken.store(true, Ordering::Release);
self.waker.wake_by_ref();
/// Wake this waiter if it has not previously been woken.
///
/// Return `true` if this waiter was newly woken, or `false` if it was already woken.
fn wake(&self) -> bool {
// if we were the thread to flip this boolean from false to true
if let Ok(_) = self
.woken
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
{
self.waker.wake_by_ref();
return true;
}
false
}
fn is_woken(&self) -> bool {

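With `MIN_WAKE_PERIOD` in place, each pass through the reworked `acquire()` wait above sleeps for at most 500 ms before re-checking the idle queue, while the overall `connect_timeout` deadline still bounds the whole operation. A small sketch of that per-iteration budget follows; `wait_budget` is a hypothetical helper, as the real code computes this inline with `deadline_as_timeout` and `sqlx_rt::timeout`.

```rust
// Sketch of the per-iteration wait budget used by the reworked acquire() loop:
// a waiter sleeps for the remaining time until the deadline, but never longer
// than MIN_WAKE_PERIOD, so it re-checks the idle queue at least every 500 ms.
use std::cmp;
use std::time::{Duration, Instant};

const MIN_WAKE_PERIOD: Duration = Duration::from_millis(500);

// Hypothetical helper; `None` plays the role of `Error::PoolTimedOut`.
fn wait_budget(deadline: Instant) -> Option<Duration> {
    let remaining = deadline.checked_duration_since(Instant::now())?;
    Some(cmp::min(remaining, MIN_WAKE_PERIOD))
}

fn main() {
    // Far from the deadline, the sleep is capped at MIN_WAKE_PERIOD ...
    let deadline = Instant::now() + Duration::from_secs(30);
    assert_eq!(wait_budget(deadline), Some(MIN_WAKE_PERIOD));

    // ... and once the deadline has passed, the caller bails out with a timeout error.
    if let Some(expired) = Instant::now().checked_sub(Duration::from_secs(1)) {
        assert_eq!(wait_budget(expired), None);
    }
}
```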

@@ -235,16 +235,14 @@ async fn init_min_connections<DB: Database>(pool: &SharedPool<DB>) -> Result<(),
// this guard will prevent us from exceeding `max_size`
if let Some(guard) = pool.try_increment_size() {
// [connect] will raise an error when past deadline
// [connect] returns None if it's okay to retry
if let Some(conn) = pool.connection(deadline, guard).await? {
let is_ok = pool
.idle_conns
.push(conn.into_idle().into_leakable())
.is_ok();
let conn = pool.connection(deadline, guard).await?;
let is_ok = pool
.idle_conns
.push(conn.into_idle().into_leakable())
.is_ok();
if !is_ok {
panic!("BUG: connection queue overflow in init_min_connections");
}
if !is_ok {
panic!("BUG: connection queue overflow in init_min_connections");
}
}
}


@@ -6,7 +6,6 @@ use sqlx::postgres::{PgPoolOptions, PgRow, Postgres};
use sqlx::{Column, Connection, Executor, Row, Statement, TypeInfo};
use sqlx_test::{new, setup_if_needed};
use std::env;
use std::thread;
use std::time::Duration;
#[sqlx_macros::test]
@@ -505,11 +504,7 @@ async fn it_can_drop_multiple_transactions() -> anyhow::Result<()> {
#[ignore]
#[sqlx_macros::test]
async fn pool_smoke_test() -> anyhow::Result<()> {
#[cfg(any(feature = "_rt-tokio", feature = "_rt-actix"))]
use tokio::{task::spawn, time::sleep, time::timeout};
#[cfg(feature = "_rt-async-std")]
use async_std::{future::timeout, task::sleep, task::spawn};
use futures::{future, task::Poll, Future};
eprintln!("starting pool");
@@ -523,7 +518,7 @@ async fn pool_smoke_test() -> anyhow::Result<()> {
// spin up more tasks than connections available, and ensure we don't deadlock
for i in 0..20 {
let pool = pool.clone();
spawn(async move {
sqlx_rt::spawn(async move {
loop {
if let Err(e) = sqlx::query("select 1 + 1").execute(&pool).await {
eprintln!("pool task {} dying due to {}", i, e);
@@ -535,26 +530,32 @@ async fn pool_smoke_test() -> anyhow::Result<()> {
for _ in 0..5 {
let pool = pool.clone();
// we don't need async, just need this to run concurrently
// if we use `task::spawn()` we risk starving the event loop because we don't yield
thread::spawn(move || {
sqlx_rt::spawn(async move {
while !pool.is_closed() {
// drop acquire() futures in a hot loop
// https://github.com/launchbadge/sqlx/issues/83
drop(pool.acquire());
let acquire = pool.acquire();
futures::pin_mut!(acquire);
// poll the acquire future once to put the waiter in the queue
future::poll_fn(move |cx| {
let _ = acquire.as_mut().poll(cx);
Poll::Ready(())
})
.await;
sqlx_rt::yield_now().await;
}
});
}
eprintln!("sleeping for 30 seconds");
sleep(Duration::from_secs(30)).await;
sqlx_rt::sleep(Duration::from_secs(30)).await;
// assert_eq!(pool.size(), 10);
eprintln!("closing pool");
timeout(Duration::from_secs(30), pool.close()).await?;
sqlx_rt::timeout(Duration::from_secs(30), pool.close()).await?;
eprintln!("pool closed successfully");
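The updated smoke test replaces the old hot-loop `thread::spawn` with tasks that poll `acquire()` exactly once, enqueue a `Waiter`, and then drop the future, which is the scenario from https://github.com/launchbadge/sqlx/issues/83 that used to strand the queue. Below is a standalone sketch of that poll-once pattern; the `poll_once` helper and the stand-in future are illustrative, as the test does this inline against `pool.acquire()`.

```rust
// Sketch of the "poll once, then drop" pattern the updated smoke test uses to
// exercise the waiter queue: polling an acquire() future a single time pushes a
// Waiter into the pool's queue, and dropping the future afterwards must not
// strand other waiters.
use futures::{future, pin_mut, task::Poll, Future};

async fn poll_once<F: Future>(fut: F) {
    pin_mut!(fut);
    future::poll_fn(move |cx| {
        // Drive the future exactly one step; ignore whether it completed.
        let _ = fut.as_mut().poll(cx);
        Poll::Ready(())
    })
    .await;
}

fn main() {
    // Stand-in future; in the test this is `pool.acquire()`.
    futures::executor::block_on(poll_once(async { 1 + 1 }));
}
```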