Remove all listeners before returning a connection to the pool

This commit is contained in:
Diggory Blake 2021-03-13 21:27:56 +00:00 committed by Ryan Leckey
parent 01bef75cb9
commit a3b73f8e08
2 changed files with 76 additions and 0 deletions

View file

@ -257,6 +257,17 @@ impl PgListener {
}
}
impl Drop for PgListener {
fn drop(&mut self) {
if let Some(mut conn) = self.connection.take() {
// Unregister any listeners before returning the connection to the pool.
sqlx_rt::spawn(async move {
let _ = conn.execute("UNLISTEN *").await;
});
}
}
}
impl<'c> Executor<'c> for &'c mut PgListener {
type Database = Postgres;

View file

@ -888,3 +888,68 @@ from (values (null)) vals(val)
Ok(())
}
#[sqlx_macros::test]
async fn test_listener_cleanup() -> anyhow::Result<()> {
#[cfg(any(feature = "_rt-tokio", feature = "_rt-actix"))]
use tokio::time::timeout;
#[cfg(feature = "_rt-async-std")]
use async_std::future::timeout;
use sqlx::pool::PoolOptions;
use sqlx::postgres::PgListener;
// Create a connection on which to send notifications
let mut notify_conn = new::<Postgres>().await?;
// Create a pool with exactly one connection so we can
// deterministically test the cleanup.
let pool = PoolOptions::<Postgres>::new()
.min_connections(1)
.max_connections(1)
.test_before_acquire(true)
.connect(&env::var("DATABASE_URL")?)
.await?;
let mut listener = PgListener::connect_with(&pool).await?;
listener.listen("test_channel").await?;
// Checks for a notification on the test channel
async fn try_recv(listener: &mut PgListener) -> anyhow::Result<bool> {
match timeout(Duration::from_millis(100), listener.recv()).await {
Ok(res) => {
res?;
Ok(true)
}
Err(_) => Ok(false),
}
}
// Check no notification is received before one is sent
assert!(!try_recv(&mut listener).await?, "Notification not sent");
// Check notification is sent and received
notify_conn.execute("NOTIFY test_channel").await?;
assert!(
try_recv(&mut listener).await?,
"Notification sent and received"
);
assert!(
!try_recv(&mut listener).await?,
"Notification is not duplicated"
);
// Test that cleanup stops listening on the channel
drop(listener);
let mut listener = PgListener::connect_with(&pool).await?;
// Check notification is sent but not received
notify_conn.execute("NOTIFY test_channel").await?;
assert!(
!try_recv(&mut listener).await?,
"Notification is not received on fresh listener"
);
Ok(())
}