feat: no tx migration (#3181)

* test: add a failing test

* feat: add no_tx to migration struct

* feat: execute migration with no tx block

* fix: expected string literal compilation error

* test: update no tx to content comment

* refactor: use the sql comment instead of file name semantics

* docs: remove no_tx from file format comment

* fix: remove filename matches

* fix: messed up merge

* refactor: dedupe migration

* fix: move comment to where it makes sense

* fix: linter error
This commit is contained in:
Jaime 2024-04-19 17:42:44 -05:00 committed by GitHub
parent 25efb2f7f4
commit 40aef6da2c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 77 additions and 28 deletions

View file

@ -11,6 +11,7 @@ pub struct Migration {
pub migration_type: MigrationType,
pub sql: Cow<'static, str>,
pub checksum: Cow<'static, [u8]>,
pub no_tx: bool,
}
impl Migration {
@ -19,6 +20,7 @@ impl Migration {
description: Cow<'static, str>,
migration_type: MigrationType,
sql: Cow<'static, str>,
no_tx: bool,
) -> Self {
let checksum = Cow::Owned(Vec::from(Sha384::digest(sql.as_bytes()).as_slice()));
@ -28,6 +30,7 @@ impl Migration {
migration_type,
sql,
checksum,
no_tx,
}
}
}

View file

@ -21,6 +21,8 @@ pub struct Migrator {
pub ignore_missing: bool,
#[doc(hidden)]
pub locking: bool,
#[doc(hidden)]
pub no_tx: bool,
}
fn validate_applied_migrations(
@ -47,6 +49,7 @@ impl Migrator {
pub const DEFAULT: Migrator = Migrator {
migrations: Cow::Borrowed(&[]),
ignore_missing: false,
no_tx: false,
locking: true,
};

View file

@ -97,7 +97,7 @@ pub fn resolve_blocking(path: PathBuf) -> Result<Vec<(Migration, PathBuf)>, Reso
let parts = file_name.splitn(2, '_').collect::<Vec<_>>();
if parts.len() != 2 || !parts[1].ends_with(".sql") {
// not of the format: <VERSION>_<DESCRIPTION>.sql; ignore
// not of the format: <VERSION>_<DESCRIPTION>.<REVERSIBLE_DIRECTION>.sql; ignore
continue;
}
@ -108,6 +108,7 @@ pub fn resolve_blocking(path: PathBuf) -> Result<Vec<(Migration, PathBuf)>, Reso
})?;
let migration_type = MigrationType::from_filename(parts[1]);
// remove the `.sql` and replace `_` with ` `
let description = parts[1]
.trim_end_matches(migration_type.suffix())
@ -122,12 +123,16 @@ pub fn resolve_blocking(path: PathBuf) -> Result<Vec<(Migration, PathBuf)>, Reso
source: Some(e),
})?;
// opt-out of migration transaction
let no_tx = sql.starts_with("-- no-transaction");
migrations.push((
Migration::new(
version,
Cow::Owned(description),
migration_type,
Cow::Owned(sql),
no_tx,
),
entry_path,
));

View file

