feat: add should_flush and only spawn on drop for PoolConnection if we need to flush

This commit is contained in:
Ryan Leckey 2020-06-21 03:49:23 -07:00
parent d76002e110
commit 4448c0e629
7 changed files with 53 additions and 13 deletions

View file

@ -64,10 +64,12 @@ pub trait Connection: Send {
})
}
/// Flush any pending commands to the database.
#[doc(hidden)]
fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>>;
#[doc(hidden)]
fn should_flush(&self) -> bool;
#[doc(hidden)]
fn get_ref(&self) -> &<Self::Database as Database>::Connection;

View file

@ -42,6 +42,11 @@ impl Connection for MssqlConnection {
self.stream.wait_until_ready().boxed()
}
#[doc(hidden)]
fn should_flush(&self) -> bool {
!self.stream.wbuf.is_empty()
}
#[doc(hidden)]
fn get_ref(&self) -> &MssqlConnection {
self

View file

@ -75,6 +75,11 @@ impl Connection for MySqlConnection {
self.stream.wait_until_ready().boxed()
}
#[doc(hidden)]
fn should_flush(&self) -> bool {
!self.stream.wbuf.is_empty()
}
#[doc(hidden)]
fn get_ref(&self) -> &Self {
self

View file

@ -78,6 +78,11 @@ impl<DB: Database> Connection for PoolConnection<DB> {
self.get_mut().flush()
}
#[doc(hidden)]
fn should_flush(&self) -> bool {
self.get_ref().should_flush()
}
#[doc(hidden)]
fn get_ref(&self) -> &DB::Connection {
self.deref().get_ref()
@ -94,19 +99,26 @@ impl<DB: Database> Drop for PoolConnection<DB> {
fn drop(&mut self) {
if let Some(mut live) = self.live.take() {
let pool = self.pool.clone();
spawn(async move {
// flush the connection (will immediately return if not needed) before
// we fully release to the pool
if let Err(e) = live.raw.flush().await {
log::error!("error occurred while flushing the connection: {}", e);
// we now consider the connection to be broken
// close the connection and drop from the pool
let _ = live.float(&pool).into_idle().close().await;
} else {
pool.release(live.float(&pool));
}
});
if live.raw.should_flush() {
spawn(async move {
// flush the connection (will immediately return if not needed) before
// we fully release to the pool
if let Err(e) = live.raw.flush().await {
log::error!("error occurred while flushing the connection: {}", e);
// we now consider the connection to be broken
// close the connection and drop from the pool
let _ = live.float(&pool).into_idle().close().await;
} else {
// after we have flushed successfully, release to the pool
pool.release(live.float(&pool));
}
});
} else {
// nothing to flush, release immediately outside of a spawn
pool.release(live.float(&pool));
}
}
}
}

View file

@ -124,6 +124,11 @@ impl Connection for PgConnection {
self.wait_until_ready().boxed()
}
#[doc(hidden)]
fn should_flush(&self) -> bool {
!self.stream.wbuf.is_empty()
}
#[doc(hidden)]
fn get_ref(&self) -> &Self {
self

View file

@ -52,11 +52,17 @@ impl Connection for SqliteConnection {
Box::pin(future::ok(()))
}
#[doc(hidden)]
fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
// For SQLite, FLUSH does effectively nothing
Box::pin(future::ok(()))
}
#[doc(hidden)]
fn should_flush(&self) -> bool {
false
}
#[doc(hidden)]
fn get_ref(&self) -> &Self {
self

View file

@ -127,6 +127,11 @@ where
self.get_mut().flush()
}
#[doc(hidden)]
fn should_flush(&self) -> bool {
self.get_ref().should_flush()
}
#[doc(hidden)]
fn get_ref(&self) -> &<Self::Database as Database>::Connection {
self.connection.get_ref()