feat: convenient wrapper for Postgres advisory locks (#1641)

This commit is contained in:
Austin Bonander 2022-03-24 17:38:24 -07:00 committed by GitHub
parent 9d76f7cd9e
commit e1817f0a9d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 525 additions and 9 deletions

15
Cargo.lock generated
View file

@ -1098,6 +1098,16 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "hkdf"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01706d578d5c281058480e673ae4086a9f4710d8df1ad80a5b03e39ece5f886b"
dependencies = [
"digest",
"hmac",
]
[[package]]
name = "hmac"
version = "0.11.0"
@ -1978,9 +1988,9 @@ dependencies = [
[[package]]
name = "regex"
version = "1.5.4"
version = "1.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461"
checksum = "1a11647b6b25ff05a515cb92c365cec08801e83423a235b51e231e1808747286"
dependencies = [
"aho-corasick",
"memchr",
@ -2439,6 +2449,7 @@ dependencies = [
"git2",
"hashlink",
"hex",
"hkdf",
"hmac",
"indexmap",
"ipnetwork",

View file

@ -33,6 +33,7 @@ postgres = [
"json",
"dirs",
"whoami",
"hkdf"
]
mysql = [
"sha-1",
@ -168,6 +169,7 @@ hashlink = "0.7.0"
# NOTE: *must* remain below 1.7.0 to allow users to avoid the `ahash` cyclic dependency problem by pinning the version
# https://github.com/tkaitchuck/aHash/issues/95#issuecomment-874150078
indexmap = "1.6.0"
hkdf = { version = "0.11.0", optional = true }
[dev-dependencies]
sqlx = { version = "0.5.11", path = "..", features = ["postgres", "sqlite"] }

View file

@ -59,6 +59,18 @@ impl<DB: Database> DerefMut for PoolConnection<DB> {
}
}
/// Lets a `PoolConnection` be passed wherever a borrowed `DB::Connection` is accepted.
impl<DB: Database> AsRef<DB::Connection> for PoolConnection<DB> {
    fn as_ref(&self) -> &DB::Connection {
        // Explicit reborrow through this type's `Deref` impl.
        &**self
    }
}
/// Lets a `PoolConnection` be passed wherever a mutably borrowed `DB::Connection` is accepted.
impl<DB: Database> AsMut<DB::Connection> for PoolConnection<DB> {
    fn as_mut(&mut self) -> &mut DB::Connection {
        // Explicit reborrow through this type's `DerefMut` impl.
        &mut **self
    }
}
impl<DB: Database> PoolConnection<DB> {
/// Explicitly release a connection from the pool
#[deprecated = "renamed to `.detach()` for clarity"]

View file

@ -0,0 +1,421 @@
use crate::error::Result;
use crate::postgres::PgConnection;
use crate::Either;
use hkdf::Hkdf;
use once_cell::sync::OnceCell;
use sha2::Sha256;
use std::ops::{Deref, DerefMut};
/// A mutex-like type utilizing [Postgres advisory locks].
///
/// Advisory locks are a mechanism provided by Postgres to have mutually exclusive or shared
/// locks tracked in the database with application-defined semantics, as opposed to the standard
/// row-level or table-level locks which may not fit all use-cases.
///
/// This API provides a convenient wrapper for generating and storing the integer keys that
/// advisory locks use, as well as RAII guards for releasing advisory locks when they fall out
/// of scope.
///
/// This API only handles session-scoped advisory locks (explicitly locked and unlocked, or
/// automatically released when a connection is closed).
///
/// Transaction-scoped locks are also possible, but they are not covered by this API: begin a
/// transaction and call the appropriate lock functions (e.g. `SELECT pg_advisory_xact_lock()`)
/// manually. Such locks cannot be explicitly released; they are released automatically when the
/// transaction ends (is committed or rolled back).
///
/// Session-level locks can be acquired either inside or outside a transaction and are not
/// tied to transaction semantics; a lock acquired inside a transaction is still held when that
/// transaction is committed or rolled back, until explicitly released or the connection is closed.
///
/// Locks can be acquired in either shared or exclusive modes, which can be thought of as read locks
/// and write locks, respectively. Multiple shared locks are allowed for the same key, but a single
/// exclusive lock prevents any other lock being taken for a given key until it is released.
///
/// [Postgres advisory locks]: https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
#[derive(Debug, Clone)]
pub struct PgAdvisoryLock {
/// The integer key (in either keyspace) that identifies this lock to Postgres.
key: PgAdvisoryLockKey,
/// The query to execute to release this lock.
///
/// Built on first use from `key` and cached, so repeated guard drops do not re-format it.
release_query: OnceCell<String>,
}
/// A key type natively used by Postgres advisory locks.
///
/// Currently, Postgres advisory locks have two different key spaces: one keyed by a single
/// 64-bit integer, and one keyed by a pair of two 32-bit integers. The Postgres docs
/// specify that these key spaces "do not overlap":
///
/// <https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS>
///
/// The documentation for the `pg_locks` system view explains further how advisory locks
/// are treated in Postgres:
///
/// <https://www.postgresql.org/docs/current/view-pg-locks.html>
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum PgAdvisoryLockKey {
/// The keyspace designated by a single 64-bit integer.
///
/// When [`PgAdvisoryLock`] is constructed with [`::new()`][PgAdvisoryLock::new()],
/// this is the keyspace used.
BigInt(i64),
/// The keyspace designated by two 32-bit integers.
IntPair(i32, i32),
}
/// A wrapper for `PgConnection` (or a similar type) that represents a held Postgres advisory lock.
///
/// Can be acquired by [`PgAdvisoryLock::acquire()`] or [`PgAdvisoryLock::try_acquire()`].
/// Released on-drop or via [`Self::release_now()`].
///
/// ### Note: Release-on-drop is not immediate!
/// On drop, this guard queues a `pg_advisory_unlock()` call on the connection which will be
/// flushed to the server the next time it is used, or when it is returned to
/// a [`PgPool`][crate::postgres::PgPool] in the case of
/// [`PoolConnection<Postgres>`][crate::pool::PoolConnection].
///
/// This means the lock is not actually released as soon as the guard is dropped. To ensure the
/// lock is eagerly released, you can call [`.release_now().await`][Self::release_now()].
pub struct PgAdvisoryLockGuard<'lock, C: AsMut<PgConnection>> {
/// The lock this guard was created for; supplies the key and the release query.
lock: &'lock PgAdvisoryLock,
/// The wrapped connection.
///
/// Stored in an `Option` so it can be moved out of the guard by `release_now()`, `leak()`
/// and `Drop`; it is `Some` at all other times.
conn: Option<C>,
}
impl PgAdvisoryLock {
/// Build a `PgAdvisoryLock` whose key is derived from a human-readable string.
///
/// This saves the caller from having to invent a unique integer key by hand. The derived key is
/// stable for a given string and always lives in the single 64-bit integer keyspace
/// (see [`PgAdvisoryLockKey`] for details).
///
/// The derivation applies the [Hash-based Key Derivation Function (HKDF; IETF RFC 5869)][hkdf]
/// to the bytes of the string, parameterized so that the result is unlikely to collide with
/// other implementations using a similar scheme (we don't currently know of any).
/// See the source of this method for details.
///
/// ### Example
/// ```rust
/// # extern crate sqlx_core as sqlx;
/// use sqlx::postgres::{PgAdvisoryLock, PgAdvisoryLockKey};
///
/// let lock = PgAdvisoryLock::new("my first Postgres advisory lock!");
/// // Negative values are fine because of how Postgres treats advisory lock keys.
/// // See the documentation for the `pg_locks` system view for details.
/// assert_eq!(lock.key(), &PgAdvisoryLockKey::BigInt(-5560419505042474287));
/// ```
///
/// [hkdf]: https://datatracker.ietf.org/doc/html/rfc5869
pub fn new(key_string: impl AsRef<str>) -> Self {
    let name = key_string.as_ref();

    // Why HKDF: it condenses a variable-length input into a short, high-quality output using a
    // well-specified, reproducible algorithm. The input is normally expected to be pseudorandom
    // rather than human-readable, but the goal here is not secrecy, only a deterministic value
    // that is unlikely to already be in use.
    //
    // Why SHA-256: the Postgres driver already uses it, which should save on codegen.
    //
    // Why no salt (`None`): a salt is meant to be random, and the key must be deterministic.
    //
    // The "info" string ties the output to SQLx so that similar schemes elsewhere do not
    // produce the same key. To match another implementation, take its integer key and pass it
    // to `with_key()` instead.
    //
    // Do *not* change the info string: doing so changes every derived key!
    let mut derived = [0u8; 8];
    Hkdf::<Sha256>::new(None, name.as_bytes())
        .expand(b"SQLx (Rust) Postgres advisory lock", &mut derived)
        // `expand()` only fails when asked for more than 255 times the digest size
        // (https://datatracker.ietf.org/doc/html/rfc5869#section-2.3); 8 bytes is far below that.
        .expect("BUG: `output_key_material` should be of acceptable length");

    // The `(int, int)` keyspace is only interesting to callers that already have keys, and they
    // can use `with_key()`; everyone else gets the 64-bit keyspace.
    //
    // Little-endian was picked mostly because x86 is the most common server architecture,
    // where the conversion should be a no-op.
    let key = PgAdvisoryLockKey::BigInt(i64::from_le_bytes(derived));

    log::trace!("generated {:?} from key string {:?}", key, name);

    Self::with_key(key)
}
/// Construct a `PgAdvisoryLock` with a manually supplied key.
pub fn with_key(key: PgAdvisoryLockKey) -> Self {
Self {
key,
release_query: OnceCell::new(),
}
}
/// Returns the current key.
pub fn key(&self) -> &PgAdvisoryLockKey {
&self.key
}
// Implementation note: this deliberately does not go through the `Acquire` trait. An attempt to
// do so produced unhelpful "cannot project lifetimes to parent scope" errors related to how
// lifetimes work on `Acquire`; the root cause was not investigated further (possibly the lack of
// `async fn` in traits, or lazy normalization).
/// Takes an exclusive lock with `pg_advisory_lock()`, waiting for as long as it takes.
///
/// Use [`Self::try_acquire()`] for a variant that returns immediately instead of waiting.
///
/// The call is executed on `conn`, which may be any connection-like type: `PgConnection`,
/// `PoolConnection<Postgres>`, `Transaction<Postgres>`, or a mutable reference to any of these.
///
/// When the returned guard is dropped it queues a `pg_advisory_unlock()` call on the connection.
/// That call is executed the next time the connection is used, or when the connection is
/// returned to a [`PgPool`][crate::postgres::PgPool] in the case of `PoolConnection<Postgres>`.
///
/// Postgres lets one connection take the same lock several times without releasing it, so the
/// lock is re-entrant in that sense. The lock is only actually released once the number of
/// unlock operations matches the number of lock operations.
///
/// See [Postgres' documentation for the Advisory Lock Functions][advisory-funcs] for details.
///
/// [advisory-funcs]: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
pub async fn acquire<C: AsMut<PgConnection>>(
    &self,
    mut conn: C,
) -> Result<PgAdvisoryLockGuard<'_, C>> {
    // Pick the overload matching the keyspace, then run it once.
    let lock_call = match &self.key {
        PgAdvisoryLockKey::BigInt(id) => {
            crate::query::query("SELECT pg_advisory_lock($1)").bind(id)
        }
        PgAdvisoryLockKey::IntPair(first, second) => {
            crate::query::query("SELECT pg_advisory_lock($1, $2)")
                .bind(first)
                .bind(second)
        }
    };

    lock_call.execute(conn.as_mut()).await?;

    Ok(PgAdvisoryLockGuard::new(self, conn))
}
/// Attempts to take an exclusive lock with `pg_try_advisory_lock()` without waiting.
///
/// Use [`Self::acquire()`] for a variant that waits until the lock becomes available.
///
/// The call is executed on `conn`, which may be any connection-like type: `PgConnection`,
/// `PoolConnection<Postgres>`, `Transaction<Postgres>`, or a mutable reference to any of these.
///
/// On success the result is `Either::Left` with a guard wrapping the connection. If the lock
/// could not be taken, the connection is handed back unchanged as `Either::Right`.
///
/// When the returned guard is dropped it queues a `pg_advisory_unlock()` call on the connection.
/// That call is executed the next time the connection is used, or when the connection is
/// returned to a [`PgPool`][crate::postgres::PgPool] in the case of `PoolConnection<Postgres>`.
///
/// Postgres lets one connection take the same lock several times without releasing it, so the
/// lock is re-entrant in that sense. The lock is only actually released once the number of
/// unlock operations matches the number of lock operations.
///
/// See [Postgres' documentation for the Advisory Lock Functions][advisory-funcs] for details.
///
/// [advisory-funcs]: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
pub async fn try_acquire<C: AsMut<PgConnection>>(
    &self,
    mut conn: C,
) -> Result<Either<PgAdvisoryLockGuard<'_, C>, C>> {
    // Pick the overload matching the keyspace, then run it once.
    let try_lock_call = match &self.key {
        PgAdvisoryLockKey::BigInt(id) => {
            crate::query_scalar::query_scalar("SELECT pg_try_advisory_lock($1)").bind(id)
        }
        PgAdvisoryLockKey::IntPair(first, second) => {
            crate::query_scalar::query_scalar("SELECT pg_try_advisory_lock($1, $2)")
                .bind(first)
                .bind(second)
        }
    };

    let locked: bool = try_lock_call.fetch_one(conn.as_mut()).await?;

    Ok(if locked {
        Either::Left(PgAdvisoryLockGuard::new(self, conn))
    } else {
        Either::Right(conn)
    })
}
/// Execute `pg_advisory_unlock()` for this lock's key on the given connection.
///
/// This is used by [`PgAdvisoryLockGuard::release_now()`] and is also provided for manually
/// releasing the lock from connections returned by [`PgAdvisoryLockGuard::leak()`].
///
/// On success the connection is handed back together with the `boolean` value returned by
/// `pg_advisory_unlock()`. If that value is `false`, the lock was not actually held by the given
/// connection and a warning has been logged by the Postgres server.
///
/// # Errors
/// An error should only be returned if there is something wrong with the connection,
/// in which case the lock will be automatically released by the connection closing anyway.
/// Note that the connection is consumed in that case; it is not returned alongside the error.
pub async fn force_release<C: AsMut<PgConnection>>(&self, mut conn: C) -> Result<(C, bool)> {
    // Pick the overload matching the keyspace, then run it once.
    let unlock_call = match &self.key {
        PgAdvisoryLockKey::BigInt(key) => {
            crate::query_scalar::query_scalar("SELECT pg_advisory_unlock($1)").bind(key)
        }
        PgAdvisoryLockKey::IntPair(key1, key2) => {
            crate::query_scalar::query_scalar("SELECT pg_advisory_unlock($1, $2)")
                .bind(key1)
                .bind(key2)
        }
    };

    let released: bool = unlock_call.fetch_one(conn.as_mut()).await?;

    Ok((conn, released))
}
/// Returns the `SELECT pg_advisory_unlock(...)` statement for this lock's key.
///
/// The key is formatted directly into the SQL text (no bind parameters). The string is built
/// on the first call and cached in `release_query` for later calls.
fn get_release_query(&self) -> &str {
    let render = || match &self.key {
        PgAdvisoryLockKey::BigInt(id) => format!("SELECT pg_advisory_unlock({})", id),
        PgAdvisoryLockKey::IntPair(first, second) => {
            format!("SELECT pg_advisory_unlock({}, {})", first, second)
        }
    };

    self.release_query.get_or_init(render)
}
}
impl PgAdvisoryLockKey {
    /// Returns `Some(bigint)` for `Self::BigInt(bigint)` and `None` for every other variant.
    pub fn as_bigint(&self) -> Option<i64> {
        match *self {
            Self::BigInt(bigint) => Some(bigint),
            _ => None,
        }
    }
}
// Panic message used when a guard's `conn` is unexpectedly `None`. The connection is only taken
// out by `release_now()`, `leak()` and `Drop`, so hitting this indicates a bug in this module.
const NONE_ERR: &str = "BUG: PgAdvisoryLockGuard.conn taken";
impl<'lock, C: AsMut<PgConnection>> PgAdvisoryLockGuard<'lock, C> {
    /// Wraps `conn` in a guard for `lock`; callers do this only after the lock query succeeded.
    fn new(lock: &'lock PgAdvisoryLock, conn: C) -> Self {
        Self {
            conn: Some(conn),
            lock,
        }
    }

    /// Releases the held advisory lock right away, rather than when the connection is next used.
    ///
    /// Returns the wrapped connection on success.
    ///
    /// An error should only be returned if there is something wrong with the connection,
    /// in which case the lock will be automatically released by the connection closing anyway.
    ///
    /// If `pg_advisory_unlock()` returns `false`, a warning will be logged, both by SQLx as
    /// well as the Postgres server. This would only happen if the lock was released without
    /// using this guard, or the connection was swapped using [`std::mem::replace()`].
    pub async fn release_now(mut self) -> Result<C> {
        // Taking the connection out leaves `None` behind, so `Drop` will not queue a second unlock.
        let held = self.conn.take().expect(NONE_ERR);
        let (conn, released) = self.lock.force_release(held).await?;

        if !released {
            log::warn!(
                "PgAdvisoryLockGuard: advisory lock {:?} was not held by the contained connection",
                self.lock.key
            );
        }

        Ok(conn)
    }

    /// Gives up the guard without releasing the lock, which then stays held until the
    /// connection is closed.
    ///
    /// To manually release the lock later, see [`PgAdvisoryLock::force_release()`].
    pub fn leak(mut self) -> C {
        // With the connection removed, `Drop` has nothing to queue the unlock on.
        let conn = self.conn.take();
        conn.expect(NONE_ERR)
    }
}
/// Shared access to the underlying connection, so the guard can be used like the connection.
impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> Deref for PgAdvisoryLockGuard<'lock, C> {
    type Target = PgConnection;

    fn deref(&self) -> &Self::Target {
        let wrapped: &C = self.conn.as_ref().expect(NONE_ERR);
        wrapped.as_ref()
    }
}
/// The underlying connection can be borrowed mutably so that it remains usable as normal while
/// the lock is held; this even permits taking locks recursively.
///
/// Swapping in a different connection (e.g. with [`std::mem::replace()`]) is a logic error:
/// when this guard later tries to release the lock, the PostgreSQL server will log a warning.
impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> DerefMut
    for PgAdvisoryLockGuard<'lock, C>
{
    fn deref_mut(&mut self) -> &mut Self::Target {
        let wrapped: &mut C = self.conn.as_mut().expect(NONE_ERR);
        wrapped.as_mut()
    }
}
/// Shared access to the underlying connection for APIs that take `impl AsRef<PgConnection>`.
impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> AsRef<PgConnection>
    for PgAdvisoryLockGuard<'lock, C>
{
    fn as_ref(&self) -> &PgConnection {
        let wrapped: &C = self.conn.as_ref().expect(NONE_ERR);
        wrapped.as_ref()
    }
}
/// The underlying connection can be borrowed mutably so that it remains usable as normal while
/// the lock is held; this even permits taking locks recursively (a guard can itself be passed
/// as the connection to another lock call).
///
/// Swapping in a different connection (e.g. with [`std::mem::replace()`]) is a logic error:
/// when this guard later tries to release the lock, the PostgreSQL server will log a warning.
impl<'lock, C: AsMut<PgConnection>> AsMut<PgConnection> for PgAdvisoryLockGuard<'lock, C> {
    fn as_mut(&mut self) -> &mut PgConnection {
        let wrapped: &mut C = self.conn.as_mut().expect(NONE_ERR);
        wrapped.as_mut()
    }
}
/// Dropping the guard queues a `pg_advisory_unlock()` call on the wrapped connection. The call
/// is flushed to the server the next time the connection is used, or when it is returned to
/// [`PgPool`][crate::postgres::PgPool] in the case of
/// [`PoolConnection<Postgres>`][crate::pool::PoolConnection].
impl<'lock, C: AsMut<PgConnection>> Drop for PgAdvisoryLockGuard<'lock, C> {
    fn drop(&mut self) {
        // `release_now()` and `leak()` leave `None` behind; nothing to do in that case.
        let mut conn = match self.conn.take() {
            Some(conn) => conn,
            None => return,
        };

        // `Drop` cannot await, so the unlock is only buffered as a simple query message that
        // runs the next time the connection is used. The `async fn` paths can safely use the
        // prepared statement protocol, but for a deferred query this is the safest option.
        let pg_conn: &mut PgConnection = conn.as_mut();
        pg_conn.queue_simple_query(self.lock.get_release_query());
    }
}

View file

@ -12,7 +12,7 @@ use crate::executor::Executor;
use crate::ext::ustr::UStr;
use crate::io::Decode;
use crate::postgres::message::{
Close, Message, MessageFormat, ReadyForQuery, Terminate, TransactionStatus,
Close, Message, MessageFormat, Query, ReadyForQuery, Terminate, TransactionStatus,
};
use crate::postgres::statement::PgStatementMetadata;
use crate::postgres::{PgConnectOptions, PgTypeInfo, Postgres};
@ -101,6 +101,14 @@ impl PgConnection {
Ok(())
}
/// Buffers a simple (non-prepared) query so that it runs the next time this connection is used.
///
/// Nothing is sent or awaited here, which makes this usable from non-async contexts.
///
/// Used for rolling back transactions and releasing advisory locks.
pub(crate) fn queue_simple_query(&mut self, query: &str) {
    // Account for the response to this query before writing it.
    // NOTE(review): presumably this counter tracks `ReadyForQuery` messages still owed by the
    // server so they can be consumed before the next command — confirm against the stream code.
    self.pending_ready_for_query_count += 1;

    let message = Query(query);
    self.stream.write(message);
}
}
impl Debug for PgConnection {

View file

@ -2,6 +2,7 @@
use crate::executor::Executor;
mod advisory_lock;
mod arguments;
mod column;
mod connection;
@ -23,6 +24,7 @@ mod value;
#[cfg(feature = "migrate")]
mod migrate;
pub use advisory_lock::{PgAdvisoryLock, PgAdvisoryLockGuard, PgAdvisoryLockKey};
pub use arguments::{PgArgumentBuffer, PgArguments};
pub use column::PgColumn;
pub use connection::{PgConnection, PgConnectionInfo};

View file

@ -2,7 +2,6 @@ use futures_core::future::BoxFuture;
use crate::error::Error;
use crate::executor::Executor;
use crate::postgres::message::Query;
use crate::postgres::{PgConnection, Postgres};
use crate::transaction::{
begin_ansi_transaction_sql, commit_ansi_transaction_sql, rollback_ansi_transaction_sql,
@ -54,10 +53,7 @@ impl TransactionManager for PgTransactionManager {
fn start_rollback(conn: &mut PgConnection) {
if conn.transaction_depth > 0 {
conn.pending_ready_for_query_count += 1;
conn.stream.write(Query(&rollback_ansi_transaction_sql(
conn.transaction_depth,
)));
conn.queue_simple_query(&rollback_ansi_transaction_sql(conn.transaction_depth));
conn.transaction_depth -= 1;
}

View file

@ -1,11 +1,12 @@
use futures::{StreamExt, TryStreamExt};
use sqlx::postgres::{
PgConnectOptions, PgConnection, PgDatabaseError, PgErrorPosition, PgSeverity,
PgAdvisoryLock, PgConnectOptions, PgConnection, PgDatabaseError, PgErrorPosition, PgSeverity,
};
use sqlx::postgres::{PgConnectionInfo, PgPoolOptions, PgRow, Postgres};
use sqlx::{Column, Connection, Executor, Row, Statement, TypeInfo};
use sqlx_test::{new, setup_if_needed};
use std::env;
use std::sync::Arc;
use std::time::Duration;
#[sqlx_macros::test]
@ -1445,3 +1446,66 @@ CREATE TABLE issue_1254 (id INT4 PRIMARY KEY, pairs PAIR[]);
Ok(())
}
// Exercises `PgAdvisoryLock` with two locks and two pooled connections: recursive acquisition,
// `try_acquire()` on a lock held by another connection, `leak()` + `force_release()`,
// `release_now()`, and release-on-drop when a connection goes back to the pool.
#[sqlx_macros::test]
async fn test_advisory_locks() -> anyhow::Result<()> {
// Two connections are enough: one for this task, one for the spawned task below.
let pool = PgPoolOptions::new()
.max_connections(2)
.connect(&dotenv::var("DATABASE_URL")?)
.await?;
// Wrapped in `Arc` so clones can be moved into the spawned task.
let lock1 = Arc::new(PgAdvisoryLock::new("sqlx-postgres-tests-1"));
let lock2 = Arc::new(PgAdvisoryLock::new("sqlx-postgres-tests-2"));
let conn1 = pool.acquire().await?;
// conn1 now holds lock1.
let mut conn1_lock1 = lock1.acquire(conn1).await?;
// try acquiring a recursive lock through a mutable reference then dropping
drop(lock1.acquire(&mut conn1_lock1).await?);
let conn2 = pool.acquire().await?;
// leak so we can take it across the task boundary
// (the guard borrows `lock2`; leaking returns the bare connection while conn2 keeps lock2 held)
let conn2_lock2 = lock2.acquire(conn2).await?.leak();
// NOTE(review): the value returned by `sqlx_rt::spawn` is discarded, so presumably an error or
// panic inside this task is not reported by the test directly — confirm.
sqlx_rt::spawn({
let lock1 = lock1.clone();
let lock2 = lock2.clone();
async move {
// conn1 still holds lock1 at this point, so the attempt on conn2 is expected to hand the
// connection back (`Right`) instead of a guard.
let conn2_lock2 = lock1.try_acquire(conn2_lock2).await?.right_or_else(|_| {
panic!(
"acquired lock but wasn't supposed to! Key: {:?}",
lock1.key()
)
});
// Manually release the lock2 that was leaked above; conn2 did hold it, so expect `true`.
let (conn2, released) = lock2.force_release(conn2_lock2).await?;
assert!(released);
// acquire both locks but let the pool release them
let conn2_lock1 = lock1.acquire(conn2).await?;
let _conn2_lock1and2 = lock2.acquire(conn2_lock1).await?;
anyhow::Ok(())
}
});
// acquire lock2 on conn1, we leak the lock1 guard so we can manually release it before lock2
let conn1_lock1and2 = lock2.acquire(conn1_lock1.leak()).await?;
// release lock1 while holding lock2
// (the lock2 guard itself is passed as the connection and handed back afterwards)
let (conn1_lock2, released) = lock1.force_release(conn1_lock1and2).await?;
assert!(released);
// Eagerly release lock2 and get the plain pooled connection back.
let conn1 = conn1_lock2.release_now().await?;
// acquire both locks to be sure they were released
{
let conn1_lock1 = lock1.acquire(conn1).await?;
let _conn1_lock1and2 = lock2.acquire(conn1_lock1).await?;
}
pool.close().await;
Ok(())
}