fix fetch_optional for sqlite

This commit is contained in:
Marcin Kaźmierczak 2020-11-24 23:21:18 +01:00 committed by Ryan Leckey
parent 603c1950fe
commit e798409e20
No known key found for this signature in database
GPG key ID: F8AA68C235AB08C9

View file

@ -12,7 +12,6 @@ use crate::sqlite::{
use either::Either;
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_util::TryStreamExt;
use libsqlite3_sys::sqlite3_last_insert_rowid;
use std::borrow::Cow;
use std::sync::Arc;
@ -144,21 +143,60 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection {
fn fetch_optional<'e, 'q: 'e, E: 'q>(
self,
query: E,
mut query: E,
) -> BoxFuture<'e, Result<Option<SqliteRow>, Error>>
where
'c: 'e,
E: Execute<'q, Self::Database>,
{
let mut s = self.fetch_many(query);
let sql = query.sql();
let mut logger = QueryLogger::new(sql, self.log_settings.clone());
let arguments = query.take_arguments();
let persistent = query.persistent() && arguments.is_some();
Box::pin(async move {
while let Some(v) = s.try_next().await? {
if let Either::Right(r) = v {
return Ok(Some(r));
let SqliteConnection {
handle: ref mut conn,
ref mut statements,
ref mut statement,
ref mut worker,
..
} = self;
// prepare statement object (or checkout from cache)
let virtual_stmt = prepare(statements, statement, sql, persistent)?;
// keep track of how many arguments we have bound
let mut num_arguments = 0;
while let Some((stmt, columns, column_names, last_row_values)) =
virtual_stmt.prepare(conn)?
{
// bind values to the statement
num_arguments += bind(stmt, &arguments, num_arguments)?;
// save the rows from the _current_ position on the statement
// and send them to the still-live row object
SqliteRow::inflate_if_needed(stmt, &*columns, last_row_values.take());
// invoke [sqlite3_step] on the dedicated worker thread
// this will move us forward one row or finish the statement
match worker.step(*stmt).await? {
Either::Left(_) => (),
Either::Right(()) => {
let (row, weak_values_ref) =
SqliteRow::current(*stmt, columns, column_names);
*last_row_values = Some(weak_values_ref);
logger.increment_rows();
virtual_stmt.reset();
return Ok(Some(row));
}
}
}
Ok(None)
})
}