mirror of
https://github.com/launchbadge/sqlx
synced 2024-11-10 14:34:19 +00:00
reversible migrations for cli
- adds a -r flag whihc will create a reversible migration - add revert subcommand, which reverts the last migration - add --dry-run flag to migration run command, which list the migrations that will be applied - updates add migration to check if all migration are of same type, i.e cannot mix and match reversible and simple migrations
This commit is contained in:
parent
e798409e20
commit
0921df44c1
14 changed files with 331 additions and 21 deletions
|
@ -40,5 +40,5 @@ pub async fn reset(migration_source: &str, uri: &str, confirm: bool) -> anyhow::
|
|||
|
||||
pub async fn setup(migration_source: &str, uri: &str) -> anyhow::Result<()> {
|
||||
create(uri).await?;
|
||||
migrate::run(migration_source, uri).await
|
||||
migrate::run(migration_source, uri, false).await
|
||||
}
|
||||
|
|
|
@ -31,8 +31,16 @@ hint: This command only works in the manifest directory of a Cargo package."#
|
|||
|
||||
match opt.command {
|
||||
Command::Migrate(migrate) => match migrate.command {
|
||||
MigrateCommand::Add { description } => migrate::add(&migrate.source, &description)?,
|
||||
MigrateCommand::Run => migrate::run(&migrate.source, &database_url).await?,
|
||||
MigrateCommand::Add {
|
||||
description,
|
||||
reversible,
|
||||
} => migrate::add(&migrate.source, &description, reversible).await?,
|
||||
MigrateCommand::Run { dry_run } => {
|
||||
migrate::run(&migrate.source, &database_url, dry_run).await?
|
||||
}
|
||||
MigrateCommand::Revert { dry_run } => {
|
||||
migrate::revert(&migrate.source, &database_url, dry_run).await?
|
||||
}
|
||||
MigrateCommand::Info => migrate::info(&migrate.source, &database_url).await?,
|
||||
},
|
||||
|
||||
|
|
|
@ -1,22 +1,25 @@
|
|||
use anyhow::{bail, Context};
|
||||
use chrono::Utc;
|
||||
use console::style;
|
||||
use sqlx::migrate::{Migrate, MigrateError, Migrator};
|
||||
use sqlx::migrate::{Migrate, MigrateError, MigrationType, Migrator};
|
||||
use sqlx::{AnyConnection, Connection};
|
||||
use std::fs::{self, File};
|
||||
use std::io::Write;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
pub fn add(migration_source: &str, description: &str) -> anyhow::Result<()> {
|
||||
use chrono::prelude::*;
|
||||
fn create_file(
|
||||
migration_source: &str,
|
||||
file_prefix: &str,
|
||||
description: &str,
|
||||
migration_type: MigrationType,
|
||||
) -> anyhow::Result<()> {
|
||||
use std::path::PathBuf;
|
||||
|
||||
fs::create_dir_all(migration_source).context("Unable to create migrations directory")?;
|
||||
|
||||
let dt = Utc::now();
|
||||
let mut file_name = dt.format("%Y%m%d%H%M%S").to_string();
|
||||
let mut file_name = file_prefix.to_string();
|
||||
file_name.push_str("_");
|
||||
file_name.push_str(&description.replace(' ', "_"));
|
||||
file_name.push_str(".sql");
|
||||
file_name.push_str(migration_type.suffix());
|
||||
|
||||
let mut path = PathBuf::new();
|
||||
path.push(migration_source);
|
||||
|
@ -26,7 +29,49 @@ pub fn add(migration_source: &str, description: &str) -> anyhow::Result<()> {
|
|||
|
||||
let mut file = File::create(&path).context("Failed to create migration file")?;
|
||||
|
||||
file.write_all(b"-- Add migration script here\n")?;
|
||||
file.write_all(migration_type.file_content().as_bytes())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn add(
|
||||
migration_source: &str,
|
||||
description: &str,
|
||||
reversible: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
fs::create_dir_all(migration_source).context("Unable to create migrations directory")?;
|
||||
|
||||
let migrator = Migrator::new(Path::new(migration_source)).await?;
|
||||
// This checks if all existing migrations are of the same type as the reverisble flag passed
|
||||
for migration in migrator.iter() {
|
||||
if migration.migration_type.is_reversible() != reversible {
|
||||
bail!(MigrateError::InvalidMixReversibleAndSimple);
|
||||
}
|
||||
}
|
||||
|
||||
let dt = Utc::now();
|
||||
let file_prefix = dt.format("%Y%m%d%H%M%S").to_string();
|
||||
if reversible {
|
||||
create_file(
|
||||
migration_source,
|
||||
&file_prefix,
|
||||
description,
|
||||
MigrationType::ReversibleUp,
|
||||
)?;
|
||||
create_file(
|
||||
migration_source,
|
||||
&file_prefix,
|
||||
description,
|
||||
MigrationType::ReversibleDown,
|
||||
)?;
|
||||
} else {
|
||||
create_file(
|
||||
migration_source,
|
||||
&file_prefix,
|
||||
description,
|
||||
MigrationType::Simple,
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -55,7 +100,7 @@ pub async fn info(migration_source: &str, uri: &str) -> anyhow::Result<()> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn run(migration_source: &str, uri: &str) -> anyhow::Result<()> {
|
||||
pub async fn run(migration_source: &str, uri: &str, dry_run: bool) -> anyhow::Result<()> {
|
||||
let migrator = Migrator::new(Path::new(migration_source)).await?;
|
||||
let mut conn = AnyConnection::connect(uri).await?;
|
||||
|
||||
|
@ -68,13 +113,23 @@ pub async fn run(migration_source: &str, uri: &str) -> anyhow::Result<()> {
|
|||
}
|
||||
|
||||
for migration in migrator.iter() {
|
||||
if migration.migration_type.is_down_migration() {
|
||||
// Skipping down migrations
|
||||
continue;
|
||||
}
|
||||
if migration.version > version {
|
||||
let elapsed = conn.apply(migration).await?;
|
||||
let elapsed = if dry_run {
|
||||
Duration::new(0, 0)
|
||||
} else {
|
||||
conn.apply(migration).await?
|
||||
};
|
||||
let text = if dry_run { "Can apply" } else { "Applied" };
|
||||
|
||||
println!(
|
||||
"{}/{} {} {}",
|
||||
"{} {}/{} {} {}",
|
||||
text,
|
||||
style(migration.version).cyan(),
|
||||
style("migrate").green(),
|
||||
style(migration.migration_type.label()).green(),
|
||||
migration.description,
|
||||
style(format!("({:?})", elapsed)).dim()
|
||||
);
|
||||
|
@ -85,3 +140,54 @@ pub async fn run(migration_source: &str, uri: &str) -> anyhow::Result<()> {
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn revert(migration_source: &str, uri: &str, dry_run: bool) -> anyhow::Result<()> {
|
||||
let migrator = Migrator::new(Path::new(migration_source)).await?;
|
||||
let mut conn = AnyConnection::connect(uri).await?;
|
||||
|
||||
conn.ensure_migrations_table().await?;
|
||||
|
||||
let (version, dirty) = conn.version().await?.unwrap_or((0, false));
|
||||
|
||||
if dirty {
|
||||
bail!(MigrateError::Dirty(version));
|
||||
}
|
||||
|
||||
let mut is_applied = false;
|
||||
for migration in migrator.iter().rev() {
|
||||
if !migration.migration_type.is_down_migration() {
|
||||
// Skipping non down migration
|
||||
// This will skip any standard or up migration file
|
||||
continue;
|
||||
}
|
||||
if migration.version > version {
|
||||
// Skipping unapplied migrations
|
||||
continue;
|
||||
}
|
||||
|
||||
let elapsed = if dry_run {
|
||||
Duration::new(0, 0)
|
||||
} else {
|
||||
conn.revert(migration).await?
|
||||
};
|
||||
let text = if dry_run { "Can apply" } else { "Applied" };
|
||||
|
||||
println!(
|
||||
"{} {}/{} {} {}",
|
||||
text,
|
||||
style(migration.version).cyan(),
|
||||
style(migration.migration_type.label()).green(),
|
||||
migration.description,
|
||||
style(format!("({:?})", elapsed)).dim()
|
||||
);
|
||||
|
||||
is_applied = true;
|
||||
// Only a single migration will be reverted at a time, so we break
|
||||
break;
|
||||
}
|
||||
if !is_applied {
|
||||
println!("No migrations available to revert");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -93,10 +93,28 @@ pub struct MigrateOpt {
|
|||
pub enum MigrateCommand {
|
||||
/// Create a new migration with the given description,
|
||||
/// and the current time as the version.
|
||||
Add { description: String },
|
||||
Add {
|
||||
description: String,
|
||||
|
||||
/// If true, creates a pair of up and down migration files with same version
|
||||
/// else creates a single sql file
|
||||
#[clap(short)]
|
||||
reversible: bool,
|
||||
},
|
||||
|
||||
/// Run all pending migrations.
|
||||
Run,
|
||||
Run {
|
||||
/// List all the migrations to be run without applying
|
||||
#[clap(long)]
|
||||
dry_run: bool,
|
||||
},
|
||||
|
||||
/// Revert the latest migration with a down file.
|
||||
Revert {
|
||||
/// List the migration to be reverted without applying
|
||||
#[clap(long)]
|
||||
dry_run: bool,
|
||||
},
|
||||
|
||||
/// List all available migrations.
|
||||
Info,
|
||||
|
|
|
@ -171,4 +171,23 @@ impl Migrate for AnyConnection {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn revert<'e: 'm, 'm>(
|
||||
&'e mut self,
|
||||
migration: &'m Migration,
|
||||
) -> BoxFuture<'m, Result<Duration, MigrateError>> {
|
||||
match &mut self.0 {
|
||||
#[cfg(feature = "postgres")]
|
||||
AnyConnectionKind::Postgres(conn) => conn.revert(migration),
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
AnyConnectionKind::Sqlite(conn) => conn.revert(migration),
|
||||
|
||||
#[cfg(feature = "mysql")]
|
||||
AnyConnectionKind::MySql(conn) => conn.revert(migration),
|
||||
|
||||
#[cfg(feature = "mssql")]
|
||||
AnyConnectionKind::Mssql(conn) => unimplemented!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,6 +15,9 @@ pub enum MigrateError {
|
|||
#[error("migration {0} was previously applied but has been modified")]
|
||||
VersionMismatch(i64),
|
||||
|
||||
#[error("cannot mix reversible migrations with simple migrations. All migrations should be reversible or simple migrations")]
|
||||
InvalidMixReversibleAndSimple,
|
||||
|
||||
// NOTE: this will only happen with a database that does not have transactional DDL (.e.g, MySQL or Oracle)
|
||||
#[error(
|
||||
"migration {0} is partially applied; fix and remove row from `_sqlx_migrations` table"
|
||||
|
|
|
@ -50,4 +50,12 @@ pub trait Migrate {
|
|||
&'e mut self,
|
||||
migration: &'m Migration,
|
||||
) -> BoxFuture<'m, Result<Duration, MigrateError>>;
|
||||
|
||||
// run a revert SQL from migration in a DDL transaction
|
||||
// deletes the row in [_migrations] table with specified migration version on completion (success or failure)
|
||||
// returns the time taking to run the migration SQL
|
||||
fn revert<'e: 'm, 'm>(
|
||||
&'e mut self,
|
||||
migration: &'m Migration,
|
||||
) -> BoxFuture<'m, Result<Duration, MigrateError>>;
|
||||
}
|
||||
|
|
|
@ -2,21 +2,30 @@ use std::borrow::Cow;
|
|||
|
||||
use sha2::{Digest, Sha384};
|
||||
|
||||
use super::MigrationType;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Migration {
|
||||
pub version: i64,
|
||||
pub description: Cow<'static, str>,
|
||||
pub migration_type: MigrationType,
|
||||
pub sql: Cow<'static, str>,
|
||||
pub checksum: Cow<'static, [u8]>,
|
||||
}
|
||||
|
||||
impl Migration {
|
||||
pub fn new(version: i64, description: Cow<'static, str>, sql: Cow<'static, str>) -> Self {
|
||||
pub fn new(
|
||||
version: i64,
|
||||
description: Cow<'static, str>,
|
||||
migration_type: MigrationType,
|
||||
sql: Cow<'static, str>,
|
||||
) -> Self {
|
||||
let checksum = Cow::Owned(Vec::from(Sha384::digest(sql.as_bytes()).as_slice()));
|
||||
|
||||
Migration {
|
||||
version,
|
||||
description,
|
||||
migration_type,
|
||||
sql,
|
||||
checksum,
|
||||
}
|
||||
|
|
66
sqlx-core/src/migrate/migration_type.rs
Normal file
66
sqlx-core/src/migrate/migration_type.rs
Normal file
|
@ -0,0 +1,66 @@
|
|||
/// Migration Type represents the type of migration
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub enum MigrationType {
|
||||
/// Simple migration are single file migrations with no up / down queries
|
||||
Simple,
|
||||
|
||||
/// ReversibleUp migrations represents the add or update part of a reversible migrations
|
||||
/// It is expected the every migration of this type will have a corresponding down file
|
||||
ReversibleUp,
|
||||
|
||||
/// ReversibleDown migrations represents the delete or downgrade part of a reversible migrations
|
||||
/// It is expected the every migration of this type will have a corresponding up file
|
||||
ReversibleDown,
|
||||
}
|
||||
|
||||
impl MigrationType {
|
||||
pub fn from_filename(filename: &str) -> Self {
|
||||
if filename.ends_with(MigrationType::ReversibleUp.suffix()) {
|
||||
MigrationType::ReversibleUp
|
||||
} else if filename.ends_with(MigrationType::ReversibleDown.suffix()) {
|
||||
MigrationType::ReversibleDown
|
||||
} else {
|
||||
MigrationType::Simple
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_reversible(&self) -> bool {
|
||||
match self {
|
||||
MigrationType::Simple => false,
|
||||
MigrationType::ReversibleUp => true,
|
||||
MigrationType::ReversibleDown => true,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_down_migration(&self) -> bool {
|
||||
match self {
|
||||
MigrationType::Simple => false,
|
||||
MigrationType::ReversibleUp => false,
|
||||
MigrationType::ReversibleDown => true,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn label(&self) -> &'static str {
|
||||
match self {
|
||||
MigrationType::Simple => "migrate",
|
||||
MigrationType::ReversibleUp => "migrate",
|
||||
MigrationType::ReversibleDown => "revert",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn suffix(&self) -> &'static str {
|
||||
match self {
|
||||
MigrationType::Simple => ".sql",
|
||||
MigrationType::ReversibleUp => ".up.sql",
|
||||
MigrationType::ReversibleDown => ".down.sql",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn file_content(&self) -> &'static str {
|
||||
match self {
|
||||
MigrationType::Simple => "-- Add migration script here\n",
|
||||
MigrationType::ReversibleUp => "-- Add up migration script here\n",
|
||||
MigrationType::ReversibleDown => "-- Add down migration script here\n",
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,11 +1,13 @@
|
|||
mod error;
|
||||
mod migrate;
|
||||
mod migration;
|
||||
mod migration_type;
|
||||
mod migrator;
|
||||
mod source;
|
||||
|
||||
pub use error::MigrateError;
|
||||
pub use migrate::{Migrate, MigrateDatabase};
|
||||
pub use migration::Migration;
|
||||
pub use migration_type::MigrationType;
|
||||
pub use migrator::Migrator;
|
||||
pub use source::MigrationSource;
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use crate::error::BoxDynError;
|
||||
use crate::migrate::Migration;
|
||||
use crate::migrate::{Migration, MigrationType};
|
||||
use futures_core::future::BoxFuture;
|
||||
use futures_util::TryStreamExt;
|
||||
use sqlx_rt::fs;
|
||||
|
@ -35,9 +35,10 @@ impl<'s> MigrationSource<'s> for &'s Path {
|
|||
|
||||
let version: i64 = parts[0].parse()?;
|
||||
|
||||
let migration_type = MigrationType::from_filename(parts[1]);
|
||||
// remove the `.sql` and replace `_` with ` `
|
||||
let description = parts[1]
|
||||
.trim_end_matches(".sql")
|
||||
.trim_end_matches(migration_type.suffix())
|
||||
.replace('_', " ")
|
||||
.to_owned();
|
||||
|
||||
|
@ -46,6 +47,7 @@ impl<'s> MigrationSource<'s> for &'s Path {
|
|||
migrations.push(Migration::new(
|
||||
version,
|
||||
Cow::Owned(description),
|
||||
migration_type,
|
||||
Cow::Owned(sql),
|
||||
));
|
||||
}
|
||||
|
|
|
@ -201,6 +201,27 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
|||
Ok(elapsed)
|
||||
})
|
||||
}
|
||||
|
||||
fn revert<'e: 'm, 'm>(
|
||||
&'e mut self,
|
||||
migration: &'m Migration,
|
||||
) -> BoxFuture<'m, Result<Duration, MigrateError>> {
|
||||
Box::pin(async move {
|
||||
let start = Instant::now();
|
||||
|
||||
self.execute(&*migration.sql).await?;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
// language=SQL
|
||||
let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = ?"#)
|
||||
.bind(migration.version)
|
||||
.execute(self)
|
||||
.await?;
|
||||
|
||||
Ok(elapsed)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
async fn current_database(conn: &mut MySqlConnection) -> Result<String, MigrateError> {
|
||||
|
|
|
@ -211,6 +211,30 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
|||
Ok(elapsed)
|
||||
})
|
||||
}
|
||||
|
||||
fn revert<'e: 'm, 'm>(
|
||||
&'e mut self,
|
||||
migration: &'m Migration,
|
||||
) -> BoxFuture<'m, Result<Duration, MigrateError>> {
|
||||
Box::pin(async move {
|
||||
let mut tx = self.begin().await?;
|
||||
let start = Instant::now();
|
||||
|
||||
let _ = tx.execute(&*migration.sql).await?;
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
// language=SQL
|
||||
let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = $1"#)
|
||||
.bind(migration.version)
|
||||
.execute(self)
|
||||
.await?;
|
||||
|
||||
Ok(elapsed)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
async fn current_database(conn: &mut PgConnection) -> Result<String, MigrateError> {
|
||||
|
|
|
@ -150,4 +150,28 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
|
|||
Ok(elapsed)
|
||||
})
|
||||
}
|
||||
|
||||
fn revert<'e: 'm, 'm>(
|
||||
&'e mut self,
|
||||
migration: &'m Migration,
|
||||
) -> BoxFuture<'m, Result<Duration, MigrateError>> {
|
||||
Box::pin(async move {
|
||||
let mut tx = self.begin().await?;
|
||||
let start = Instant::now();
|
||||
|
||||
let _ = tx.execute(&*migration.sql).await?;
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
// language=SQL
|
||||
let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = ?1"#)
|
||||
.bind(migration.version)
|
||||
.execute(self)
|
||||
.await?;
|
||||
|
||||
Ok(elapsed)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue