From 0e8ffb564b0228110bc2fb0de0b45f3f32c7b10a Mon Sep 17 00:00:00 2001 From: Austin Bonander Date: Fri, 20 Aug 2021 16:44:46 -0700 Subject: [PATCH] fix(postgres): avoid recursively spawning tasks in `PgListener::drop()` (#1393) refactor(pool): deprecate `PoolConnection::release()`, provide renamed alts --- sqlx-core/src/pool/connection.rs | 89 +++++++++++++++++++++--------- sqlx-core/src/postgres/listener.rs | 5 ++ 2 files changed, 68 insertions(+), 26 deletions(-) diff --git a/sqlx-core/src/pool/connection.rs b/sqlx-core/src/pool/connection.rs index ea31990a..643f8df1 100644 --- a/sqlx-core/src/pool/connection.rs +++ b/sqlx-core/src/pool/connection.rs @@ -10,6 +10,7 @@ use crate::database::Database; use crate::error::Error; use super::inner::{DecrementSizeGuard, SharedPool}; +use std::future::Future; /// A connection managed by a [`Pool`][crate::pool::Pool]. /// @@ -60,43 +61,79 @@ impl DerefMut for PoolConnection { impl PoolConnection { /// Explicitly release a connection from the pool - pub fn release(mut self) -> DB::Connection { + #[deprecated = "renamed to `.detach()` for clarity"] + pub fn release(self) -> DB::Connection { + self.detach() + } + + /// Detach this connection from the pool, allowing it to open a replacement. + /// + /// Note that if your application uses a single shared pool, this + /// effectively lets the application exceed the `max_connections` setting. + /// + /// If you want the pool to treat this connection as permanently checked-out, + /// use [`.leak()`][Self::leak] instead. + pub fn detach(mut self) -> DB::Connection { self.live .take() .expect("PoolConnection double-dropped") .float(&self.pool) .detach() } + + /// Detach this connection from the pool, treating it as permanently checked-out. + /// + /// This effectively will reduce the maximum capacity of the pool by 1 every time it is used. + /// + /// If you don't want to impact the pool's capacity, use [`.detach()`][Self::detach] instead. + pub fn leak(mut self) -> DB::Connection { + self.live.take().expect("PoolConnection double-dropped").raw + } + + /// Test the connection to make sure it is still live before returning it to the pool. + /// + /// This effectively runs the drop handler eagerly instead of spawning a task to do it. + pub(crate) fn return_to_pool(&mut self) -> impl Future + Send + 'static { + // we want these to happen synchronously so the drop handler doesn't try to spawn a task anyway + // this also makes the returned future `'static` + let live = self.live.take(); + let pool = self.pool.clone(); + + async move { + let mut floating = if let Some(live) = live { + live.float(&pool) + } else { + return; + }; + + // test the connection on-release to ensure it is still viable + // if an Executor future/stream is dropped during an `.await` call, the connection + // is likely to be left in an inconsistent state, in which case it should not be + // returned to the pool; also of course, if it was dropped due to an error + // this is simply a band-aid as SQLx-next (0.6) connections should be able + // to recover from cancellations + if let Err(e) = floating.raw.ping().await { + log::warn!( + "error occurred while testing the connection on-release: {}", + e + ); + + // we now consider the connection to be broken; just drop it to close + // trying to close gracefully might cause something weird to happen + drop(floating); + } else { + // if the connection is still viable, release it to the pool + pool.release(floating); + } + } + } } /// Returns the connection to the [`Pool`][crate::pool::Pool] it was checked-out from. impl Drop for PoolConnection { fn drop(&mut self) { - if let Some(live) = self.live.take() { - let pool = self.pool.clone(); - sqlx_rt::spawn(async move { - let mut floating = live.float(&pool); - - // test the connection on-release to ensure it is still viable - // if an Executor future/stream is dropped during an `.await` call, the connection - // is likely to be left in an inconsistent state, in which case it should not be - // returned to the pool; also of course, if it was dropped due to an error - // this is simply a band-aid as SQLx-next (0.6) connections should be able - // to recover from cancellations - if let Err(e) = floating.raw.ping().await { - log::warn!( - "error occurred while testing the connection on-release: {}", - e - ); - - // we now consider the connection to be broken; just drop it to close - // trying to close gracefully might cause something weird to happen - drop(floating); - } else { - // if the connection is still viable, release it to th epool - pool.release(floating); - } - }); + if self.live.is_some() { + sqlx_rt::spawn(self.return_to_pool()); } } } diff --git a/sqlx-core/src/postgres/listener.rs b/sqlx-core/src/postgres/listener.rs index 82c0460f..b9968f02 100644 --- a/sqlx-core/src/postgres/listener.rs +++ b/sqlx-core/src/postgres/listener.rs @@ -263,6 +263,11 @@ impl Drop for PgListener { // Unregister any listeners before returning the connection to the pool. sqlx_rt::spawn(async move { let _ = conn.execute("UNLISTEN *").await; + + // inline the drop handler from `PoolConnection` so it doesn't try to spawn another task + // otherwise, it may trigger a panic if this task is dropped because the runtime is going away: + // https://github.com/launchbadge/sqlx/issues/1389 + conn.return_to_pool().await; }); } }