fix(sqlite): reset the statement when fetch_many() stream is dropped

Unlike `Executor.fetch_optional()`, `Executor.fetch_many()` does not
have a single exit.  The stream can be dropped at any time.  To catch
this event, we create a `StatementResetter` structure inside the stream
loop and reset the statement when it is dropped.

A test case `it_resets_prepared_statement_after_fetch_many` is
similar to `it_resets_prepared_statement_after_fetch_one` which tests
`Executor.fetch_optional()`.
This commit is contained in:
Alexander Krotov 2021-04-06 22:49:22 +03:00 committed by Ryan Leckey
parent 5cf1af2d19
commit 78656eb469
3 changed files with 51 additions and 2 deletions

View file

@ -59,6 +59,24 @@ fn bind(
Ok(n)
}
/// A structure holding sqlite statement handle and resetting the
/// statement when it is dropped.
struct StatementResetter {
handle: StatementHandle,
}
impl StatementResetter {
fn new(handle: StatementHandle) -> Self {
Self { handle }
}
}
impl Drop for StatementResetter {
fn drop(&mut self) {
self.handle.reset();
}
}
impl<'c> Executor<'c> for &'c mut SqliteConnection {
type Database = Sqlite;
@ -91,6 +109,12 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection {
let mut num_arguments = 0;
while let Some((stmt, columns, column_names, last_row_values)) = stmt.prepare(conn)? {
// Prepare to reset raw SQLite statement when the handle
// is dropped. `StatementResetter` will reliably reset the
// statement even if the stream returned from `fetch_many`
// is dropped early.
let _resetter = StatementResetter::new(*stmt);
// bind values to the statement
num_arguments += bind(stmt, &arguments, num_arguments)?;

View file

@ -13,8 +13,9 @@ use libsqlite3_sys::{
sqlite3_column_count, sqlite3_column_database_name, sqlite3_column_decltype,
sqlite3_column_double, sqlite3_column_int, sqlite3_column_int64, sqlite3_column_name,
sqlite3_column_origin_name, sqlite3_column_table_name, sqlite3_column_type,
sqlite3_column_value, sqlite3_db_handle, sqlite3_sql, sqlite3_stmt, sqlite3_stmt_readonly,
sqlite3_table_column_metadata, sqlite3_value, SQLITE_OK, SQLITE_TRANSIENT, SQLITE_UTF8,
sqlite3_column_value, sqlite3_db_handle, sqlite3_reset, sqlite3_sql, sqlite3_stmt,
sqlite3_stmt_readonly, sqlite3_table_column_metadata, sqlite3_value, SQLITE_OK,
SQLITE_TRANSIENT, SQLITE_UTF8,
};
use crate::error::{BoxDynError, Error};
@ -278,4 +279,8 @@ impl StatementHandle {
pub(crate) fn column_text(&self, index: usize) -> Result<&str, BoxDynError> {
Ok(from_utf8(self.column_blob(index))?)
}
pub(crate) fn reset(&self) {
unsafe { sqlite3_reset(self.0.as_ptr()) };
}
}

View file

@ -504,3 +504,23 @@ async fn it_resets_prepared_statement_after_fetch_one() -> anyhow::Result<()> {
Ok(())
}
#[sqlx_macros::test]
async fn it_resets_prepared_statement_after_fetch_many() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
conn.execute("CREATE TEMPORARY TABLE foobar (id INTEGER)")
.await?;
conn.execute("INSERT INTO foobar VALUES (42)").await?;
conn.execute("INSERT INTO foobar VALUES (43)").await?;
let mut rows = sqlx::query("SELECT id FROM foobar").fetch(&mut conn);
let row = rows.try_next().await?.unwrap();
let x: i32 = row.try_get("id")?;
assert_eq!(x, 42);
drop(rows);
conn.execute("DROP TABLE foobar").await?;
Ok(())
}