mirror of
https://github.com/launchbadge/sqlx
synced 2024-09-21 06:41:56 +00:00
implement connection timeouts
This commit is contained in:
parent
ab32d0a5c4
commit
812c0bee61
2 changed files with 84 additions and 12 deletions
|
@ -4,6 +4,8 @@ use std::{
|
|||
io,
|
||||
};
|
||||
|
||||
use async_std::future::TimeoutError;
|
||||
|
||||
/// A convenient Result instantiation appropriate for SQLx.
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
|
@ -35,6 +37,10 @@ pub enum Error {
|
|||
/// Context is provided by the included error message.
|
||||
Protocol(Box<str>),
|
||||
|
||||
/// A `Pool::acquire()` timed out due to connections not becoming available or
|
||||
/// because another task encountered too many errors while trying to open a new connection.
|
||||
TimedOut,
|
||||
|
||||
// TODO: Remove and replace with `#[non_exhaustive]` when possible
|
||||
#[doc(hidden)]
|
||||
__Nonexhaustive,
|
||||
|
@ -65,6 +71,8 @@ impl Display for Error {
|
|||
|
||||
Error::Protocol(ref err) => f.write_str(err),
|
||||
|
||||
Error::TimedOut => f.write_str("timed out while waiting for an open connection"),
|
||||
|
||||
Error::__Nonexhaustive => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
@ -77,6 +85,12 @@ impl From<io::Error> for Error {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<TimeoutError> for Error {
|
||||
fn from(_: TimeoutError) -> Self {
|
||||
Error::TimedOut
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ProtocolError<'_>> for Error {
|
||||
#[inline]
|
||||
fn from(err: ProtocolError) -> Self {
|
||||
|
|
|
@ -8,7 +8,8 @@ use crate::{
|
|||
};
|
||||
use futures_channel::oneshot;
|
||||
use futures_core::{future::BoxFuture, stream::BoxStream};
|
||||
use futures_util::{future::FutureExt, stream::StreamExt};
|
||||
use futures_util::{future::{FutureExt, TryFutureExt}, stream::StreamExt};
|
||||
use futures_util::future::{AbortHandle, AbortRegistration};
|
||||
use std::{
|
||||
future::Future,
|
||||
marker::PhantomData,
|
||||
|
@ -20,6 +21,7 @@ use std::{
|
|||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use async_std::future::timeout;
|
||||
use async_std::sync::{channel, Receiver, Sender};
|
||||
use async_std::task;
|
||||
|
||||
|
@ -84,6 +86,11 @@ where
|
|||
self.0.options.max_size
|
||||
}
|
||||
|
||||
/// Returns the maximum time spent acquiring a new connection before an error is returned.
|
||||
pub fn connect_timeout(&self) -> Duration {
|
||||
self.0.options.connect_timeout
|
||||
}
|
||||
|
||||
/// Returns the configured mimimum idle connection count.
|
||||
pub fn min_idle(&self) -> Option<u32> {
|
||||
self.0.options.min_idle
|
||||
|
@ -135,6 +142,11 @@ where
|
|||
self
|
||||
}
|
||||
|
||||
pub fn connect_timeout(mut self, connect_timeout: Duration) -> Self {
|
||||
self.options.connect_timeout = connect_timeout;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn min_idle(mut self, min_idle: impl Into<Option<u32>>) -> Self {
|
||||
self.options.min_idle = min_idle.into();
|
||||
self
|
||||
|
@ -157,6 +169,7 @@ where
|
|||
|
||||
struct Options {
|
||||
max_size: u32,
|
||||
connect_timeout: Duration,
|
||||
min_idle: Option<u32>,
|
||||
max_lifetime: Option<Duration>,
|
||||
idle_timeout: Option<Duration>,
|
||||
|
@ -167,6 +180,7 @@ impl Default for Options {
|
|||
Self {
|
||||
max_size: 10,
|
||||
min_idle: None,
|
||||
connect_timeout: Duration::from_secs(30),
|
||||
max_lifetime: None,
|
||||
idle_timeout: None,
|
||||
}
|
||||
|
@ -208,6 +222,9 @@ where
|
|||
}
|
||||
|
||||
async fn acquire(&self) -> crate::Result<Live<DB>> {
|
||||
let start = Instant::now();
|
||||
let deadline = start + self.options.connect_timeout;
|
||||
|
||||
if let Some(live) = self.try_acquire() {
|
||||
return Ok(live);
|
||||
}
|
||||
|
@ -219,23 +236,64 @@ where
|
|||
// Too many open connections
|
||||
// Wait until one is available
|
||||
|
||||
// Waiters are not dropped unless the pool is dropped
|
||||
// which would drop this future
|
||||
return Ok(self
|
||||
.pool_rx
|
||||
.recv()
|
||||
.await
|
||||
.expect("waiter dropped without dropping pool")
|
||||
.live(&self.pool_tx));
|
||||
// get the time between the deadline and now and use that as our timeout
|
||||
let max_wait = deadline.checked_duration_since(Instant::now())
|
||||
.ok_or(Error::TimedOut)?;
|
||||
|
||||
// don't sleep forever
|
||||
let idle = match timeout(max_wait, self.pool_rx.recv()).await {
|
||||
Ok(Some(idle)) => idle,
|
||||
Ok(None) => panic!("this isn't possible, we own a `pool_tx`"),
|
||||
// try our acquire logic again
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
// check if idle connection was within max lifetime (or not set)
|
||||
if self.options.max_lifetime.map_or(true, |max| idle.raw.created.elapsed() < max)
|
||||
// and if connection wasn't idle too long (or not set)
|
||||
&& self.options.idle_timeout.map_or(true, |timeout| idle.since.elapsed() < timeout)
|
||||
{
|
||||
match idle.raw.inner.ping().await {
|
||||
Ok(_) => return Ok(idle.revive(&self.pool_tx)),
|
||||
// an error here means the other end has hung up or we lost connectivity
|
||||
// either way we're fine to just discard the connection
|
||||
// the error itself here isn't necessarily unexpected so WARN is too strong
|
||||
Err(e) => log::info!("ping on idle connection returned error: {}", e),
|
||||
}
|
||||
} else {
|
||||
// close the connection but don't really care about the result
|
||||
let _ = idle.raw.inner.close().await;
|
||||
}
|
||||
|
||||
// either case, make sure the idle connection is gone explicitly before we open one
|
||||
drop(idle);
|
||||
|
||||
// while we're still at max size, acquire a new connection
|
||||
return self.new_conn(deadline).await
|
||||
}
|
||||
|
||||
if self.size.compare_and_swap(size, size + 1, Ordering::AcqRel) == size {
|
||||
// Open a new connection and return directly
|
||||
let raw = DB::open(&self.url).await?;
|
||||
return Ok(Live::pooled(raw, &self.pool_tx));
|
||||
return self.new_conn(deadline).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn new_conn(&self, deadline: Instant) -> crate::Result<Live<DB>> {
|
||||
while Instant::now() < deadline {
|
||||
// result here is `Result<Result<DB, Error>, TimeoutError>`
|
||||
match timeout(deadline - Instant::now(), DB::open(&self.url)).await {
|
||||
Ok(Ok(raw)) => return Ok(Live::pooled(raw, &self.pool_tx)),
|
||||
// error while connecting, this should definitely be logged
|
||||
Ok(Err(e)) => log::warn!("error establishing a connection: {}", e),
|
||||
// timed out
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
|
||||
self.size.fetch_sub(1, Ordering::AcqRel);
|
||||
Err(Error::TimedOut)
|
||||
}
|
||||
}
|
||||
|
||||
impl<DB> Executor for Pool<DB>
|
||||
|
@ -368,7 +426,7 @@ where
|
|||
}
|
||||
|
||||
impl<DB: Backend> Idle<DB> {
|
||||
fn live(self, pool_tx: &Sender<Idle<DB>>) -> Live<DB> {
|
||||
fn revive(self, pool_tx: &Sender<Idle<DB>>) -> Live<DB> {
|
||||
Live {
|
||||
raw: Some(self.raw),
|
||||
pool_tx: Some(pool_tx.clone()),
|
||||
|
|
Loading…
Reference in a new issue