Skip to content

Commit 144c60f

Browse files
committed
fix incremental DDL dry run: save/restore tx snapshot between statements
The incremental dry run created a fresh durable transaction for each statement but processed ops against a `CatalogState` accumulated from previous dry runs. From the second statement onward, the transaction (reflecting durable storage) and the state (reflecting accumulated changes) were out of sync, causing 'retraction does not match existing value' panics when applying diffs. Fix: after each dry run, export the transaction's current state as a `Snapshot` (`Transaction::current_snapshot`). On the next dry run, initialize the transaction from this saved snapshot via `DurableCatalogState::transaction_from_snapshot`, keeping the transaction and the state in sync. This preserves the O(N)-per-statement optimization.
1 parent 17a25c0 commit 144c60f

File tree

8 files changed

+120
-57
lines changed

8 files changed

+120
-57
lines changed

src/adapter/src/catalog/transact.rs

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use mz_audit_log::{
2727
};
2828
use mz_catalog::SYSTEM_CONN_ID;
2929
use mz_catalog::builtin::BuiltinLog;
30-
use mz_catalog::durable::{NetworkPolicy, Transaction};
30+
use mz_catalog::durable::{NetworkPolicy, Snapshot, Transaction};
3131
use mz_catalog::expr_cache::LocalExpressions;
3232
use mz_catalog::memory::error::{AmbiguousRename, Error, ErrorKind};
3333
use mz_catalog::memory::objects::{
@@ -504,17 +504,26 @@ impl Catalog {
504504

505505
/// Performs an incremental dry-run catalog transaction: processes only the
506506
/// NEW ops against an accumulated `CatalogState` from previous dry runs.
507-
/// This avoids the O(N^2) replay cost of `catalog_transact_with_ddl_transaction`.
507+
/// This avoids the O(N^2) replay cost of replaying all accumulated ops.
508508
///
509-
/// Returns the new accumulated state and the OID allocator position.
509+
/// The durable transaction is intentionally never committed and no storage
510+
/// controller prepare-state side effects are run.
511+
///
512+
/// If `prev_snapshot` is `Some`, the transaction is initialized from that
513+
/// snapshot (which represents the tx state after the previous dry run),
514+
/// ensuring it starts in sync with `base_state`. If `None` (first
515+
/// statement), a fresh transaction is loaded from durable storage.
516+
///
517+
/// Returns the new accumulated state and a snapshot of the transaction's
518+
/// state for use in subsequent incremental dry runs.
510519
pub async fn transact_incremental_dry_run(
511520
&self,
512521
base_state: &CatalogState,
513522
ops: Vec<Op>,
514523
session: Option<&ConnMeta>,
515-
prev_next_oid: Option<u64>,
524+
prev_snapshot: Option<Snapshot>,
516525
oracle_write_ts: mz_repr::Timestamp,
517-
) -> Result<(CatalogState, u64), AdapterError> {
526+
) -> Result<(CatalogState, Snapshot), AdapterError> {
518527
// For DDL transactions, items are not temporary (CREATE TABLE FROM SOURCE, etc.)
519528
// but we still need to check for collisions.
520529
let temporary_ids = self.temporary_ids(&ops, BTreeSet::new())?;
@@ -523,15 +532,20 @@ impl Catalog {
523532
let mut catalog_updates = vec![];
524533
let mut audit_events = vec![];
525534
let mut storage = self.storage().await;
526-
let mut tx = storage
527-
.transaction()
528-
.await
529-
.unwrap_or_terminate("starting catalog transaction");
530-
531-
// Advance OID counter past previously-allocated OIDs from earlier dry runs.
532-
if let Some(next_oid) = prev_next_oid {
533-
tx.set_next_oid(next_oid)?;
534-
}
535+
let mut tx = if let Some(snapshot) = prev_snapshot {
536+
// Restore transaction from saved snapshot so it starts in sync
537+
// with the accumulated CatalogState from previous dry runs.
538+
storage
539+
.transaction_from_snapshot(snapshot)
540+
.unwrap_or_terminate("starting catalog transaction from snapshot")
541+
} else {
542+
// First statement: fresh transaction from durable storage, which
543+
// is in sync with the real catalog state.
544+
storage
545+
.transaction()
546+
.await
547+
.unwrap_or_terminate("starting catalog transaction")
548+
};
535549

536550
// Process only the new ops against the accumulated state in dry-run mode.
537551
let new_state = Self::transact_inner(
@@ -549,14 +563,16 @@ impl Catalog {
549563
)
550564
.await?;
551565

552-
let new_next_oid = tx.get_next_oid();
566+
// Save the transaction's current state as a snapshot for the next
567+
// incremental dry run.
568+
let new_snapshot = tx.current_snapshot();
553569

554570
// Transaction is NOT committed — drop it.
555571
drop(storage);
556572

557573
// transact_inner returns Some(state) when ops produced changes.
558574
let state = new_state.unwrap_or_else(|| base_state.clone());
559-
Ok((state, new_next_oid))
575+
Ok((state, new_snapshot))
560576
}
561577

562578
/// Extracts optimized expressions from `Op::CreateItem` operations for views

src/adapter/src/coord/command_handler.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1151,7 +1151,7 @@ impl Coordinator {
11511151
state,
11521152
revision,
11531153
side_effects: vec![],
1154-
next_oid: None,
1154+
snapshot: None,
11551155
}) {
11561156
return ctx.retire(Err(err));
11571157
}

src/adapter/src/coord/ddl.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ impl Coordinator {
226226
ops: txn_ops,
227227
revision: txn_revision,
228228
state: txn_state,
229-
next_oid: txn_next_oid,
229+
snapshot: txn_snapshot,
230230
side_effects: _,
231231
},
232232
..
@@ -254,7 +254,7 @@ impl Coordinator {
254254
// Clone what we need from the session before taking &mut below.
255255
let txn_ops_clone = txn_ops.clone();
256256
let txn_state_clone = txn_state.clone();
257-
let prev_next_oid = *txn_next_oid;
257+
let prev_snapshot = txn_snapshot.clone();
258258

259259
// Validate resource limits with all accumulated + new ops (cheap O(N) counting).
260260
let mut combined_ops = txn_ops_clone;
@@ -268,14 +268,18 @@ impl Coordinator {
268268
// Get ConnMeta for the session.
269269
let conn = self.active_conns.get(ctx.session().conn_id());
270270

271-
// Incremental dry run: only process NEW ops against accumulated state.
272-
let (new_state, new_next_oid) = self
271+
// Incremental dry run: process only NEW ops against accumulated state.
272+
// If we have a saved snapshot from a previous dry run, use it to
273+
// initialize the transaction so it starts in sync with the accumulated
274+
// state. Otherwise (first statement), the fresh durable transaction is
275+
// already in sync with the real catalog state.
276+
let (new_state, new_snapshot) = self
273277
.catalog()
274278
.transact_incremental_dry_run(
275279
&txn_state_clone,
276280
ops.clone(),
277281
conn,
278-
prev_next_oid,
282+
prev_snapshot,
279283
oracle_write_ts,
280284
)
281285
.await?;
@@ -289,7 +293,7 @@ impl Coordinator {
289293
state: new_state,
290294
side_effects: vec![Box::new(side_effect)],
291295
revision: self.catalog().transient_revision(),
292-
next_oid: Some(new_next_oid),
296+
snapshot: Some(new_snapshot),
293297
});
294298

295299
self.metrics

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2163,7 +2163,7 @@ impl Coordinator {
21632163
state: _,
21642164
side_effects,
21652165
revision,
2166-
next_oid: _,
2166+
snapshot: _,
21672167
} => {
21682168
// Make sure our catalog hasn't changed.
21692169
if *revision != self.catalog().transient_revision() {

src/adapter/src/session.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use uuid::Uuid;
5353

5454
use crate::catalog::CatalogState;
5555
use crate::client::RecordFirstRowStream;
56+
use mz_catalog::durable::Snapshot;
5657
use crate::coord::appends::BuiltinTableAppendNotify;
5758
use crate::coord::in_memory_oracle::InMemoryTimestampOracle;
5859
use crate::coord::peek::PeekResponseUnary;
@@ -1397,14 +1398,14 @@ impl<T: TimestampManipulation> TransactionStatus<T> {
13971398
revision: og_revision,
13981399
state: og_state,
13991400
side_effects,
1400-
next_oid: og_next_oid,
1401+
snapshot: og_snapshot,
14011402
} => match add_ops {
14021403
TransactionOps::DDL {
14031404
ops: new_ops,
14041405
revision: new_revision,
14051406
side_effects: mut net_new_side_effects,
14061407
state: new_state,
1407-
next_oid: new_next_oid,
1408+
snapshot: new_snapshot,
14081409
} => {
14091410
if *og_revision != new_revision {
14101411
return Err(AdapterError::DDLTransactionRace);
@@ -1413,7 +1414,7 @@ impl<T: TimestampManipulation> TransactionStatus<T> {
14131414
if !new_ops.is_empty() {
14141415
*og_ops = new_ops;
14151416
*og_state = new_state;
1416-
*og_next_oid = new_next_oid;
1417+
*og_snapshot = new_snapshot;
14171418
}
14181419
side_effects.append(&mut net_new_side_effects);
14191420
}
@@ -1604,10 +1605,11 @@ pub enum TransactionOps<T> {
16041605
>,
16051606
/// Transient revision of the `Catalog` when this transaction started.
16061607
revision: u64,
1607-
/// Tracks the OID allocator position after the last dry run, so that
1608-
/// incremental dry runs can advance past previously-allocated OIDs.
1609-
/// `None` for the first statement in the transaction (before any dry run).
1610-
next_oid: Option<u64>,
1608+
/// Snapshot of the durable transaction state after the last dry run.
1609+
/// Used to initialize the next dry run's transaction so it starts
1610+
/// in sync with the accumulated `state`. `None` for the first
1611+
/// statement in the transaction (before any dry run).
1612+
snapshot: Option<Snapshot>,
16111613
},
16121614
}
16131615

src/catalog/src/durable.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ pub use crate::durable::error::{CatalogError, DurableCatalogError, FenceError};
3131
pub use crate::durable::metrics::Metrics;
3232
pub use crate::durable::objects::state_update::StateUpdate;
3333
use crate::durable::objects::state_update::{StateUpdateKindJson, TryIntoStateUpdateKind};
34-
use crate::durable::objects::{AuditLog, Snapshot};
34+
use crate::durable::objects::AuditLog;
35+
pub use crate::durable::objects::Snapshot;
3536
pub use crate::durable::objects::{
3637
Cluster, ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged, Comment,
3738
Database, DefaultPrivilege, IntrospectionSourceIndex, Item, NetworkPolicy, ReplicaConfig,
@@ -298,6 +299,16 @@ pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
298299
/// Creates a new durable catalog state transaction.
299300
async fn transaction(&mut self) -> Result<Transaction, CatalogError>;
300301

302+
/// Creates a new transaction initialized from the given [`Snapshot`]
303+
/// instead of reading from durable storage. Used for incremental DDL
304+
/// dry runs where the transaction state from a previous dry run has been
305+
/// saved and needs to be restored so it stays in sync with the accumulated
306+
/// `CatalogState`.
307+
fn transaction_from_snapshot(
308+
&mut self,
309+
snapshot: Snapshot,
310+
) -> Result<Transaction, CatalogError>;
311+
301312
/// Commits a durable catalog state transaction. The transaction will be committed at
302313
/// `commit_ts`.
303314
///

src/catalog/src/durable/persist.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1669,6 +1669,14 @@ impl DurableCatalogState for PersistCatalogState {
16691669
Transaction::new(self, snapshot, commit_ts)
16701670
}
16711671

1672+
fn transaction_from_snapshot(
1673+
&mut self,
1674+
snapshot: Snapshot,
1675+
) -> Result<Transaction, CatalogError> {
1676+
let commit_ts = self.upper.clone();
1677+
Transaction::new(self, snapshot, commit_ts)
1678+
}
1679+
16721680
#[mz_ore::instrument(level = "debug")]
16731681
async fn commit_transaction(
16741682
&mut self,

src/catalog/src/durable/transaction.rs

Lines changed: 47 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1022,31 +1022,37 @@ impl<'a> Transaction<'a> {
10221022
.map(|oids| oids.into_element())
10231023
}
10241024

1025-
/// Returns the current next-OID value from the id allocator.
1026-
pub fn get_next_oid(&self) -> u64 {
1027-
self.id_allocator
1028-
.items()
1029-
.get(&IdAllocKey {
1030-
name: OID_ALLOC_KEY.to_string(),
1031-
})
1032-
.unwrap_or_else(|| panic!("{OID_ALLOC_KEY} id allocator missing"))
1033-
.next_id
1034-
}
1035-
1036-
/// Advances the OID allocator to `next_oid`. Used for incremental DDL
1037-
/// transaction dry runs where a fresh Transaction needs to skip past OIDs
1038-
/// already allocated in previous dry runs.
1039-
pub fn set_next_oid(&mut self, next_oid: u64) -> Result<(), CatalogError> {
1040-
self.id_allocator.set(
1041-
IdAllocKey {
1042-
name: OID_ALLOC_KEY.to_string(),
1043-
},
1044-
Some(IdAllocValue {
1045-
next_id: next_oid,
1046-
}),
1047-
self.op_id,
1048-
)?;
1049-
Ok(())
1025+
/// Exports the current state of this transaction as a [`Snapshot`].
1026+
///
1027+
/// This merges each `TableTransaction`'s initial data with its pending
1028+
/// changes to produce the current view, then converts back to proto types.
1029+
/// Used to persist transaction state between incremental DDL dry runs so
1030+
/// the next dry run's fresh `Transaction` starts in sync with the
1031+
/// accumulated `CatalogState`.
1032+
pub fn current_snapshot(&self) -> Snapshot {
1033+
Snapshot {
1034+
databases: self.databases.current_items_proto(),
1035+
schemas: self.schemas.current_items_proto(),
1036+
roles: self.roles.current_items_proto(),
1037+
role_auth: self.role_auth.current_items_proto(),
1038+
items: self.items.current_items_proto(),
1039+
comments: self.comments.current_items_proto(),
1040+
clusters: self.clusters.current_items_proto(),
1041+
network_policies: self.network_policies.current_items_proto(),
1042+
cluster_replicas: self.cluster_replicas.current_items_proto(),
1043+
introspection_sources: self.introspection_sources.current_items_proto(),
1044+
id_allocator: self.id_allocator.current_items_proto(),
1045+
configs: self.configs.current_items_proto(),
1046+
settings: self.settings.current_items_proto(),
1047+
system_object_mappings: self.system_gid_mapping.current_items_proto(),
1048+
system_configurations: self.system_configurations.current_items_proto(),
1049+
default_privileges: self.default_privileges.current_items_proto(),
1050+
source_references: self.source_references.current_items_proto(),
1051+
system_privileges: self.system_privileges.current_items_proto(),
1052+
storage_collection_metadata: self.storage_collection_metadata.current_items_proto(),
1053+
unfinalized_shards: self.unfinalized_shards.current_items_proto(),
1054+
txn_wal_shard: self.txn_wal_shard.current_items_proto(),
1055+
}
10501056
}
10511057

10521058
pub(crate) fn insert_id_allocator(
@@ -3130,6 +3136,22 @@ where
31303136
items
31313137
}
31323138

3139+
/// Returns the current items as proto-typed key-value pairs, suitable for
3140+
/// constructing a [`Snapshot`]. This merges `initial` and `pending` to
3141+
/// produce the current view and converts back to proto types.
3142+
fn current_items_proto<KP, VP>(&self) -> BTreeMap<KP, VP>
3143+
where
3144+
K: RustType<KP>,
3145+
V: RustType<VP>,
3146+
KP: Ord,
3147+
{
3148+
let mut items = BTreeMap::new();
3149+
self.for_values(|k, v| {
3150+
items.insert(k.into_proto(), v.into_proto());
3151+
});
3152+
items
3153+
}
3154+
31333155
/// Returns the items viewable in the current transaction as references. Returns a map
31343156
/// of references.
31353157
fn items(&self) -> BTreeMap<&K, &V> {

0 commit comments

Comments
 (0)