sqlx/tests/sqlite/sqlite.rs
Austin Bonander ca518b7185
feat: add raw_sql API (#3007)
This is meant to be much easier to discover than the current approach of directly invoking `Executor` methods.

In addition, I'm improving documentation for the `query*()` functions across the board.
2024-02-18 15:38:23 -08:00

796 lines
21 KiB
Rust

use futures::TryStreamExt;
use rand::{Rng, SeedableRng};
use rand_xoshiro::Xoshiro256PlusPlus;
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
use sqlx::{
query, sqlite::Sqlite, sqlite::SqliteRow, Column, ConnectOptions, Connection, Executor, Row,
SqliteConnection, SqlitePool, Statement, TypeInfo,
};
use sqlx_test::new;
use std::sync::Arc;
#[sqlx_macros::test]
async fn it_connects() -> anyhow::Result<()> {
Ok(new::<Sqlite>().await?.ping().await?)
}
#[sqlx_macros::test]
async fn it_fetches_and_inflates_row() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
// process rows, one-at-a-time
// this reuses the memory of the row
{
let expected = [15, 39, 51];
let mut i = 0;
let mut s = conn.fetch("SELECT 15 UNION SELECT 51 UNION SELECT 39");
while let Some(row) = s.try_next().await? {
let v1 = row.get::<i32, _>(0);
assert_eq!(expected[i], v1);
i += 1;
}
}
// same query, but fetch all rows at once
// this triggers the internal inflation
let rows = conn
.fetch_all("SELECT 15 UNION SELECT 51 UNION SELECT 39")
.await?;
assert_eq!(rows.len(), 3);
assert_eq!(rows[0].get::<i32, _>(0), 15);
assert_eq!(rows[1].get::<i32, _>(0), 39);
assert_eq!(rows[2].get::<i32, _>(0), 51);
// same query but fetch the first row a few times from a non-persistent query
// these rows should be immediately inflated
let row1 = conn
.fetch_one("SELECT 15 UNION SELECT 51 UNION SELECT 39")
.await?;
assert_eq!(row1.get::<i32, _>(0), 15);
let row2 = conn
.fetch_one("SELECT 15 UNION SELECT 51 UNION SELECT 39")
.await?;
assert_eq!(row1.get::<i32, _>(0), 15);
assert_eq!(row2.get::<i32, _>(0), 15);
// same query (again) but make it persistent
// and fetch the first row a few times
let row1 = conn
.fetch_one(query("SELECT 15 UNION SELECT 51 UNION SELECT 39"))
.await?;
assert_eq!(row1.get::<i32, _>(0), 15);
let row2 = conn
.fetch_one(query("SELECT 15 UNION SELECT 51 UNION SELECT 39"))
.await?;
assert_eq!(row1.get::<i32, _>(0), 15);
assert_eq!(row2.get::<i32, _>(0), 15);
Ok(())
}
#[sqlx_macros::test]
async fn it_maths() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
let value = sqlx::query("select 1 + ?1")
.bind(5_i32)
.try_map(|row: SqliteRow| row.try_get::<i32, _>(0))
.fetch_one(&mut conn)
.await?;
assert_eq!(6i32, value);
Ok(())
}
#[sqlx_macros::test]
async fn test_bind_multiple_statements_multiple_values() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
let values: Vec<i32> = sqlx::query_scalar::<_, i32>("select ?; select ?")
.bind(5_i32)
.bind(15_i32)
.fetch_all(&mut conn)
.await?;
assert_eq!(values.len(), 2);
assert_eq!(values[0], 5);
assert_eq!(values[1], 15);
Ok(())
}
#[sqlx_macros::test]
async fn test_bind_multiple_statements_same_value() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
let values: Vec<i32> = sqlx::query_scalar::<_, i32>("select ?1; select ?1")
.bind(25_i32)
.fetch_all(&mut conn)
.await?;
assert_eq!(values.len(), 2);
assert_eq!(values[0], 25);
assert_eq!(values[1], 25);
Ok(())
}
#[sqlx_macros::test]
async fn it_can_describe_with_pragma() -> anyhow::Result<()> {
use sqlx::{Decode, TypeInfo, ValueRef};
let mut conn = new::<Sqlite>().await?;
let defaults = sqlx::query("pragma table_info (tweet)")
.try_map(|row: SqliteRow| {
let val = row.try_get_raw("dflt_value")?;
let ty = val.type_info().clone().into_owned();
let val: Option<i32> = Decode::<Sqlite>::decode(val).map_err(sqlx::Error::Decode)?;
if val.is_some() {
assert_eq!(ty.name(), "TEXT");
}
Ok(val)
})
.fetch_all(&mut conn)
.await?;
assert_eq!(defaults[0], None);
assert_eq!(defaults[2], Some(0));
Ok(())
}
#[sqlx_macros::test]
async fn it_binds_positional_parameters_issue_467() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
let row: (i32, i32, i32, i32) = sqlx::query_as("select ?1, ?1, ?3, ?2")
.bind(5_i32)
.bind(500_i32)
.bind(1020_i32)
.fetch_one(&mut conn)
.await?;
assert_eq!(row.0, 5);
assert_eq!(row.1, 5);
assert_eq!(row.2, 1020);
assert_eq!(row.3, 500);
Ok(())
}
#[sqlx_macros::test]
async fn it_fetches_in_loop() -> anyhow::Result<()> {
// this is trying to check for any data races
// there were a few that triggered *sometimes* while building out StatementWorker
for _ in 0..1000_usize {
let mut conn = new::<Sqlite>().await?;
let v: Vec<(i32,)> = sqlx::query_as("SELECT 1").fetch_all(&mut conn).await?;
assert_eq!(v[0].0, 1);
}
Ok(())
}
#[sqlx_macros::test]
async fn it_executes_with_pool() -> anyhow::Result<()> {
let pool: SqlitePool = SqlitePoolOptions::new()
.min_connections(2)
.max_connections(2)
.test_before_acquire(false)
.connect(&dotenvy::var("DATABASE_URL")?)
.await?;
let rows = pool.fetch_all("SELECT 1; SElECT 2").await?;
assert_eq!(rows.len(), 2);
Ok(())
}
#[cfg(sqlite_ipaddr)]
#[sqlx_macros::test]
async fn it_opens_with_extension() -> anyhow::Result<()> {
use std::str::FromStr;
let opts = SqliteConnectOptions::from_str(&dotenvy::var("DATABASE_URL")?)?.extension("ipaddr");
let mut conn = SqliteConnection::connect_with(&opts).await?;
conn.execute("SELECT ipmasklen('192.168.16.12/24');")
.await?;
conn.close().await?;
Ok(())
}
#[sqlx_macros::test]
async fn it_opens_in_memory() -> anyhow::Result<()> {
// If the filename is ":memory:", then a private, temporary in-memory database
// is created for the connection.
let conn = SqliteConnection::connect(":memory:").await?;
conn.close().await?;
Ok(())
}
#[sqlx_macros::test]
async fn it_opens_temp_on_disk() -> anyhow::Result<()> {
// If the filename is an empty string, then a private, temporary on-disk database will
// be created.
let conn = SqliteConnection::connect("").await?;
conn.close().await?;
Ok(())
}
#[sqlx_macros::test]
async fn it_fails_to_parse() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
let res = sqlx::raw_sql("SEELCT 1").execute(&mut conn).await;
assert!(res.is_err());
let err = res.unwrap_err().to_string();
assert_eq!(
"error returned from database: (code: 1) near \"SEELCT\": syntax error",
err
);
Ok(())
}
#[sqlx_macros::test]
async fn it_handles_empty_queries() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
let done = conn.execute("").await?;
assert_eq!(done.rows_affected(), 0);
Ok(())
}
#[sqlx_macros::test]
fn it_binds_parameters() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
let v: i32 = sqlx::query_scalar("SELECT ?")
.bind(10_i32)
.fetch_one(&mut conn)
.await?;
assert_eq!(v, 10);
let v: (i32, i32) = sqlx::query_as("SELECT ?1, ?")
.bind(10_i32)
.fetch_one(&mut conn)
.await?;
assert_eq!(v.0, 10);
assert_eq!(v.1, 10);
Ok(())
}
#[sqlx_macros::test]
fn it_binds_dollar_parameters() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
let v: (i32, i32) = sqlx::query_as("SELECT $1, $2")
.bind(10_i32)
.bind(11_i32)
.fetch_one(&mut conn)
.await?;
assert_eq!(v.0, 10);
assert_eq!(v.1, 11);
Ok(())
}
#[sqlx_macros::test]
async fn it_executes_queries() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
let _ = conn
.execute(
r#"
CREATE TEMPORARY TABLE users (id INTEGER PRIMARY KEY)
"#,
)
.await?;
for index in 1..=10_i32 {
let done = sqlx::query("INSERT INTO users (id) VALUES (?)")
.bind(index * 2)
.execute(&mut conn)
.await?;
assert_eq!(done.rows_affected(), 1);
}
let sum: i32 = sqlx::query_as("SELECT id FROM users")
.fetch(&mut conn)
.try_fold(0_i32, |acc, (x,): (i32,)| async move { Ok(acc + x) })
.await?;
assert_eq!(sum, 110);
Ok(())
}
#[sqlx_macros::test]
async fn it_can_execute_multiple_statements() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
let done = conn
.execute(
r#"
CREATE TEMPORARY TABLE users (id INTEGER PRIMARY KEY, other INTEGER);
INSERT INTO users DEFAULT VALUES;
"#,
)
.await?;
assert_eq!(done.rows_affected(), 1);
for index in 2..5_i32 {
let (id, other): (i32, i32) = sqlx::query_as(
r#"
INSERT INTO users (other) VALUES (?);
SELECT id, other FROM users WHERE id = last_insert_rowid();
"#,
)
.bind(index)
.fetch_one(&mut conn)
.await?;
assert_eq!(id, index);
assert_eq!(other, index);
}
Ok(())
}
#[sqlx_macros::test]
async fn it_interleaves_reads_and_writes() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
let mut cursor = conn.fetch(
"
CREATE TABLE IF NOT EXISTS _sqlx_test (
id INT PRIMARY KEY,
text TEXT NOT NULL
);
SELECT 'Hello World' as _1;
INSERT INTO _sqlx_test (text) VALUES ('this is a test');
SELECT id, text FROM _sqlx_test;
",
);
let row = cursor.try_next().await?.unwrap();
assert!("Hello World" == row.try_get::<&str, _>("_1")?);
let row = cursor.try_next().await?.unwrap();
let id: i64 = row.try_get("id")?;
let text: &str = row.try_get("text")?;
assert_eq!(0, id);
assert_eq!("this is a test", text);
Ok(())
}
#[sqlx_macros::test]
async fn it_supports_collations() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
// also tests `.lock_handle()`
conn.lock_handle()
.await?
.create_collation("test_collation", |l, r| l.cmp(r).reverse())?;
let _ = conn
.execute(
r#"
CREATE TEMPORARY TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL COLLATE test_collation)
"#,
)
.await?;
sqlx::query("INSERT INTO users (name) VALUES (?)")
.bind("a")
.execute(&mut conn)
.await?;
sqlx::query("INSERT INTO users (name) VALUES (?)")
.bind("b")
.execute(&mut conn)
.await?;
let row: SqliteRow = conn
.fetch_one("SELECT name FROM users ORDER BY name ASC")
.await?;
let name: &str = row.try_get(0)?;
assert_eq!(name, "b");
Ok(())
}
#[sqlx_macros::test]
async fn it_caches_statements() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
// Initial PRAGMAs are not cached as we are not going to execute
// them more than once.
assert_eq!(0, conn.cached_statements_size());
// `&str` queries are not persistent.
let row = conn.fetch_one("SELECT 100 AS val").await?;
let val: i32 = row.get("val");
assert_eq!(val, 100);
assert_eq!(0, conn.cached_statements_size());
// `Query` is persistent by default.
let mut conn = new::<Sqlite>().await?;
for i in 0..2 {
let row = sqlx::query("SELECT ? AS val")
.bind(i)
.fetch_one(&mut conn)
.await?;
let val: i32 = row.get("val");
assert_eq!(i, val);
}
assert_eq!(1, conn.cached_statements_size());
// Cache can be cleared.
conn.clear_cached_statements().await?;
assert_eq!(0, conn.cached_statements_size());
// `Query` is not persistent if `.persistent(false)` is used
// explicitly.
let mut conn = new::<Sqlite>().await?;
for i in 0..2 {
let row = sqlx::query("SELECT ? AS val")
.bind(i)
.persistent(false)
.fetch_one(&mut conn)
.await?;
let val: i32 = row.get("val");
assert_eq!(i, val);
}
assert_eq!(0, conn.cached_statements_size());
Ok(())
}
#[sqlx_macros::test]
async fn it_can_prepare_then_execute() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
let mut tx = conn.begin().await?;
let _ = sqlx::query("INSERT INTO tweet ( id, text ) VALUES ( 2, 'Hello, World' )")
.execute(&mut *tx)
.await?;
let tweet_id: i32 = 2;
let statement = tx.prepare("SELECT * FROM tweet WHERE id = ?1").await?;
assert_eq!(statement.column(0).name(), "id");
assert_eq!(statement.column(1).name(), "text");
assert_eq!(statement.column(2).name(), "is_sent");
assert_eq!(statement.column(3).name(), "owner_id");
assert_eq!(statement.column(0).type_info().name(), "INTEGER");
assert_eq!(statement.column(1).type_info().name(), "TEXT");
assert_eq!(statement.column(2).type_info().name(), "BOOLEAN");
assert_eq!(statement.column(3).type_info().name(), "INTEGER");
let row = statement.query().bind(tweet_id).fetch_one(&mut *tx).await?;
let tweet_text: &str = row.try_get("text")?;
assert_eq!(tweet_text, "Hello, World");
Ok(())
}
#[sqlx_macros::test]
async fn it_resets_prepared_statement_after_fetch_one() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
conn.execute("CREATE TEMPORARY TABLE foobar (id INTEGER)")
.await?;
conn.execute("INSERT INTO foobar VALUES (42)").await?;
let r = sqlx::query("SELECT id FROM foobar")
.fetch_one(&mut conn)
.await?;
let x: i32 = r.try_get("id")?;
assert_eq!(x, 42);
conn.execute("DROP TABLE foobar").await?;
Ok(())
}
#[sqlx_macros::test]
async fn it_resets_prepared_statement_after_fetch_many() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
conn.execute("CREATE TEMPORARY TABLE foobar (id INTEGER)")
.await?;
conn.execute("INSERT INTO foobar VALUES (42)").await?;
conn.execute("INSERT INTO foobar VALUES (43)").await?;
let mut rows = sqlx::query("SELECT id FROM foobar").fetch(&mut conn);
let row = rows.try_next().await?.unwrap();
let x: i32 = row.try_get("id")?;
assert_eq!(x, 42);
drop(rows);
conn.execute("DROP TABLE foobar").await?;
Ok(())
}
// https://github.com/launchbadge/sqlx/issues/1300
#[sqlx_macros::test]
async fn concurrent_resets_dont_segfault() {
use sqlx::{sqlite::SqliteConnectOptions, ConnectOptions};
use std::{str::FromStr, time::Duration};
let mut conn = SqliteConnectOptions::from_str(":memory:")
.unwrap()
.connect()
.await
.unwrap();
sqlx::query("CREATE TABLE stuff (name INTEGER, value INTEGER)")
.execute(&mut conn)
.await
.unwrap();
sqlx_core::rt::spawn(async move {
for i in 0..1000 {
sqlx::query("INSERT INTO stuff (name, value) VALUES (?, ?)")
.bind(i)
.bind(0)
.execute(&mut conn)
.await
.unwrap();
}
});
sqlx_core::rt::sleep(Duration::from_millis(1)).await;
}
// https://github.com/launchbadge/sqlx/issues/1419
// note: this passes before and after the fix; you need to run it with `--nocapture`
// to see the panic from the worker thread, which doesn't happen after the fix
#[sqlx_macros::test]
async fn row_dropped_after_connection_doesnt_panic() {
let mut conn = SqliteConnection::connect(":memory:").await.unwrap();
let books = sqlx::query("SELECT 'hello' AS title")
.fetch_all(&mut conn)
.await
.unwrap();
for book in &books {
// force the row to be inflated
let _title: String = book.get("title");
}
// hold `books` past the lifetime of `conn`
drop(conn);
sqlx_core::rt::sleep(std::time::Duration::from_secs(1)).await;
drop(books);
}
// note: to repro issue #1467 this should be run in release mode
// May spuriously fail with UNIQUE constraint failures (which aren't relevant to the original issue)
// which I have tried to reproduce using the same seed as printed from CI but to no avail.
// It may be due to some nondeterminism in SQLite itself for all I know.
#[sqlx_macros::test]
#[ignore]
async fn issue_1467() -> anyhow::Result<()> {
let mut conn = SqliteConnectOptions::new()
.filename(":memory:")
.connect()
.await?;
sqlx::query(
r#"
CREATE TABLE kv (k PRIMARY KEY, v);
CREATE INDEX idx_kv ON kv (v);
"#,
)
.execute(&mut conn)
.await?;
// Random seed:
let seed: [u8; 32] = rand::random();
println!("RNG seed: {}", hex::encode(&seed));
// Pre-determined seed:
// let mut seed: [u8; 32] = [0u8; 32];
// hex::decode_to_slice(
// "135234871d03fc0479e22f2f06395b6074761bac5fe7dcf205dbe01eef9f7794",
// &mut seed,
// )?;
// reproducible RNG for testing
let mut rng = Xoshiro256PlusPlus::from_seed(seed);
for i in 0..1_000_000 {
if i % 1_000 == 0 {
println!("{i}");
}
let key = rng.gen_range(0..1_000);
let value = rng.gen_range(0..1_000);
let mut tx = conn.begin().await?;
let exists = sqlx::query("SELECT 1 FROM kv WHERE k = ?")
.bind(key)
.fetch_optional(&mut *tx)
.await?;
if exists.is_some() {
sqlx::query("UPDATE kv SET v = ? WHERE k = ?")
.bind(value)
.bind(key)
.execute(&mut *tx)
.await?;
} else {
sqlx::query("INSERT INTO kv(k, v) VALUES (?, ?)")
.bind(key)
.bind(value)
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
}
Ok(())
}
#[sqlx_macros::test]
async fn concurrent_read_and_write() {
let pool: SqlitePool = SqlitePoolOptions::new()
.min_connections(2)
.connect(":memory:")
.await
.unwrap();
sqlx::query("CREATE TABLE kv (k PRIMARY KEY, v)")
.execute(&pool)
.await
.unwrap();
let n = 100;
let read = sqlx_core::rt::spawn({
let mut conn = pool.acquire().await.unwrap();
async move {
for i in 0u32..n {
sqlx::query("SELECT v FROM kv")
.bind(i)
.fetch_all(&mut *conn)
.await
.unwrap();
}
}
});
let write = sqlx_core::rt::spawn({
let mut conn = pool.acquire().await.unwrap();
async move {
for i in 0u32..n {
sqlx::query("INSERT INTO kv (k, v) VALUES (?, ?)")
.bind(i)
.bind(i * i)
.execute(&mut *conn)
.await
.unwrap();
}
}
});
read.await;
write.await;
}
#[sqlx_macros::test]
async fn test_query_with_progress_handler() -> anyhow::Result<()> {
let mut conn = new::<Sqlite>().await?;
// Using this string as a canary to ensure the callback doesn't get called with the wrong data pointer.
let state = format!("test");
conn.lock_handle().await?.set_progress_handler(1, move || {
assert_eq!(state, "test");
false
});
match sqlx::query("SELECT 'hello' AS title")
.fetch_all(&mut conn)
.await
{
Err(sqlx::Error::Database(err)) => assert_eq!(err.message(), String::from("interrupted")),
_ => panic!("expected an interrupt"),
}
Ok(())
}
#[sqlx_macros::test]
async fn test_multiple_set_progress_handler_calls_drop_old_handler() -> anyhow::Result<()> {
let ref_counted_object = Arc::new(0);
assert_eq!(1, Arc::strong_count(&ref_counted_object));
{
let mut conn = new::<Sqlite>().await?;
let o = ref_counted_object.clone();
conn.lock_handle().await?.set_progress_handler(1, move || {
println!("{o:?}");
false
});
assert_eq!(2, Arc::strong_count(&ref_counted_object));
let o = ref_counted_object.clone();
conn.lock_handle().await?.set_progress_handler(1, move || {
println!("{o:?}");
false
});
assert_eq!(2, Arc::strong_count(&ref_counted_object));
let o = ref_counted_object.clone();
conn.lock_handle().await?.set_progress_handler(1, move || {
println!("{o:?}");
false
});
assert_eq!(2, Arc::strong_count(&ref_counted_object));
match sqlx::query("SELECT 'hello' AS title")
.fetch_all(&mut conn)
.await
{
Err(sqlx::Error::Database(err)) => {
assert_eq!(err.message(), String::from("interrupted"))
}
_ => panic!("expected an interrupt"),
}
conn.lock_handle().await?.remove_progress_handler();
}
assert_eq!(1, Arc::strong_count(&ref_counted_object));
Ok(())
}