diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala b/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala index 0eab1e92568..3986365062b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala @@ -270,6 +270,23 @@ private[delta] class ConflictChecker( if (isWinnerDroppingFeatures) { throw DeltaErrors.protocolChangedException(winningCommitSummary.commitInfo) } + + if (spark.conf.get( + DeltaSQLConf.DELTA_CONFLICT_CHECKER_ENFORCE_FEATURE_ENABLEMENT_VALIDATION)) { + // Check if the winning protocol adds features that should fail concurrent transactions at + // upgrade. These features are identified by the `failConcurrentTransactionsAtUpgrade` + // method returning true. These features impose write-time requirements that need to be + // respected by all writers beyond the protocol upgrade, and there's no custom feature + // specific conflict resolution logic below to be able to have the current transaction meet + // these requirements on-the-fly. + val winningTxnAddedFeatures = TableFeature.getAddedFeatures(winningProtocol, readProtocol) + + val winningTxnUnsafeAddedFeatures = winningTxnAddedFeatures + .filter(_.failConcurrentTransactionsAtUpgrade) + if (winningTxnUnsafeAddedFeatures.nonEmpty) { + throw DeltaErrors.protocolChangedException(winningCommitSummary.commitInfo) + } + } } // When the winning transaction does not change the protocol but the losing txn is // a protocol downgrade, we re-validate the invariants of the removed feature. 
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala index 01d82ad3dcc..bada4834d8d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala @@ -115,6 +115,15 @@ sealed abstract class TableFeature( */ def isRemovable: Boolean = this.isInstanceOf[RemovableFeature] + /** + * True if the addition of this feature in the protocol is expected to fail concurrent + * transactions. This is desirable for features that are implicitly enabled by being present + * in the protocol, and also impose write-time requirements that need to be respected by all + * writers beyond the protocol upgrade. Note that features that do reconciliation at conflict + * checking time (e.g. RowTrackingFeature) should return false. + */ + def failConcurrentTransactionsAtUpgrade: Boolean = true + /** * Set of table features that this table feature depends on. I.e. the set of features that need * to be enabled if this table feature is enabled. @@ -455,6 +464,18 @@ object TableFeature { oldFeatures -- newFeatures } + /** + * Extracts the added features by comparing new and old protocols. + * Returns an empty set if there are no added features. + */ + def getAddedFeatures( + newProtocol: Protocol, + oldProtocol: Protocol): Set[TableFeature] = { + val newFeatures = newProtocol.implicitlyAndExplicitlySupportedFeatures + val oldFeatures = oldProtocol.implicitlyAndExplicitlySupportedFeatures + newFeatures -- oldFeatures + } + /** Identifies whether there was any feature removal between two protocols. 
*/ def isProtocolRemovingFeatures(newProtocol: Protocol, oldProtocol: Protocol): Boolean = { getDroppedFeatures(newProtocol = newProtocol, oldProtocol = oldProtocol).nonEmpty @@ -520,6 +541,7 @@ object AppendOnlyTableFeature spark: SparkSession): Boolean = { DeltaConfigs.IS_APPEND_ONLY.fromMetaData(metadata) } + override def failConcurrentTransactionsAtUpgrade: Boolean = false } object InvariantsTableFeature @@ -565,6 +587,7 @@ object ChangeDataFeedTableFeature spark: SparkSession): Boolean = { DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(metadata) } + override def failConcurrentTransactionsAtUpgrade: Boolean = false } object GeneratedColumnsTableFeature @@ -595,6 +618,8 @@ object ColumnMappingTableFeature } } + override def failConcurrentTransactionsAtUpgrade: Boolean = false + override def validateDropInvariants(table: DeltaTableV2, snapshot: Snapshot): Boolean = { val schemaHasNoColumnMappingMetadata = !DeltaColumnMapping.schemaHasColumnMappingMetadata(snapshot.schema) @@ -631,6 +656,7 @@ object TimestampNTZTableFeature extends ReaderWriterFeature(name = "timestampNtz protocol: Protocol, metadata: Metadata, spark: SparkSession): Boolean = { SchemaUtils.checkForTimestampNTZColumnsRecursively(metadata.schema) } + override def failConcurrentTransactionsAtUpgrade: Boolean = false } object RedirectReaderWriterFeature @@ -795,6 +821,8 @@ object DeletionVectorsTableFeature DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.fromMetaData(metadata) } + override def failConcurrentTransactionsAtUpgrade: Boolean = false + /** * Validate whether all deletion vector traces are removed from the snapshot. * @@ -838,6 +866,8 @@ object RowTrackingFeature extends WriterFeature(name = "rowTracking") override def requiredFeatures: Set[TableFeature] = Set(DomainMetadataTableFeature) + override def failConcurrentTransactionsAtUpgrade: Boolean = false + /** * When dropping row tracking we remove all relevant properties at downgrade commit. 
* This is because concurrent transactions may still use them while the feature exists in the @@ -949,6 +979,8 @@ object DomainMetadataTableFeature snapshot.domainMetadata.isEmpty } + override def failConcurrentTransactionsAtUpgrade: Boolean = false + override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand = { DomainMetadataPreDowngradeCommand(table) } @@ -962,6 +994,8 @@ object IcebergCompatV1TableFeature extends WriterFeature(name = "icebergCompatV1 override def automaticallyUpdateProtocolOfExistingTables: Boolean = true + override def failConcurrentTransactionsAtUpgrade: Boolean = false + override def metadataRequiresFeatureToBeEnabled( protocol: Protocol, metadata: Metadata, @@ -975,6 +1009,8 @@ object IcebergCompatV2TableFeature extends WriterFeature(name = "icebergCompatV2 override def automaticallyUpdateProtocolOfExistingTables: Boolean = true + override def failConcurrentTransactionsAtUpgrade: Boolean = false + override def metadataRequiresFeatureToBeEnabled( protocol: Protocol, metadata: Metadata, @@ -1034,6 +1070,13 @@ object MaterializePartitionColumnsTableFeature override def automaticallyUpdateProtocolOfExistingTables: Boolean = true + /** + * MaterializePartitionColumnsTableFeature is always enabled when present in the protocol. + * The Delta protocol does not require any metadata or domain metadata configs for this + * feature to be effective. 
+ */ + override def failConcurrentTransactionsAtUpgrade: Boolean = true + override def metadataRequiresFeatureToBeEnabled( protocol: Protocol, metadata: Metadata, @@ -1176,6 +1219,8 @@ abstract class TypeWideningTableFeatureBase(name: String) extends ReaderWriterFe override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand = TypeWideningPreDowngradeCommand(table) + + override def failConcurrentTransactionsAtUpgrade: Boolean = false } /** @@ -1227,6 +1272,7 @@ object InCommitTimestampTableFeature override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand = InCommitTimestampsPreDowngradeCommand(table) + override def failConcurrentTransactionsAtUpgrade: Boolean = false /** * As per the spec, we can disable ICT by just setting diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index dca3bd82d3b..f038d0a1f14 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -458,6 +458,15 @@ trait DeltaSQLConfBase extends DeltaSQLConfUtils { .checkValue(_ >= 0, "maxNonConflictCommitAttempts has to be positive") .createWithDefault(10) + val DELTA_CONFLICT_CHECKER_ENFORCE_FEATURE_ENABLEMENT_VALIDATION = + buildConf("conflictChecker.enforceConcurrentFeatureEnablement.enabled") + .internal() + .doc("When enabled, the conflict checker will enforce that features that are marked " + + "as failing concurrent transactions at upgrade, will fail any conflicting commits with " + + "their enablement protocol changes.") + .booleanConf + .createWithDefault(false) + val FEATURE_ENABLEMENT_CONFLICT_RESOLUTION_ENABLED = buildConf("featureEnablement.conflictResolution.enabled") .internal() diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala 
b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala index d0f24f1d1dd..34c50545cd1 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala @@ -340,6 +340,42 @@ class OptimisticTransactionSuite } } + test("concurrent feature enablement with failConcurrentTransactionsAtUpgrade should conflict") { + withSQLConf( + DeltaSQLConf.DELTA_CONFLICT_CHECKER_ENFORCE_FEATURE_ENABLEMENT_VALIDATION.key -> "true") { + withTempDir { tempDir => + val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) + + val metadata = Metadata( + schemaString = new StructType().add("x", IntegerType).json, + partitionColumns = Seq("x")) + val protocol = Protocol(3, 7) + log.startTransaction().commit(Seq(metadata, protocol), ManualUpdate) + + // Start a transaction that will write to the table concurrently with feature enablement. + val txn = log.startTransaction() + + // Concurrently, enable the MaterializePartitionColumns feature + // This feature has failConcurrentTransactionsAtUpgrade = true + val newProtocol = txn.snapshot.protocol.withFeatures( + Set(MaterializePartitionColumnsTableFeature)) + + log.startTransaction().commit(Seq(newProtocol), ManualUpdate) + + // The original transaction should fail when trying to commit + // because a feature with failConcurrentTransactionsAtUpgrade=true was added + val e = intercept[io.delta.exceptions.ProtocolChangedException] { + txn.commit( + Seq(AddFile("test", Map("x" -> "1"), 1, 1, dataChange = true)), + ManualUpdate) + } + + // Verify the error message + assert(e.getMessage.contains("The protocol version of the Delta table has been changed")) + } + } + } + test("AddFile with different partition schema compared to metadata should fail") { withTempDir { tempDir => val log = DeltaLog.forTable(spark, new Path(tempDir.getAbsolutePath)) diff --git 
a/spark/src/test/scala/org/apache/spark/sql/delta/rowtracking/RowTrackingConflictResolutionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/rowtracking/RowTrackingConflictResolutionSuite.scala index a5d37c419af..97f33221ffe 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/rowtracking/RowTrackingConflictResolutionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/rowtracking/RowTrackingConflictResolutionSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.delta.rowtracking import org.apache.spark.sql.delta._ import org.apache.spark.sql.delta.DeltaOperations.{ManualUpdate, Truncate} import org.apache.spark.sql.delta.actions.{Action, AddFile} -import org.apache.spark.sql.delta.actions.{Metadata, RemoveFile} +import org.apache.spark.sql.delta.actions.{Metadata, Protocol, RemoveFile} import org.apache.spark.sql.delta.commands.backfill.{BackfillCommandStats, RowTrackingBackfillExecutor} import org.apache.spark.sql.delta.deletionvectors.RoaringBitmapArray import org.apache.spark.sql.delta.rowid.RowIdTestUtils @@ -75,8 +75,9 @@ class RowTrackingConflictResolutionSuite extends QueryTest /** Add Row tracking table feature support. */ private def activateRowTracking(): Unit = { require(!latestSnapshot.protocol.isFeatureSupported(RowTrackingFeature)) - deltaLog.upgradeProtocol(Action.supportedProtocolVersion( - featuresToExclude = Seq(CatalogOwnedTableFeature))) + val protocolWithRowTracking = Protocol(3, 7).withFeature(RowTrackingFeature) + deltaLog.upgradeProtocol( + None, latestSnapshot, latestSnapshot.protocol.merge(protocolWithRowTracking)) } // Add 'numRecords' records to the table. 
@@ -116,9 +117,10 @@ class RowTrackingConflictResolutionSuite extends QueryTest val filePath = "file_path" val txn = deltaLog.startTransaction() + val protocolWithRowTracking = Protocol(3, 7).withFeature(RowTrackingFeature) deltaLog.startTransaction().commit( Seq( - Action.supportedProtocolVersion(featuresToExclude = Seq(CatalogOwnedTableFeature)), + latestSnapshot.protocol.merge(protocolWithRowTracking), addFile("other_path") ), DeltaOperations.ManualUpdate) txn.commit(Seq(addFile(filePath)), DeltaOperations.ManualUpdate)