implement pool idle reaper, format relevant files

Austin Bonander 2019-11-23 16:08:36 +00:00 committed by Austin Bonander
parent e155aa7aea
commit 8d9e949cc2
3 changed files with 90 additions and 43 deletions
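For context on the options this commit wires into the new reaper, here is a minimal usage sketch. The `Postgres` backend type and the `max_size`, `max_lifetime`, and `idle_timeout` builder setters are assumptions for illustration; only `min_idle` is actually touched in this diff.

    use std::time::Duration;

    // Hypothetical configuration sketch, not taken from this commit.
    async fn make_pool() -> crate::Result<Pool<Postgres>> {
        Pool::builder()
            .max_size(10)
            .min_idle(2)                                // the reaper never trims below this many idle connections
            .max_lifetime(Duration::from_secs(30 * 60)) // recycle connections after 30 minutes
            .idle_timeout(Duration::from_secs(10 * 60)) // close connections idle for 10 minutes
            .build("postgres://localhost/mydb")
            .await
    }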

View file

@@ -12,7 +12,7 @@ use futures_core::stream::BoxStream;
/// Instead [sqlx::Connection] or [sqlx::Pool] should be used instead,
/// which provide concurrent access and typed retrieval of results.
#[async_trait]
pub trait Backend: HasTypeMetadata + Send + Sync + Sized {
pub trait Backend: HasTypeMetadata + Send + Sync + Sized + 'static {
/// The concrete `QueryParameters` implementation for this backend.
type QueryParameters: QueryParameters<Backend = Self>;
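The new `'static` bound on `Backend` exists because the reaper added in this commit is spawned as a background task, and `async_std::task::spawn` requires the spawned future, and everything it captures (including the backend type parameter), to be `Send + 'static`. A standalone sketch of that constraint, not taken from the diff:

    use std::sync::Arc;

    // `task::spawn` demands `Send + 'static` of the future, so any generic type
    // captured by a spawned background task must itself carry a `'static` bound.
    fn spawn_background<T: Send + Sync + 'static>(shared: Arc<T>) {
        async_std::task::spawn(async move {
            // the task keeps its own Arc clone of the shared state alive
            let _shared = shared;
        });
    }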

View file

@@ -76,6 +76,8 @@ impl Display for Error {
Error::TimedOut => f.write_str("timed out while waiting for an open connection"),
Error::PoolClosed => f.write_str("attempted to acquire a connection on a closed pool"),
Error::__Nonexhaustive => unreachable!(),
}
}

View file

@@ -8,18 +8,15 @@ use crate::{
};
use futures_channel::oneshot;
use futures_core::{future::BoxFuture, stream::BoxStream};
use futures_util::{future::{FutureExt, TryFutureExt}, stream::StreamExt};
use futures_util::future::{AbortHandle, AbortRegistration};
use std::{
future::Future,
marker::PhantomData,
ops::{Deref, DerefMut},
sync::{
atomic::{AtomicU32, AtomicUsize, AtomicBool, Ordering},
Arc,
},
time::{Duration, Instant},
use futures_util::{
future::{FutureExt, TryFutureExt},
stream::StreamExt,
};
use std::{
future::Future,
marker::PhantomData,
ops::{Deref, DerefMut},
sync::{
atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant},
cmp,
};
use async_std::future::timeout;
use async_std::sync::{channel, Receiver, Sender};
@@ -36,9 +33,7 @@ where
{
/// Creates a connection pool with the default configuration.
pub async fn new(url: &str) -> crate::Result<Self> {
Ok(Pool(Arc::new(
SharedPool::new(url, Options::default()).await?,
)))
Ok(Pool(SharedPool::new_arc(url, Options::default()).await?))
}
/// Returns a [Builder] to configure a new connection pool.
@@ -91,8 +86,8 @@ where
self.0.options.connect_timeout
}
/// Returns the configured mimimum idle connection count.
pub fn min_idle(&self) -> Option<u32> {
/// Returns the configured minimum idle connection count.
pub fn min_idle(&self) -> u32 {
self.0.options.min_idle
}
@@ -147,8 +142,8 @@ where
self
}
pub fn min_idle(mut self, min_idle: impl Into<Option<u32>>) -> Self {
self.options.min_idle = min_idle.into();
pub fn min_idle(mut self, min_idle: u32) -> Self {
self.options.min_idle = min_idle;
self
}
@@ -163,14 +158,14 @@ where
}
pub async fn build(self, url: &str) -> crate::Result<Pool<DB>> {
Ok(Pool(Arc::new(SharedPool::new(url, self.options).await?)))
Ok(Pool(SharedPool::new_arc(url, self.options).await?))
}
}
struct Options {
max_size: u32,
connect_timeout: Duration,
min_idle: Option<u32>,
min_idle: u32,
max_lifetime: Option<Duration>,
idle_timeout: Option<Duration>,
}
@@ -179,7 +174,7 @@ impl Default for Options {
fn default() -> Self {
Self {
max_size: 10,
min_idle: None,
min_idle: 0,
connect_timeout: Duration::from_secs(30),
max_lifetime: None,
idle_timeout: None,
@@ -203,19 +198,23 @@ impl<DB> SharedPool<DB>
where
DB: Backend,
{
async fn new(url: &str, options: Options) -> crate::Result<Self> {
async fn new_arc(url: &str, options: Options) -> crate::Result<Arc<Self>> {
// TODO: Establish [min_idle] connections
let (pool_tx, pool_rx) = channel(options.max_size as usize);
Ok(Self {
let pool = Arc::new(Self {
url: url.to_owned(),
pool_rx,
pool_tx,
size: AtomicU32::new(0),
closed: AtomicBool::new(false),
options,
})
});
conn_reaper(&pool);
Ok(pool)
}
async fn close(&self) {
@@ -227,9 +226,9 @@ where
// and a timeout isn't necessarily appropriate
match self.pool_rx.recv().now_or_never() {
Some(Some(idle)) => {
let _ = idle.raw.inner.close().await;
idle.close().await;
self.size.fetch_sub(1, Ordering::AcqRel);
},
}
Some(None) => panic!("we own a Sender how did this happen"),
None => task::yield_now().await,
}
@@ -242,7 +241,7 @@ where
return None;
}
Some(self.pool_rx.recv().now_or_never()??.live(&self.pool_tx))
Some(self.pool_rx.recv().now_or_never()??.revive(&self.pool_tx))
}
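The double `?` in the line above does two separate unwraps: `now_or_never()` yields `None` if the receive future is not immediately ready, and `recv()` itself yields `None` if the channel is closed. A standalone illustration of `now_or_never` from `futures-util`, not part of the diff:

    use futures_util::future::{self, FutureExt};

    fn now_or_never_demo() {
        // `now_or_never` polls a future exactly once: Some(output) if it
        // completes immediately, None if it would have to wait.
        assert_eq!(async { 42 }.now_or_never(), Some(42));
        assert_eq!(future::pending::<i32>().now_or_never(), None);
    }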
async fn acquire(&self) -> crate::Result<Live<DB>> {
@@ -261,11 +260,12 @@ where
// Wait until one is available
// get the time between the deadline and now and use that as our timeout
let max_wait = deadline.checked_duration_since(Instant::now())
let max_wait = deadline
.checked_duration_since(Instant::now())
.ok_or(Error::TimedOut)?;
// don't sleep forever
let idle = match timeout(max_wait, self.pool_rx.recv()).await {
let mut idle = match timeout(max_wait, self.pool_rx.recv()).await {
Ok(Some(idle)) => idle,
Ok(None) => panic!("this isn't possible, we own a `pool_tx`"),
// try our acquire logic again
@@ -273,16 +273,15 @@ where
};
if self.closed.load(Ordering::Acquire) {
let _ = idle.raw.inner.close().await;
idle.close().await;
self.size.fetch_sub(1, Ordering::AcqRel);
return Err(Error::PoolClosed);
}
// check if idle connection was within max lifetime (or not set)
if self.options.max_lifetime.map_or(true, |max| idle.raw.created.elapsed() < max)
// and if connection wasn't idle too long (or not set)
&& self.options.idle_timeout.map_or(true, |timeout| idle.since.elapsed() < timeout)
{
if should_reap(&idle, &self.options) {
// close the connection but don't really care about the result
idle.close().await;
} else {
match idle.raw.inner.ping().await {
Ok(_) => return Ok(idle.revive(&self.pool_tx)),
// an error here means the other end has hung up or we lost connectivity
@ -290,21 +289,18 @@ where
// the error itself here isn't necessarily unexpected so WARN is too strong
Err(e) => log::info!("ping on idle connection returned error: {}", e),
}
} else {
// close the connection but don't really care about the result
let _ = idle.raw.inner.close().await;
// make sure the idle connection is gone explicitly before we open one
drop(idle);
}
// either case, make sure the idle connection is gone explicitly before we open one
drop(idle);
// while we're still at max size, acquire a new connection
return self.new_conn(deadline).await
return self.new_conn(deadline).await;
}
if self.size.compare_and_swap(size, size + 1, Ordering::AcqRel) == size {
// Open a new connection and return directly
return self.new_conn(deadline).await
return self.new_conn(deadline).await;
}
}
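The deadline handling above (compute the remaining time with `checked_duration_since`, then cap each channel receive with `async_std::future::timeout`) is the usual way to turn one absolute connect deadline into a series of bounded waits. A minimal standalone sketch of the pattern, with a hypothetical helper name:

    use async_std::future::timeout;
    use std::future::Future;
    use std::time::Instant;

    // Hypothetical helper: await `fut`, but give up once the absolute deadline passes.
    async fn wait_until<T>(deadline: Instant, fut: impl Future<Output = T>) -> Option<T> {
        // `checked_duration_since` returns None once the deadline is already behind us,
        // which maps directly onto "we have timed out".
        let remaining = deadline.checked_duration_since(Instant::now())?;
        timeout(remaining, fut).await.ok()
    }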
@@ -469,6 +465,10 @@ impl<DB: Backend> Idle<DB> {
pool_tx: Some(pool_tx.clone()),
}
}
async fn close(self) {
let _ = self.raw.inner.close().await;
}
}
pub(crate) struct Live<DB>
@@ -539,3 +539,48 @@ impl<DB: Backend> Drop for Live<DB> {
self.release_mut()
}
}
fn should_reap<DB: Backend>(idle: &Idle<DB>, options: &Options) -> bool {
// reap if the connection has outlived `max_lifetime` (when set)
options.max_lifetime.map_or(false, |max| idle.raw.created.elapsed() >= max)
// or if it has been idle longer than `idle_timeout` (when set)
|| options.idle_timeout.map_or(false, |timeout| idle.since.elapsed() >= timeout)
}
/// If `max_lifetime` or `idle_timeout` is set, spawn a task that reaps senescent connections.
fn conn_reaper<DB: Backend>(pool: &Arc<SharedPool<DB>>) {
if pool.options.max_lifetime.is_some() || pool.options.idle_timeout.is_some() {
let pool = pool.clone();
// `cmp::min` on two `Option`s would yield `None` when only one limit is set,
// so handle the possible combinations explicitly
let reap_period = match (pool.options.max_lifetime, pool.options.idle_timeout) {
(Some(lifetime), Some(idle)) => cmp::min(lifetime, idle),
(Some(period), None) | (None, Some(period)) => period,
(None, None) => unreachable!("one of max_lifetime/idle_timeout is `Some` at this point"),
};
task::spawn(async move {
// a plain load cannot use `AcqRel`; `Acquire` is the strongest valid ordering here
while !pool.closed.load(Ordering::Acquire) {
// reap at most the current size minus the minimum idle
let max_reaped = pool
.size
.load(Ordering::Acquire)
.saturating_sub(pool.options.min_idle);
// collect connections to reap
let (reap, keep) = (0..max_reaped)
// only connections waiting in the queue
.filter_map(|_| pool.pool_rx.recv().now_or_never()?)
.partition::<Vec<_>, _>(|conn| should_reap(conn, &pool.options));
for conn in keep {
// return these connections to the pool first
pool.pool_tx.send(conn).await;
}
for conn in reap {
conn.close().await;
pool.size.fetch_sub(1, Ordering::AcqRel);
}
task::sleep(reap_period).await;
}
});
}
}
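The reap/keep split in the task above relies on `Iterator::partition`: connections for which the predicate returns true land in the first collection, the rest in the second, and only connections sitting in the idle queue at that instant are examined. A standalone illustration with plain integers, not part of the diff:

    fn partition_demo() {
        // pretend each number is a connection's age; reap anything aged 5 or more
        let (reap, keep): (Vec<u32>, Vec<u32>) = (0..10).partition(|age| *age >= 5);
        assert_eq!(reap, vec![5, 6, 7, 8, 9]);
        assert_eq!(keep, vec![0, 1, 2, 3, 4]);
    }

Returning the kept connections to the queue before closing the reaped ones keeps them available to `acquire` while the slow close calls run.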