sqlite: fix inconsistent read-after-write (#3354)

* sqlite: fix inconsistent read-after-write

fetch_one/fetch_optional

* try pushing fetch_optional early-return into worker

* run cargo fmt

* fix "it_can_execute_multiple_statements" test failure

* use Option<usize> instead of bespoke enum for rows returned
This commit is contained in:
Clark Kampfe 2024-08-01 15:27:01 -05:00 committed by GitHub
parent 572e2a4ed5
commit ff0252d4c2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 36 additions and 8 deletions

View file

@ -83,7 +83,7 @@ impl AnyConnectionBackend for SqliteConnection {
Box::pin( Box::pin(
self.worker self.worker
.execute(query, args, self.row_channel_size, persistent) .execute(query, args, self.row_channel_size, persistent, None)
.map_ok(flume::Receiver::into_stream) .map_ok(flume::Receiver::into_stream)
.try_flatten_stream() .try_flatten_stream()
.map( .map(
@ -107,7 +107,7 @@ impl AnyConnectionBackend for SqliteConnection {
Box::pin(async move { Box::pin(async move {
let stream = self let stream = self
.worker .worker
.execute(query, args, self.row_channel_size, persistent) .execute(query, args, self.row_channel_size, persistent, Some(1))
.map_ok(flume::Receiver::into_stream) .map_ok(flume::Receiver::into_stream)
.await?; .await?;
futures_util::pin_mut!(stream); futures_util::pin_mut!(stream);

View file

@ -32,7 +32,7 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection {
Box::pin( Box::pin(
self.worker self.worker
.execute(sql, arguments, self.row_channel_size, persistent) .execute(sql, arguments, self.row_channel_size, persistent, None)
.map_ok(flume::Receiver::into_stream) .map_ok(flume::Receiver::into_stream)
.try_flatten_stream(), .try_flatten_stream(),
) )
@ -58,7 +58,7 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection {
Box::pin(async move { Box::pin(async move {
let stream = self let stream = self
.worker .worker
.execute(sql, arguments, self.row_channel_size, persistent) .execute(sql, arguments, self.row_channel_size, persistent, Some(1))
.map_ok(flume::Receiver::into_stream) .map_ok(flume::Receiver::into_stream)
.try_flatten_stream(); .try_flatten_stream();

View file

@ -52,6 +52,7 @@ enum Command {
arguments: Option<SqliteArguments<'static>>, arguments: Option<SqliteArguments<'static>>,
persistent: bool, persistent: bool,
tx: flume::Sender<Result<Either<SqliteQueryResult, SqliteRow>, Error>>, tx: flume::Sender<Result<Either<SqliteQueryResult, SqliteRow>, Error>>,
limit: Option<usize>,
}, },
Begin { Begin {
tx: rendezvous_oneshot::Sender<Result<(), Error>>, tx: rendezvous_oneshot::Sender<Result<(), Error>>,
@ -136,6 +137,7 @@ impl ConnectionWorker {
arguments, arguments,
persistent, persistent,
tx, tx,
limit
} => { } => {
let iter = match execute::iter(&mut conn, &query, arguments, persistent) let iter = match execute::iter(&mut conn, &query, arguments, persistent)
{ {
@ -146,11 +148,35 @@ impl ConnectionWorker {
} }
}; };
match limit {
None => {
for res in iter { for res in iter {
if tx.send(res).is_err() { if tx.send(res).is_err() {
break; break;
} }
} }
},
Some(limit) => {
let mut iter = iter;
let mut rows_returned = 0;
while let Some(res) = iter.next() {
if let Ok(ok) = &res {
if ok.is_right() {
rows_returned += 1;
if rows_returned >= limit {
drop(iter);
let _ = tx.send(res);
break;
}
}
}
if tx.send(res).is_err() {
break;
}
}
},
}
update_cached_statements_size(&conn, &shared.cached_statements_size); update_cached_statements_size(&conn, &shared.cached_statements_size);
} }
@ -284,6 +310,7 @@ impl ConnectionWorker {
args: Option<SqliteArguments<'_>>, args: Option<SqliteArguments<'_>>,
chan_size: usize, chan_size: usize,
persistent: bool, persistent: bool,
limit: Option<usize>,
) -> Result<flume::Receiver<Result<Either<SqliteQueryResult, SqliteRow>, Error>>, Error> { ) -> Result<flume::Receiver<Result<Either<SqliteQueryResult, SqliteRow>, Error>>, Error> {
let (tx, rx) = flume::bounded(chan_size); let (tx, rx) = flume::bounded(chan_size);
@ -294,6 +321,7 @@ impl ConnectionWorker {
arguments: args.map(SqliteArguments::into_static), arguments: args.map(SqliteArguments::into_static),
persistent, persistent,
tx, tx,
limit,
}, },
Span::current(), Span::current(),
)) ))