fix(pool): close when last handle is dropped, extra check in try_acquire

closes #1928
closes #2375
This commit is contained in:
Austin Bonander 2023-03-01 17:49:18 -08:00 committed by Austin Bonander
parent c17c59fc4c
commit 1fb1945aea
3 changed files with 80 additions and 16 deletions

View file

@ -17,9 +17,20 @@ pub trait Connection: Send {
/// Explicitly close this database connection.
///
/// This method is **not required** for safe and consistent operation. However, it is
/// recommended to call it instead of letting a connection `drop` as the database backend
/// will be faster at cleaning up resources.
/// This notifies the database server that the connection is closing so that it can
/// free up any server-side resources in use.
///
/// While connections can simply be dropped to clean up local resources,
/// the `Drop` handler itself cannot notify the server that the connection is being closed
/// because that may require I/O to send a termination message. That can result in a delay
/// before the server learns that the connection is gone, usually from a TCP keepalive timeout.
///
/// Creating and dropping many connections in short order without calling `.close()` may
/// lead to errors from the database server because those senescent connections will still
/// count against any connection limit or quota that is configured.
///
/// Therefore it is recommended to call `.close()` on a connection when you are done using it
/// and to `.await` the result to ensure the termination message is sent.
fn close(self) -> BoxFuture<'static, Result<(), Error>>;
/// Immediately close the connection without sending a graceful shutdown.

View file

@ -81,9 +81,13 @@ impl<DB: Database> PoolInner<DB> {
self.is_closed.load(Ordering::Acquire)
}
pub(super) fn close<'a>(self: &'a Arc<Self>) -> impl Future<Output = ()> + 'a {
fn mark_closed(&self) {
self.is_closed.store(true, Ordering::Release);
self.on_closed.notify(usize::MAX);
}
pub(super) fn close<'a>(self: &'a Arc<Self>) -> impl Future<Output = ()> + 'a {
self.mark_closed();
async move {
for permits in 1..=self.options.max_connections {
@ -209,6 +213,8 @@ impl<DB: Database> PoolInner<DB> {
}
/// Try to atomically increment the pool size for a new connection.
///
/// Returns `Err` if the pool is at max capacity already or is closed.
pub(super) fn try_increment_size<'a>(
self: &'a Arc<Self>,
permit: AsyncSemaphoreReleaser<'a>,
@ -216,12 +222,16 @@ impl<DB: Database> PoolInner<DB> {
match self
.size
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |size| {
if self.is_closed() {
return None;
}
size.checked_add(1)
.filter(|size| size <= &self.options.max_connections)
}) {
// we successfully incremented the size
Ok(_) => Ok(DecrementSizeGuard::from_permit((*self).clone(), permit)),
// the pool is at max capacity
// the pool is at max capacity or is closed
Err(_) => Err(permit),
}
}
@ -258,7 +268,9 @@ impl<DB: Database> PoolInner<DB> {
// we can open a new connection
guard
} else {
// This can happen for a child pool that's at its connection limit.
// This can happen for a child pool that's at its connection limit,
// or if the pool was closed between `acquire_permit()` and
// `try_increment_size()`.
tracing::debug!("woke but was unable to acquire idle connection or open new one; retrying");
// If so, we're likely in the current-thread runtime if it's Tokio
// and so we should yield to let any spawned release_to_pool() tasks
@ -395,6 +407,8 @@ impl<DB: Database> PoolInner<DB> {
impl<DB: Database> Drop for PoolInner<DB> {
fn drop(&mut self) {
self.mark_closed();
if let Some(parent) = &self.options.parent_pool {
// Release the stolen permits.
parent.0.semaphore.release(self.semaphore.permits());
@ -461,7 +475,9 @@ async fn check_idle_conn<DB: Database>(
}
fn spawn_maintenance_tasks<DB: Database>(pool: &Arc<PoolInner<DB>>) {
let pool = Arc::clone(&pool);
// NOTE: use `pool_weak` for the maintenance tasks so
// they don't keep `PoolInner` from being dropped.
let pool_weak = Arc::downgrade(&pool);
let period = match (pool.options.max_lifetime, pool.options.idle_timeout) {
(Some(it), None) | (None, Some(it)) => it,
@ -471,7 +487,9 @@ fn spawn_maintenance_tasks<DB: Database>(pool: &Arc<PoolInner<DB>>) {
(None, None) => {
if pool.options.min_connections > 0 {
crate::rt::spawn(async move {
pool.min_connections_maintenance(None).await;
if let Some(pool) = pool_weak.upgrade() {
pool.min_connections_maintenance(None).await;
}
});
}
@ -479,27 +497,41 @@ fn spawn_maintenance_tasks<DB: Database>(pool: &Arc<PoolInner<DB>>) {
}
};
// Immediately cancel this task if the pool is closed.
let mut close_event = pool.close_event();
crate::rt::spawn(async move {
// Immediately cancel this task if the pool is closed.
let _ = pool
.close_event()
let _ = close_event
.do_until(async {
while !pool.is_closed() {
let mut slept = true;
// If the last handle to the pool was dropped while we were sleeping
while let Some(pool) = pool_weak.upgrade() {
if pool.is_closed() {
return;
}
// Don't run the reaper right away.
if slept && !pool.idle_conns.is_empty() {
do_reap(&pool).await;
}
let next_run = Instant::now() + period;
pool.min_connections_maintenance(Some(next_run)).await;
// Don't hold a reference to the pool while sleeping.
drop(pool);
if let Some(duration) = next_run.checked_duration_since(Instant::now()) {
// `async-std` doesn't have a `sleep_until()`
crate::rt::sleep(duration).await;
} else {
// `next_run` is in the past, just yield.
crate::rt::yield_now().await;
}
// Don't run the reaper right away.
if !pool.idle_conns.is_empty() {
do_reap(&pool).await;
}
slept = true;
}
})
.await;

View file

@ -142,6 +142,27 @@ pub use self::maybe::MaybePoolConnection;
///
/// [web::Data]: https://docs.rs/actix-web/3/actix_web/web/struct.Data.html
///
/// ### Note: Drop Behavior
/// Due to a lack of async `Drop`, dropping the last `Pool` handle may not immediately clean
/// up connections by itself. The connections will be dropped locally, which is sufficient for
/// SQLite, but for client/server databases like MySQL and Postgres, that only closes the
/// client side of the connection. The server will not know the connection is closed until
/// potentially much later: this is usually dictated by the TCP keepalive timeout in the server
/// settings.
///
/// Because the connection may not be cleaned up immediately on the server side, you may run
/// into errors regarding connection limits if you are creating and dropping many pools in short
/// order.
///
/// We recommend calling [`.close().await`] to gracefully close the pool and its connections
/// when you are done using it. This will also wake any tasks that are waiting on an `.acquire()`
/// call, so for long-lived applications it's a good idea to call `.close()` during shutdown.
///
/// If you're writing tests, consider using `#[sqlx::test]` which handles the lifetime of
/// the pool for you.
///
/// [`.close().await`]: Pool::close
///
/// ### Why Use a Pool?
///
/// A single database connection (in general) cannot be used by multiple threads simultaneously