feat(pool): implement close-event notification (#1776)

fix(postgres): integrate pool close-event into `PgListener`

closes #1764
This commit is contained in:
Austin Bonander 2022-04-05 16:17:35 -07:00 committed by GitHub
parent acb3da8a34
commit f1c635d739
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 231 additions and 13 deletions

6
Cargo.lock generated
View file

@ -808,9 +808,9 @@ dependencies = [
[[package]]
name = "event-listener"
version = "2.5.1"
version = "2.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59"
checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71"
[[package]]
name = "fastrand"
@ -2439,6 +2439,7 @@ dependencies = [
"dirs",
"either",
"encoding_rs",
"event-listener",
"flume",
"futures-channel",
"futures-core",
@ -2480,6 +2481,7 @@ dependencies = [
"stringprep",
"thiserror",
"time 0.2.27",
"tokio",
"tokio-stream",
"url",
"uuid",

View file

@ -170,6 +170,8 @@ hashlink = "0.7.0"
# https://github.com/tkaitchuck/aHash/issues/95#issuecomment-874150078
indexmap = "1.6.0"
hkdf = { version = "0.11.0", optional = true }
event-listener = "2.5.2"
[dev-dependencies]
sqlx = { version = "0.5.11", path = "..", features = ["postgres", "sqlite"] }
tokio = { version = "1", features = ["rt"] }

View file

@ -28,6 +28,7 @@ pub(crate) struct SharedPool<DB: Database> {
pub(super) semaphore: Semaphore,
pub(super) size: AtomicU32,
is_closed: AtomicBool,
pub(super) on_closed: event_listener::Event,
pub(super) options: PoolOptions<DB>,
}
@ -50,6 +51,7 @@ impl<DB: Database> SharedPool<DB> {
semaphore: Semaphore::new(options.fair, capacity),
size: AtomicU32::new(0),
is_closed: AtomicBool::new(false),
on_closed: event_listener::Event::new(),
options,
};
@ -81,6 +83,7 @@ impl<DB: Database> SharedPool<DB> {
// we can't just do `usize::MAX` because that would overflow
// and we can't do this more than once cause that would _also_ overflow
self.semaphore.release(WAKE_ALL_PERMITS);
self.on_closed.notify(usize::MAX);
}
// wait for all permits to be released

View file

@ -61,9 +61,14 @@ use crate::connection::Connection;
use crate::database::Database;
use crate::error::Error;
use crate::transaction::Transaction;
use event_listener::EventListener;
use futures_core::FusedFuture;
use futures_util::FutureExt;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
#[macro_use]
@ -229,6 +234,13 @@ pub use self::options::PoolOptions;
/// well and queries will generally benefit from these caches being "warm" (populated with data).
pub struct Pool<DB: Database>(pub(crate) Arc<SharedPool<DB>>);
/// A future that resolves when the pool is closed.
///
/// See [`Pool::close_event()`] for details.
pub struct CloseEvent {
listener: Option<EventListener>,
}
impl<DB: Database> Pool<DB> {
/// Creates a new connection pool with a default pool configuration and
/// the given connection URI; and, immediately establishes one connection.
@ -324,6 +336,84 @@ impl<DB: Database> Pool<DB> {
self.0.is_closed()
}
/// Get a future that resolves when [`Pool::close()`] is called.
///
/// If the pool is already closed, the future resolves immediately.
///
/// This can be used to cancel long-running operations that hold onto a [`PoolConnection`]
/// so they don't prevent the pool from closing (which would otherwise wait until all
/// connections are returned).
///
/// Examples
/// ========
/// These examples use Postgres and Tokio, but should suffice to demonstrate the concept.
///
/// Do something when the pool is closed:
/// ```rust,no_run
/// # #[cfg(feature = "postgres")]
/// # async fn bleh() -> sqlx_core::error::Result<()> {
/// use sqlx::PgPool;
///
/// let pool = PgPool::connect("postgresql://...").await?;
///
/// let pool2 = pool.clone();
///
/// tokio::spawn(async move {
/// // Demonstrates that `CloseEvent` is itself a `Future` you can wait on.
/// // This lets you implement any kind of on-close event that you like.
/// pool2.close_event().await;
///
/// println!("Pool is closing!");
///
/// // Imagine maybe recording application statistics or logging a report, etc.
/// });
///
/// // The rest of the application executes normally...
///
/// // Close the pool before the application exits...
/// pool.close().await;
///
/// # Ok(())
/// # }
/// ```
///
/// Cancel a long-running operation:
/// ```rust,no_run
/// # #[cfg(feature = "postgres")]
/// # async fn bleh() -> sqlx_core::error::Result<()> {
/// use sqlx::{Executor, PgPool};
///
/// let pool = PgPool::connect("postgresql://...").await?;
///
/// let pool2 = pool.clone();
///
/// tokio::spawn(async move {
/// pool2.close_event().do_until(async {
/// // This statement normally won't return for 30 days!
/// // (Assuming the connection doesn't time out first, of course.)
/// pool2.execute("SELECT pg_sleep('30 days')").await;
///
/// // If the pool is closed before the statement completes, this won't be printed.
/// // This is because `.do_until()` cancels the future it's given if the
/// // pool is closed first.
/// println!("Waited!");
/// }).await;
/// });
///
/// // This normally wouldn't return until the above statement completed and the connection
/// // was returned to the pool. However, thanks to `.do_until()`, the operation was
/// // cancelled as soon as we called `.close().await`.
/// pool.close().await;
///
/// # Ok(())
/// # }
/// ```
pub fn close_event(&self) -> CloseEvent {
CloseEvent {
listener: (!self.is_closed()).then(|| self.0.on_closed.listen()),
}
}
/// Returns the number of connections currently active. This includes idle connections.
pub fn size(&self) -> u32 {
self.0.size()
@ -368,6 +458,65 @@ impl<DB: Database> fmt::Debug for Pool<DB> {
}
}
impl CloseEvent {
/// Execute the given future until it returns or the pool is closed.
///
/// Cancels the future and returns `Err(PoolClosed)` if/when the pool is closed.
/// If the pool was already closed, the future is never run.
pub async fn do_until<Fut: Future>(&mut self, fut: Fut) -> Result<Fut::Output, Error> {
// Check that the pool wasn't closed already.
//
// We use `poll_immediate()` as it will use the correct waker instead of
// a no-op one like `.now_or_never()`, but it won't actually suspend execution here.
futures_util::future::poll_immediate(&mut *self)
.await
.map_or(Ok(()), |_| Err(Error::PoolClosed))?;
futures_util::pin_mut!(fut);
// I find that this is clearer in intent than `futures_util::future::select()`
// or `futures_util::select_biased!{}` (which isn't enabled anyway).
futures_util::future::poll_fn(|cx| {
// Poll `fut` first as the wakeup event is more likely for it than `self`.
if let Poll::Ready(ret) = fut.as_mut().poll(cx) {
return Poll::Ready(Ok(ret));
}
// Can't really factor out mapping to `Err(Error::PoolClosed)` though it seems like
// we should because that results in a different `Ok` type each time.
//
// Ideally we'd map to something like `Result<!, Error>` but using `!` as a type
// is not allowed on stable Rust yet.
self.poll_unpin(cx).map(|_| Err(Error::PoolClosed))
})
.await
}
}
impl Future for CloseEvent {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(listener) = &mut self.listener {
futures_core::ready!(listener.poll_unpin(cx));
}
// `EventListener` doesn't like being polled after it yields, and even if it did it
// would probably just wait for the next event, neither of which we want.
//
// So this way, once we get our close event, we fuse this future to immediately return.
self.listener = None;
Poll::Ready(())
}
}
impl FusedFuture for CloseEvent {
fn is_terminated(&self) -> bool {
self.listener.is_none()
}
}
/// get the time between the deadline and now and use that as our timeout
///
/// returns `Error::PoolTimedOut` if the deadline is in the past

View file

@ -1,3 +1,12 @@
use std::fmt::{self, Debug};
use std::io;
use std::str::from_utf8;
use either::Either;
use futures_channel::mpsc;
use futures_core::future::BoxFuture;
use futures_core::stream::{BoxStream, Stream};
use crate::describe::Describe;
use crate::error::Error;
use crate::executor::{Execute, Executor};
@ -5,13 +14,6 @@ use crate::pool::PoolOptions;
use crate::pool::{Pool, PoolConnection};
use crate::postgres::message::{MessageFormat, Notification};
use crate::postgres::{PgConnection, PgQueryResult, PgRow, PgStatement, PgTypeInfo, Postgres};
use either::Either;
use futures_channel::mpsc;
use futures_core::future::BoxFuture;
use futures_core::stream::{BoxStream, Stream};
use std::fmt::{self, Debug};
use std::io;
use std::str::from_utf8;
/// A stream of asynchronous notifications from Postgres.
///
@ -25,6 +27,7 @@ pub struct PgListener {
buffer_rx: mpsc::UnboundedReceiver<Notification>,
buffer_tx: Option<mpsc::UnboundedSender<Notification>>,
channels: Vec<String>,
ignore_close_event: bool,
}
/// An asynchronous notification from Postgres.
@ -41,7 +44,11 @@ impl PgListener {
.connect(uri)
.await?;
Self::connect_with(&pool).await
let mut this = Self::connect_with(&pool).await?;
// We don't need to handle close events
this.ignore_close_event = true;
Ok(this)
}
pub async fn connect_with(pool: &Pool<Postgres>) -> Result<Self, Error> {
@ -58,9 +65,33 @@ impl PgListener {
buffer_rx: receiver,
buffer_tx: None,
channels: Vec::new(),
ignore_close_event: false,
})
}
/// Set whether or not to ignore [`Pool::close_event()`]. Defaults to `false`.
///
/// By default, when [`Pool::close()`] is called on the pool this listener is using
/// while [`Self::recv()`] or [`Self::try_recv()`] are waiting for a message, the wait is
/// cancelled and `Err(PoolClosed)` is returned.
///
/// This is because `Pool::close()` will wait until _all_ connections are returned and closed,
/// including the one being used by this listener.
///
/// Otherwise, `pool.close().await` would have to wait until `PgListener` encountered a
/// need to acquire a new connection (timeout, error, etc.) and dropped the one it was
/// currently holding, at which point `.recv()` or `.try_recv()` would return `Err(PoolClosed)`
/// on the attempt to acquire a new connection anyway.
///
/// However, if you want `PgListener` to ignore the close event and continue waiting for a
/// message as long as it can, set this to `true`.
///
/// Does nothing if this was constructed with [`PgListener::connect()`], as that creates an
/// internal pool just for the new instance of `PgListener` which cannot be closed manually.
pub fn ignore_pool_close_event(&mut self, val: bool) {
self.ignore_close_event = val;
}
/// Starts listening for notifications on a channel.
/// The channel name is quoted here to ensure case sensitivity.
pub async fn listen(&mut self, channel: &str) -> Result<(), Error> {
@ -202,11 +233,24 @@ impl PgListener {
return Ok(Some(PgNotification(notification)));
}
// Fetch our `CloseEvent` listener, if applicable.
let mut close_event = (!self.ignore_close_event).then(|| self.pool.close_event());
loop {
// Ensure we have an active connection to work with.
self.connect_if_needed().await?;
let message = match self.connection().stream.recv_unchecked().await {
let next_message = self.connection().stream.recv_unchecked();
let res = if let Some(ref mut close_event) = close_event {
// cancels the wait and returns `Err(PoolClosed)` if the pool is closed
// before `next_message` returns, or if the pool was already closed
close_event.do_until(next_message).await?
} else {
next_message.await
};
let message = match res {
Ok(message) => message,
// The connection is dead, ensure that it is dropped,

View file

@ -1,10 +1,11 @@
use futures::{StreamExt, TryStreamExt};
use sqlx::postgres::{
PgAdvisoryLock, PgConnectOptions, PgConnection, PgDatabaseError, PgErrorPosition, PgSeverity,
PgAdvisoryLock, PgConnectOptions, PgConnection, PgDatabaseError, PgErrorPosition, PgListener,
PgSeverity,
};
use sqlx::postgres::{PgConnectionInfo, PgPoolOptions, PgRow, Postgres};
use sqlx::{Column, Connection, Executor, Row, Statement, TypeInfo};
use sqlx_test::{new, setup_if_needed};
use sqlx_test::{new, pool, setup_if_needed};
use std::env;
use std::sync::Arc;
use std::time::Duration;
@ -967,6 +968,23 @@ async fn test_listener_cleanup() -> anyhow::Result<()> {
Ok(())
}
#[sqlx_macros::test]
async fn test_pg_listener_allows_pool_to_close() -> anyhow::Result<()> {
let pool = pool::<Postgres>().await?;
// acquires and holds a connection which would normally prevent the pool from closing
let mut listener = PgListener::connect_with(&pool).await?;
sqlx_rt::spawn(async move {
listener.recv().await;
});
// would previously hang forever since `PgListener` had no way to know the pool wanted to close
pool.close().await;
Ok(())
}
#[sqlx_macros::test]
async fn it_supports_domain_types_in_composite_domain_types() -> anyhow::Result<()> {
// Only supported in Postgres 11+