@ -36,6 +36,7 @@ impl ToTokens for QuoteMigration {
description,
migration_type,
checksum,
no_tx,
..
} = &self.migration;
@ -69,6 +70,7 @@ impl ToTokens for QuoteMigration {
description: ::std::borrow::Cow::Borrowed(#description),
migration_type: #migration_type,
sql: ::std::borrow::Cow::Borrowed(#sql),
no_tx: #no_tx,
checksum: ::std::borrow::Cow::Borrowed(&[
#(#checksum),*
]),

View file

@ -208,38 +208,25 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
migration: &'m Migration,
) -> BoxFuture<'m, Result<Duration, MigrateError>> {
Box::pin(async move {
let mut tx = self.begin().await?;
let start = Instant::now();
// Use a single transaction for the actual migration script and the essential bookeeping so we never
// execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
// The `execution_time` however can only be measured for the whole transaction. This value _only_ exists for
// data lineage and debugging reasons, so it is not super important if it is lost. So we initialize it to -1
// and update it once the actual transaction completed.
let _ = tx
.execute(&*migration.sql)
.await
.map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?;
// language=SQL
let _ = query(
r#"
INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
VALUES ( $1, $2, TRUE, $3, -1 )
"#,
)
.bind(migration.version)
.bind(&*migration.description)
.bind(&*migration.checksum)
.execute(&mut *tx)
.await?;
tx.commit().await?;
// execute migration queries
if migration.no_tx {
execute_migration(self, migration).await?;
} else {
// Use a single transaction for the actual migration script and the essential bookeeping so we never
// execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
// The `execution_time` however can only be measured for the whole transaction. This value _only_ exists for
// data lineage and debugging reasons, so it is not super important if it is lost. So we initialize it to -1
// and update it once the actual transaction completed.
let mut tx = self.begin().await?;
execute_migration(&mut tx, migration).await?;
tx.commit().await?;
}
// Update `elapsed_time`.
// NOTE: The process may disconnect/die at this point, so the elapsed time value might be lost. We accept
// this small risk since this value is not super important.
let elapsed = start.elapsed();
// language=SQL
@ -286,6 +273,31 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
}
}
async fn execute_migration(
conn: &mut PgConnection,
migration: &Migration,
) -> Result<(), MigrateError> {
let _ = conn
.execute(&*migration.sql)
.await
.map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?;
// language=SQL
let _ = query(
r#"
INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
VALUES ( $1, $2, TRUE, $3, -1 )
"#,
)
.bind(migration.version)
.bind(&*migration.description)
.bind(&*migration.checksum)
.execute(conn)
.await?;
Ok(())
}
async fn current_database(conn: &mut PgConnection) -> Result<String, MigrateError> {
// language=SQL
Ok(query_scalar("SELECT current_database()")

View file

@ -133,6 +133,7 @@ use sqlx::{PgPool, Row};
# migrations: Cow::Borrowed(&[]),
# ignore_missing: false,
# locking: true,
# no_tx: false
# };
# }

View file

@ -66,8 +66,28 @@ async fn reversible(mut conn: PoolConnection<Postgres>) -> anyhow::Result<()> {
Ok(())
}
#[sqlx::test(migrations = false)]
async fn no_tx(mut conn: PoolConnection<Postgres>) -> anyhow::Result<()> {
clean_up(&mut conn).await?;
let migrator = Migrator::new(Path::new("tests/postgres/migrations_no_tx")).await?;
// run migration
migrator.run(&mut conn).await?;
// check outcome
let res: String = conn
.fetch_one("SELECT datname FROM pg_database WHERE datname = 'test_db'")
.await?
.get(0);
assert_eq!(res, "test_db");
Ok(())
}
/// Ensure that we have a clean initial state.
async fn clean_up(conn: &mut PgConnection) -> anyhow::Result<()> {
conn.execute("DROP DATABASE IF EXISTS test_db").await.ok();
conn.execute("DROP TABLE migrations_simple_test").await.ok();
conn.execute("DROP TABLE migrations_reversible_test")
.await

View file

@ -0,0 +1,3 @@
-- no-transaction
CREATE DATABASE test_db;

View file

@ -127,7 +127,7 @@ async fn it_gets_posts_mixed_fixtures_path(pool: PgPool) -> sqlx::Result<()> {
// This should apply migrations and then `../fixtures/postgres/users.sql` and `../fixtures/postgres/posts.sql`
#[sqlx::test(
migrations = "tests/postgres/migrations",
fixtures(path = "../fixtures/postgres", scripts("users.sql", "posts"))
fixtures("../fixtures/postgres/users.sql", "../fixtures/postgres/posts.sql")
)]
async fn it_gets_posts_custom_relative_fixtures_path(pool: PgPool) -> sqlx::Result<()> {
let post_contents: Vec<String> =