diff --git a/sqlx-core/src/sqlite/connection/executor.rs b/sqlx-core/src/sqlite/connection/executor.rs index fb37ede2..c2420221 100644 --- a/sqlx-core/src/sqlite/connection/executor.rs +++ b/sqlx-core/src/sqlite/connection/executor.rs @@ -12,7 +12,6 @@ use crate::sqlite::{ use either::Either; use futures_core::future::BoxFuture; use futures_core::stream::BoxStream; -use futures_util::TryStreamExt; use libsqlite3_sys::sqlite3_last_insert_rowid; use std::borrow::Cow; use std::sync::Arc; @@ -144,21 +143,60 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection { fn fetch_optional<'e, 'q: 'e, E: 'q>( self, - query: E, + mut query: E, ) -> BoxFuture<'e, Result, Error>> where 'c: 'e, E: Execute<'q, Self::Database>, { - let mut s = self.fetch_many(query); + let sql = query.sql(); + let mut logger = QueryLogger::new(sql, self.log_settings.clone()); + let arguments = query.take_arguments(); + let persistent = query.persistent() && arguments.is_some(); Box::pin(async move { - while let Some(v) = s.try_next().await? { - if let Either::Right(r) = v { - return Ok(Some(r)); + let SqliteConnection { + handle: ref mut conn, + ref mut statements, + ref mut statement, + ref mut worker, + .. + } = self; + + // prepare statement object (or checkout from cache) + let virtual_stmt = prepare(statements, statement, sql, persistent)?; + + // keep track of how many arguments we have bound + let mut num_arguments = 0; + + while let Some((stmt, columns, column_names, last_row_values)) = + virtual_stmt.prepare(conn)? + { + // bind values to the statement + num_arguments += bind(stmt, &arguments, num_arguments)?; + + // save the rows from the _current_ position on the statement + // and send them to the still-live row object + SqliteRow::inflate_if_needed(stmt, &*columns, last_row_values.take()); + + // invoke [sqlite3_step] on the dedicated worker thread + // this will move us forward one row or finish the statement + match worker.step(*stmt).await? { + Either::Left(_) => (), + + Either::Right(()) => { + let (row, weak_values_ref) = + SqliteRow::current(*stmt, columns, column_names); + + *last_row_values = Some(weak_values_ref); + + logger.increment_rows(); + + virtual_stmt.reset(); + return Ok(Some(row)); + } } } - Ok(None) }) }