Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 41 additions & 23 deletions sable_history/src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::convert::Infallible;
use std::sync::Arc;

use anyhow::Context;
use anyhow::{Context, Result};
use diesel::migration::MigrationSource;
use diesel::prelude::*;
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
Expand All @@ -18,6 +18,9 @@ use sable_server::ServerType;
mod sync;
mod update_handler;

/// Advisory lock key for serializing database migrations across concurrent processes.
const MIGRATION_LOCK_KEY: i64 = 0x5361626c48697374; // value is "SablHist"

pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");

#[derive(Debug, Clone, Deserialize)]
Expand Down Expand Up @@ -59,28 +62,43 @@ impl ServerType for HistoryServer {
// run_pending_migrations only support sync connections
let mut conn = AsyncConnectionWrapper::<AsyncPgConnection>::establish(&database)
.context("Couldn't connect to database")?;
tracing::info!("Running database migrations");
tracing::trace!(
"Required migrations: {}",
MIGRATIONS
.migrations()
.map_err(|e| anyhow::anyhow!("Couldn't get migrations: {e}"))?
.iter()
.map(diesel::migration::Migration::<diesel::pg::Pg>::name)
.join(", ")
);
let migrations = conn
.run_pending_migrations(MIGRATIONS)
.map_err(|e| anyhow::anyhow!("Database migrations failed: {e}"))?;
if migrations.is_empty() {
tracing::info!("No database migrations to run");
} else {
tracing::info!(
"Applied database migrations: {}",
migrations.iter().map(ToString::to_string).join(", ")
)
}
Ok(())

// Prevent multiple migrations from running at the same time, or processes starting
// while migrations are still running.
diesel::sql_query(format!("SELECT pg_advisory_lock({MIGRATION_LOCK_KEY})"))
.execute(&mut conn)
.context("Couldn't acquire migration advisory lock")?;

let res = (|| -> Result<_> {
tracing::info!("Running database migrations");
tracing::trace!(
"Required migrations: {}",
MIGRATIONS
.migrations()
.map_err(|e| anyhow::anyhow!("Couldn't get migrations: {e}"))?
.iter()
.map(diesel::migration::Migration::<diesel::pg::Pg>::name)
.join(", ")
);
let migrations = conn
.run_pending_migrations(MIGRATIONS)
.map_err(|e| anyhow::anyhow!("Database migrations failed: {e}"))?;
if migrations.is_empty() {
tracing::info!("No database migrations to run");
} else {
tracing::info!(
"Applied database migrations: {}",
migrations.iter().map(ToString::to_string).join(", ")
)
}
Ok(())
})();

diesel::sql_query(format!("SELECT pg_advisory_unlock({MIGRATION_LOCK_KEY})"))
.execute(&mut conn)
.context("Couldn't release migration advisory lock")?;

res
})
.await
.context("Couldn't join migration task")??;
Expand Down
Loading