fix: make begin,commit,rollback cancel-safe in sqlite (#2054) (#2057)

This commit is contained in:
Adam Cigánek 2022-09-13 03:20:27 +02:00 committed by GitHub
parent 09717e1b83
commit f38c739f7f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -55,13 +55,13 @@ enum Command {
tx: flume::Sender<Result<Either<SqliteQueryResult, SqliteRow>, Error>>,
},
Begin {
tx: oneshot::Sender<Result<(), Error>>,
tx: rendezvous_oneshot::Sender<Result<(), Error>>,
},
Commit {
tx: oneshot::Sender<Result<(), Error>>,
tx: rendezvous_oneshot::Sender<Result<(), Error>>,
},
Rollback {
tx: Option<oneshot::Sender<Result<(), Error>>>,
tx: Option<rendezvous_oneshot::Sender<Result<(), Error>>>,
},
CreateCollation {
create_collation:
@ -116,6 +116,11 @@ impl ConnectionWorker {
return;
}
// If COMMIT or ROLLBACK is processed but not acknowledged, there would be another
// ROLLBACK sent when the `Transaction` drops. We need to ignore it otherwise we
// would rollback an already completed transaction.
let mut ignore_next_start_rollback = false;
for cmd in command_rx {
match cmd {
Command::Prepare { query, tx } => {
@ -162,8 +167,27 @@ impl ConnectionWorker {
.map(|_| {
conn.transaction_depth += 1;
});
let res_ok = res.is_ok();
tx.send(res).ok();
if tx.blocking_send(res).is_err() && res_ok {
// The BEGIN was processed but not acknowledged. This means no
// `Transaction` was created and so there is no way to commit /
// rollback this transaction. We need to roll it back
// immediately otherwise it would remain started forever.
if let Err(e) = conn
.handle
.exec(rollback_ansi_transaction_sql(depth + 1))
.map(|_| {
conn.transaction_depth -= 1;
})
{
// The rollback failed. To prevent leaving the connection
// in an inconsistent state we shutdown this worker which
// causes any subsequent operation on the connection to fail.
log::error!("failed to rollback cancelled transaction: {}", e);
break;
}
}
}
Command::Commit { tx } => {
let depth = conn.transaction_depth;
@ -177,10 +201,21 @@ impl ConnectionWorker {
} else {
Ok(())
};
let res_ok = res.is_ok();
tx.send(res).ok();
if tx.blocking_send(res).is_err() && res_ok {
// The COMMIT was processed but not acknowledged. This means that
// the `Transaction` doesn't know it was committed and will try to
// rollback on drop. We need to ignore that rollback.
ignore_next_start_rollback = true;
}
}
Command::Rollback { tx } => {
if ignore_next_start_rollback && tx.is_none() {
ignore_next_start_rollback = false;
continue;
}
let depth = conn.transaction_depth;
let res = if depth > 0 {
@ -193,8 +228,16 @@ impl ConnectionWorker {
Ok(())
};
let res_ok = res.is_ok();
if let Some(tx) = tx {
tx.send(res).ok();
if tx.blocking_send(res).is_err() && res_ok {
// The ROLLBACK was processed but not acknowledged. This means
// that the `Transaction` doesn't know it was rolled back and
// will try to rollback again on drop. We need to ignore that
// rollback.
ignore_next_start_rollback = true;
}
}
}
Command::CreateCollation { create_collation } => {
@ -268,15 +311,17 @@ impl ConnectionWorker {
}
pub(crate) async fn begin(&mut self) -> Result<(), Error> {
self.oneshot_cmd(|tx| Command::Begin { tx }).await?
self.oneshot_cmd_with_ack(|tx| Command::Begin { tx })
.await?
}
pub(crate) async fn commit(&mut self) -> Result<(), Error> {
self.oneshot_cmd(|tx| Command::Commit { tx }).await?
self.oneshot_cmd_with_ack(|tx| Command::Commit { tx })
.await?
}
pub(crate) async fn rollback(&mut self) -> Result<(), Error> {
self.oneshot_cmd(|tx| Command::Rollback { tx: Some(tx) })
self.oneshot_cmd_with_ack(|tx| Command::Rollback { tx: Some(tx) })
.await?
}
@ -304,6 +349,20 @@ impl ConnectionWorker {
rx.await.map_err(|_| Error::WorkerCrashed)
}
async fn oneshot_cmd_with_ack<F, T>(&mut self, command: F) -> Result<T, Error>
where
F: FnOnce(rendezvous_oneshot::Sender<T>) -> Command,
{
let (tx, rx) = rendezvous_oneshot::channel();
self.command_tx
.send_async(command(tx))
.await
.map_err(|_| Error::WorkerCrashed)?;
rx.recv().await.map_err(|_| Error::WorkerCrashed)
}
pub fn create_collation(
&mut self,
name: &str,
@ -387,3 +446,41 @@ fn prepare(conn: &mut ConnectionState, query: &str) -> Result<SqliteStatement<'s
fn update_cached_statements_size(conn: &ConnectionState, size: &AtomicUsize) {
size.store(conn.statements.len(), Ordering::Release);
}
// A oneshot channel where send completes only after the receiver receives the value.
mod rendezvous_oneshot {
use super::oneshot::{self, Canceled};
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let (inner_tx, inner_rx) = oneshot::channel();
(Sender { inner: inner_tx }, Receiver { inner: inner_rx })
}
pub struct Sender<T> {
inner: oneshot::Sender<(T, oneshot::Sender<()>)>,
}
impl<T> Sender<T> {
pub async fn send(self, value: T) -> Result<(), Canceled> {
let (ack_tx, ack_rx) = oneshot::channel();
self.inner.send((value, ack_tx)).map_err(|_| Canceled)?;
ack_rx.await
}
pub fn blocking_send(self, value: T) -> Result<(), Canceled> {
futures_executor::block_on(self.send(value))
}
}
pub struct Receiver<T> {
inner: oneshot::Receiver<(T, oneshot::Sender<()>)>,
}
impl<T> Receiver<T> {
pub async fn recv(self) -> Result<T, Canceled> {
let (value, ack_tx) = self.inner.await?;
ack_tx.send(()).map_err(|_| Canceled)?;
Ok(value)
}
}
}