Use crossbeam queues to achieve fairness in the pool

Ryan Leckey 2020-01-11 03:26:48 -08:00
parent 03251b719b
commit 95fac72abf
5 changed files with 195 additions and 172 deletions
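
In short, the fairness scheme introduced here keeps idle connections in a bounded crossbeam ArrayQueue and parks blocked acquirers as oneshot senders on an unbounded SegQueue; releasing a connection hands it to the longest-waiting task that is still listening before it is ever pushed back onto the idle queue, so waiters are served in arrival order. The sketch below mirrors that shape with a placeholder Conn type and simplified method names (wait_for_conn is illustrative, not the crate's actual API); it assumes crossbeam-queue 0.2, whose pop returns a Result, as pinned in the lockfile below.

use crossbeam_queue::{ArrayQueue, SegQueue};
use futures_channel::oneshot;

struct Conn; // stand-in for a live database connection

struct SharedPool {
    // connections parked and ready to be handed out (bounded by max_size)
    idle: ArrayQueue<Conn>,
    // tasks that found the pool at capacity; SegQueue pops in FIFO order,
    // which is what gives the pool its fairness
    waiters: SegQueue<oneshot::Sender<Conn>>,
}

impl SharedPool {
    fn new(max_size: usize) -> Self {
        Self {
            idle: ArrayQueue::new(max_size),
            waiters: SegQueue::new(),
        }
    }

    // Returning a connection first tries the oldest waiter; only when every
    // queued waiter has already gone away is it pushed onto the idle queue.
    fn release(&self, mut conn: Conn) {
        while let Ok(waiter) = self.waiters.pop() {
            conn = match waiter.send(conn) {
                Ok(()) => return,  // handed directly to a waiting task
                Err(conn) => conn, // that waiter gave up (e.g. timed out); try the next
            };
        }
        let _ = self.idle.push(conn); // no waiters: park it for later
    }

    // At capacity, a task registers a oneshot and awaits it instead of racing
    // every other task for the next idle connection.
    async fn wait_for_conn(&self) -> Option<Conn> {
        if let Ok(conn) = self.idle.pop() {
            return Some(conn);
        }
        let (tx, rx) = oneshot::channel();
        self.waiters.push(tx);
        rx.await.ok()
    }
}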

Cargo.lock (generated)

@@ -278,6 +278,15 @@ dependencies = [
"crossbeam-utils 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-queue"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "crossbeam-utils"
version = "0.6.6"
@@ -1163,7 +1172,10 @@ dependencies = [
"bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-queue 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
"digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-channel 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1556,6 +1568,7 @@ dependencies = [
"checksum crossbeam-deque 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3aa945d63861bfe624b55d153a39684da1e8c0bc8fba932f7ee3a3c16cea3ca"
"checksum crossbeam-epoch 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5064ebdbf05ce3cb95e45c8b086f72263f4166b29b97f6baff7ef7fe047b55ac"
"checksum crossbeam-queue 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7c979cd6cfe72335896575c6b5688da489e420d36a27a0b9eb0c73db574b4a4b"
"checksum crossbeam-queue 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c695eeca1e7173472a32221542ae469b3e9aac3a4fc81f7696bcad82029493db"
"checksum crossbeam-utils 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)" = "04973fa96e96579258a5091af6003abde64af786b860f18622b82e026cca60e6"
"checksum crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce446db02cdc3165b94ae73111e570793400d0794e46125cc4056c81cbb039f4"
"checksum data-encoding 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f4f47ca1860a761136924ddd2422ba77b2ea54fe8cc75b9040804a0d9d32ad97"


@@ -12,12 +12,7 @@ const SECRET_KEY: &str = "this-is-the-most-secret-key-ever-secreted";
#[async_std::main]
async fn main() -> anyhow::Result<()> {
env_logger::try_init()?;
let pool = PgPool::builder()
.max_size(100)
.test_on_acquire(false)
.build(&env::var("DATABASE_URL")?).await?;
let pool = PgPool::new(&env::var("DATABASE_URL")?).await?;
let mut server = tide::with_state(pool);
@@ -25,7 +20,7 @@ async fn main() -> anyhow::Result<()> {
server.at("/api/user").get(get_current_user);
server.listen(("0.0.0.0", 8080)).await?;
server.listen(("localhost", 8080)).await?;
Ok(())
}


@@ -24,8 +24,11 @@ async-stream = { version = "0.2.0", default-features = false }
base64 = { version = "0.11.0", default-features = false, optional = true, features = [ "std" ] }
bitflags = { version = "1.2.1", default-features = false }
byteorder = { version = "1.3.2", default-features = false }
crossbeam-queue = "0.2.1"
crossbeam-utils = { version = "0.7.0", default-features = false }
chrono = { version = "0.4.10", default-features = false, features = [ "clock" ], optional = true }
digest = { version = "0.8.1", default-features = false, optional = true, features = [ "std" ] }
futures-channel = { version = "0.3.1", default-features = false }
futures-core = { version = "0.3.1", default-features = false }
futures-util = { version = "0.3.1", default-features = false }
generic-array = { version = "0.12.3", default-features = false, optional = true }


@@ -3,12 +3,12 @@ use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Instant;
use crossbeam_queue::{ArrayQueue, SegQueue};
use async_std::prelude::FutureExt as _;
use async_std::sync::{channel, Receiver, Sender};
use futures_channel::oneshot::{channel, Sender};
use async_std::task;
use futures_util::future::FutureExt as _;
use super::{Idle, Options, Raw};
use super::{Idle, Options, Live};
use crate::{error::Error, Connection, Database};
pub(super) struct SharedPool<DB>
@@ -16,9 +16,10 @@ where
DB: Database,
{
url: String,
pool_rx: Receiver<Idle<DB>>,
idle: ArrayQueue<Idle<DB>>,
waiters: SegQueue<Sender<Live<DB>>>,
size: AtomicU32,
closed: AtomicBool,
is_closed: AtomicBool,
options: Options,
}
@@ -30,33 +31,35 @@ where
pub(super) async fn new_arc(
url: &str,
options: Options,
) -> crate::Result<(Arc<Self>, Sender<Idle<DB>>)> {
let (pool_tx, pool_rx) = channel(options.max_size as usize);
) -> crate::Result<Arc<Self>> {
let pool = Arc::new(Self {
url: url.to_owned(),
pool_rx,
idle: ArrayQueue::new(options.max_size as usize),
waiters: SegQueue::new(),
size: AtomicU32::new(0),
closed: AtomicBool::new(false),
is_closed: AtomicBool::new(false),
options,
});
// If a minimum size was configured for the pool,
// establish N connections
// TODO: Should we do this in the background?
for _ in 0..pool.options.min_size {
let raw = pool
let live = pool
.eventually_connect(Instant::now() + pool.options.connect_timeout)
.await?;
pool_tx
.send(Idle {
raw,
since: Instant::now(),
})
.await;
// Ignore the error here; this loop is capped by min_size, which we
// should already ensure is less than max_size
let _ = pool.idle.push(Idle {
live,
since: Instant::now(),
});
}
conn_reaper(&pool, &pool_tx);
spawn_reaper(&pool);
Ok((pool, pool_tx))
Ok(pool)
}
pub fn options(&self) -> &Options {
@@ -72,66 +75,82 @@ where
}
pub(super) fn num_idle(&self) -> usize {
self.pool_rx.len()
// NOTE: This is very expensive
self.waiters.len()
}
pub(super) fn closed(&self) -> bool {
self.closed.load(Ordering::SeqCst)
pub(super) fn is_closed(&self) -> bool {
self.is_closed.load(Ordering::Acquire)
}
pub(super) async fn close(&self) {
self.closed.store(true, Ordering::Release);
self.is_closed.store(true, Ordering::Release);
while self.size.load(Ordering::Acquire) > 0 {
// don't block on the receiver because we own one Sender so it should never return
// `None`; a `select!()` would also work but that produces more complicated code
// and a timeout isn't necessarily appropriate
match self.pool_rx.recv().now_or_never() {
Some(Some(idle)) => {
idle.close().await;
self.size.fetch_sub(1, Ordering::AcqRel);
}
Some(None) => {
log::warn!("was not able to close all connections");
break;
}
None => task::yield_now().await,
while let Ok(idle) = self.idle.pop() {
idle.close().await;
self.size.fetch_sub(1, Ordering::AcqRel);
}
task::yield_now().await
}
}
#[inline]
pub(super) fn try_acquire(&self) -> Option<Raw<DB>> {
if self.closed.load(Ordering::Acquire) {
pub(super) fn try_acquire(&self) -> Option<Live<DB>> {
if self.is_closed.load(Ordering::Acquire) {
return None;
}
Some(self.pool_rx.recv().now_or_never()??.raw)
Some(self.idle.pop().ok()?.live)
}
pub(super) async fn acquire(&self) -> crate::Result<Raw<DB>> {
pub(super) fn release(&self, mut live: Live<DB>) {
// Try waiters in FIFO order until we find one that is still waiting ..
while let Ok(waiter) = self.waiters.pop() {
live = match waiter.send(live) {
// successfully released
Ok(()) => return,
Err(live) => {
live
},
};
}
// .. if there were no waiters still waiting, just push the connection
// back to the idle queue
let _ = self.idle.push(Idle {
live,
since: Instant::now(),
});
}
pub(super) async fn acquire(&self) -> crate::Result<Live<DB>> {
let start = Instant::now();
let deadline = start + self.options.connect_timeout;
// Unless the pool has been closed ...
while !self.closed.load(Ordering::Acquire) {
let size = self.size.load(Ordering::Acquire);
while !self.is_closed.load(Ordering::Acquire) {
// Attempt to immediately acquire a connection. This will return Some
// if there is an idle connection in our channel.
let mut idle = if let Some(idle) = self.pool_rx.recv().now_or_never() {
let idle = match idle {
Some(idle) => idle,
if let Some(idle) = self.idle.pop().ok() {
if let Some(live) = check_live(idle.live, &self.options).await {
return Ok(live);
}
}
// This isn't possible. [Pool] owns the sender and [SharedPool]
// owns the receiver.
None => unreachable!(),
};
idle
} else if size >= self.options.max_size {
let size = self.size.load(Ordering::Acquire);
if size >= self.options.max_size {
// Too many open connections
// Wait until one is available
let (tx, rx) = channel();
self.waiters.push(tx);
// get the time between the deadline and now and use that as our timeout
let until = deadline
@@ -139,13 +158,12 @@ where
.ok_or(Error::PoolTimedOut(None))?;
// don't sleep forever
let idle = match self.pool_rx.recv().timeout(until).await {
let live = match rx.timeout(until).await {
// A connection was returned to the pool
Ok(Some(idle)) => idle,
Ok(Ok(live)) => live,
// This isn't possible. [Pool] owns the sender and [SharedPool]
// owns the receiver.
Ok(None) => unreachable!(),
// Pool dropped without dropping waiter
Ok(Err(_)) => unreachable!(),
// Timed out waiting for a connection
// Error is not forwarded as it's useless context
@@ -154,55 +172,27 @@ where
}
};
idle
} else if self.size.compare_and_swap(size, size + 1, Ordering::AcqRel) == size {
// pool has slots available; open a new connection
match self.connect(deadline).await {
Ok(Some(conn)) => return Ok(conn),
// [size] is internally decremented on _retry_ and _error_
Ok(None) => continue,
Err(e) => return Err(e),
// If pool was closed while waiting for a connection,
// release the connection
if self.is_closed.load(Ordering::Acquire) {
live.close().await;
self.size.fetch_sub(1, Ordering::AcqRel);
return Err(Error::PoolClosed);
}
} else {
match check_live(live, &self.options).await {
Some(live) => return Ok(live),
// Need to re-connect
None => {}
}
} else if self.size.compare_and_swap(size, size + 1, Ordering::AcqRel) != size {
// size was incremented while we compared it just above
continue;
};
// If pool was closed while waiting for a connection,
// release the connection
if self.closed.load(Ordering::Acquire) {
idle.close().await;
self.size.fetch_sub(1, Ordering::AcqRel);
return Err(Error::PoolClosed);
}
// If the connection we pulled has expired, close the connection and
// immediately create a new connection
if is_beyond_lifetime(&idle.raw, &self.options) {
// close the connection but don't really care about the result
let _ = idle.close().await;
} else if self.options.test_on_acquire {
// TODO: Check on acquire should be a configuration setting
// Check that the connection is still live
match idle.raw.inner.ping().await {
// Connection still seems to respond
Ok(_) => return Ok(idle.raw),
// an error here means the other end has hung up or we lost connectivity
// either way we're fine to just discard the connection
// the error itself here isn't necessarily unexpected so WARN is too strong
Err(e) => log::info!("ping on idle connection returned error: {}", e),
}
// make sure the idle connection is gone explicitly before we open one
// this will close the resources for the stream on our side
drop(idle);
} else {
// No need to re-connect
return Ok(idle.raw);
}
// while there is still room in the pool, acquire a new connection
// pool has slots available; open a new connection
match self.connect(deadline).await {
Ok(Some(conn)) => return Ok(conn),
// [size] is internally decremented on _retry_ and _error_
@@ -214,7 +204,7 @@ where
Err(Error::PoolClosed)
}
async fn eventually_connect(&self, deadline: Instant) -> crate::Result<Raw<DB>> {
async fn eventually_connect(&self, deadline: Instant) -> crate::Result<Live<DB>> {
loop {
// [connect] will raise an error when past deadline
// [connect] returns None if its okay to retry
@@ -224,7 +214,7 @@ where
}
}
async fn connect(&self, deadline: Instant) -> crate::Result<Option<Raw<DB>>> {
async fn connect(&self, deadline: Instant) -> crate::Result<Option<Live<DB>>> {
// FIXME: Code between `-` is duplicate with [acquire]
// ---------------------------------
@@ -235,7 +225,7 @@ where
// If pool was closed while waiting for a connection,
// release the connection
if self.closed.load(Ordering::Acquire) {
if self.is_closed.load(Ordering::Acquire) {
self.size.fetch_sub(1, Ordering::AcqRel); // ?
return Err(Error::PoolClosed);
@@ -246,9 +236,9 @@ where
// result here is `Result<Result<DB, Error>, TimeoutError>`
match DB::Connection::open(&self.url).timeout(until).await {
// successfully established connection
Ok(Ok(inner)) => {
Ok(Some(Raw {
inner,
Ok(Ok(raw)) => {
Ok(Some(Live {
raw,
// remember when it was created so we can expire it
// if there is a [max_lifetime] set
created: Instant::now(),
@@ -281,17 +271,26 @@ where
DB::Connection: Connection<Database = DB>,
{
async fn close(self) {
let _ = self.raw.inner.close().await;
self.live.close().await;
}
}
impl<DB: Database> Live<DB>
where
DB::Connection: Connection<Database = DB>,
{
async fn close(self) {
let _ = self.raw.close().await;
}
}
// NOTE: Function names here are bizarre. Better names would be appreciated.
fn is_beyond_lifetime<DB: Database>(raw: &Raw<DB>, options: &Options) -> bool {
fn is_beyond_lifetime<DB: Database>(live: &Live<DB>, options: &Options) -> bool {
// check if connection was within max lifetime (or not set)
options
.max_lifetime
.map_or(false, |max| raw.created.elapsed() > max)
.map_or(false, |max| live.created.elapsed() > max)
}
fn is_beyond_idle<DB: Database>(idle: &Idle<DB>, options: &Options) -> bool {
@@ -301,8 +300,38 @@ fn is_beyond_idle<DB: Database>(idle: &Idle<DB>, options: &Options) -> bool {
.map_or(false, |timeout| idle.since.elapsed() > timeout)
}
async fn check_live<DB: Database>(mut live: Live<DB>, options: &Options) -> Option<Live<DB>> {
// If the connection we pulled has expired, close the connection and
// immediately create a new connection
if is_beyond_lifetime(&live, options) {
// close the connection but don't really care about the result
let _ = live.close().await;
} else if options.test_on_acquire {
// TODO: Check on acquire should be a configuration setting
// Check that the connection is still live
match live.raw.ping().await {
// Connection still seems to respond
Ok(_) => return Some(live),
// an error here means the other end has hung up or we lost connectivity
// either way we're fine to just discard the connection
// the error itself here isn't necessarily unexpected so WARN is too strong
Err(e) => log::info!("ping on idle connection returned error: {}", e),
}
// make sure the idle connection is gone explicitly before we open one
// this will close the resources for the stream on our side
drop(live);
} else {
// No need to re-connect
return Some(live);
}
None
}
/// if `max_lifetime` or `idle_timeout` is set, spawn a task that reaps senescent connections
fn conn_reaper<DB: Database>(pool: &Arc<SharedPool<DB>>, pool_tx: &Sender<Idle<DB>>)
fn spawn_reaper<DB: Database>(pool: &Arc<SharedPool<DB>>)
where
DB::Connection: Connection<Database = DB>,
{
@@ -314,11 +343,10 @@ where
(None, None) => return,
};
let pool = pool.clone();
let pool_tx = pool_tx.clone();
let pool = Arc::clone(&pool);
task::spawn(async move {
while !pool.closed.load(Ordering::Acquire) {
while !pool.is_closed.load(Ordering::Acquire) {
// reap at most the current size minus the minimum idle
let max_reaped = pool
.size
@@ -328,15 +356,15 @@ where
// collect connections to reap
let (reap, keep) = (0..max_reaped)
// only connections waiting in the queue
.filter_map(|_| pool.pool_rx.recv().now_or_never()?)
.filter_map(|_| pool.idle.pop().ok())
.partition::<Vec<_>, _>(|conn| {
is_beyond_idle(conn, &pool.options)
|| is_beyond_lifetime(&conn.raw, &pool.options)
|| is_beyond_lifetime(&conn.live, &pool.options)
});
for conn in keep {
// return these connections to the pool first
pool_tx.send(conn).await;
pool.idle.push(conn).expect("unreachable: pool overflowed");
}
for conn in reap {


@@ -7,9 +7,6 @@ use std::{
time::{Duration, Instant},
};
use async_std::sync::Sender;
use futures_util::future::FutureExt;
use crate::Database;
use self::inner::SharedPool;
@@ -21,26 +18,22 @@ mod inner;
mod options;
/// A pool of database connections.
pub struct Pool<DB>
pub struct Pool<DB>(Arc<SharedPool<DB>>)
where
DB: Database,
{
inner: Arc<SharedPool<DB>>,
pool_tx: Sender<Idle<DB>>,
}
DB: Database;
struct Connection<DB: Database> {
raw: Option<Raw<DB>>,
pool_tx: Sender<Idle<DB>>,
live: Option<Live<DB>>,
pool: Arc<SharedPool<DB>>,
}
struct Raw<DB: Database> {
inner: DB::Connection,
struct Live<DB: Database> {
raw: DB::Connection,
created: Instant,
}
struct Idle<DB: Database> {
raw: Raw<DB>,
live: Live<DB>,
since: Instant,
}
@@ -55,9 +48,9 @@ where
}
async fn with_options(url: &str, options: Options) -> crate::Result<Self> {
let (inner, pool_tx) = SharedPool::new_arc(url, options).await?;
let inner = SharedPool::new_arc(url, options).await?;
Ok(Pool { inner, pool_tx })
Ok(Pool(inner))
}
/// Returns a [Builder] to configure a new connection pool.
@@ -69,9 +62,9 @@ where
///
/// Waits for at most the configured connection timeout before returning an error.
pub async fn acquire(&self) -> crate::Result<impl DerefMut<Target = DB::Connection>> {
self.inner.acquire().await.map(|conn| Connection {
raw: Some(conn),
pool_tx: self.pool_tx.clone(),
self.0.acquire().await.map(|conn| Connection {
live: Some(conn),
pool: Arc::clone(&self.0),
})
}
@@ -79,9 +72,9 @@ where
///
/// Returns `None` immediately if there are no idle connections available in the pool.
pub fn try_acquire(&self) -> Option<impl DerefMut<Target = DB::Connection>> {
self.inner.try_acquire().map(|conn| Connection {
raw: Some(conn),
pool_tx: self.pool_tx.clone(),
self.0.try_acquire().map(|conn| Connection {
live: Some(conn),
pool: Arc::clone(&self.0),
})
}
@@ -90,42 +83,42 @@ where
///
/// Does not resolve until all connections are closed.
pub async fn close(&self) {
self.inner.close().await;
self.0.close().await;
}
/// Returns the number of connections currently being managed by the pool.
pub fn size(&self) -> u32 {
self.inner.size()
self.0.size()
}
/// Returns the number of idle connections.
pub fn idle(&self) -> usize {
self.inner.num_idle()
self.0.num_idle()
}
/// Returns the configured maximum pool size.
pub fn max_size(&self) -> u32 {
self.inner.options().max_size
self.0.options().max_size
}
/// Returns the maximum time spent acquiring a new connection before an error is returned.
pub fn connect_timeout(&self) -> Duration {
self.inner.options().connect_timeout
self.0.options().connect_timeout
}
/// Returns the configured minimum idle connection count.
pub fn min_size(&self) -> u32 {
self.inner.options().min_size
self.0.options().min_size
}
/// Returns the configured maximum connection lifetime.
pub fn max_lifetime(&self) -> Option<Duration> {
self.inner.options().max_lifetime
self.0.options().max_lifetime
}
/// Returns the configured idle connection timeout.
pub fn idle_timeout(&self) -> Option<Duration> {
self.inner.options().idle_timeout
self.0.options().idle_timeout
}
}
@@ -135,21 +128,18 @@ where
DB: Database,
{
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
pool_tx: self.pool_tx.clone(),
}
Self(Arc::clone(&self.0))
}
}
impl<DB: Database> fmt::Debug for Pool<DB> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Pool")
.field("url", &self.inner.url())
.field("size", &self.inner.size())
.field("num_idle", &self.inner.num_idle())
.field("closed", &self.inner.closed())
.field("options", self.inner.options())
.field("url", &self.0.url())
.field("size", &self.0.size())
.field("num_idle", &self.0.num_idle())
.field("is_closed", &self.0.is_closed())
.field("options", self.0.options())
.finish()
}
}
@@ -160,26 +150,20 @@ impl<DB: Database> Deref for Connection<DB> {
type Target = DB::Connection;
fn deref(&self) -> &Self::Target {
&self.raw.as_ref().expect(DEREF_ERR).inner
&self.live.as_ref().expect(DEREF_ERR).raw
}
}
impl<DB: Database> DerefMut for Connection<DB> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.raw.as_mut().expect(DEREF_ERR).inner
&mut self.live.as_mut().expect(DEREF_ERR).raw
}
}
impl<DB: Database> Drop for Connection<DB> {
fn drop(&mut self) {
if let Some(conn) = self.raw.take() {
self.pool_tx
.send(Idle {
raw: conn,
since: Instant::now(),
})
.now_or_never()
.expect("(bug) connection released into a full pool")
if let Some(live) = self.live.take() {
self.pool.release(live);
}
}
}
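
For completeness, the caller-side half of the hand-off: the pool guard's Drop now returns the live connection through release() instead of sending into a bounded channel and expecting the send to complete immediately, which is the change this last hunk makes. The fragment below reuses the placeholder Conn and SharedPool types from the earlier sketch and is likewise illustrative rather than the crate's exact code.

use std::sync::Arc;

struct PooledConn {
    live: Option<Conn>,
    pool: Arc<SharedPool>,
}

impl Drop for PooledConn {
    fn drop(&mut self) {
        // Funnel the connection back through release(), which prefers waiters;
        // the old Drop pushed into a bounded channel and panicked if that send
        // could not complete immediately.
        if let Some(live) = self.live.take() {
            self.pool.release(live);
        }
    }
}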