Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,23 @@ private[delta] class ConflictChecker(
if (isWinnerDroppingFeatures) {
throw DeltaErrors.protocolChangedException(winningCommitSummary.commitInfo)
}

if (spark.conf.get(
DeltaSQLConf.DELTA_CONFLICT_CHECKER_ENFORCE_FEATURE_ENABLEMENT_VALIDATION)) {
// Check if the winning protocol adds features that should fail concurrent transactions at
// upgrade. These features are identified by the `failConcurrentTransactionsAtUpgrade`
// method returning true. These features impose write-time requirements that need to be
// respected by all writers beyond the protocol upgrade, and there's no custom feature
// specific conflict resolution logic below to be able to have the current transaction meet
// these requirements on-the-fly.
val winningTxnAddedFeatures = TableFeature.getAddedFeatures(winningProtocol, readProtocol)

val winningTxnUnsafeAddedFeatures = winningTxnAddedFeatures
.filter(_.failConcurrentTransactionsAtUpgrade)
if (winningTxnUnsafeAddedFeatures.nonEmpty) {
throw DeltaErrors.protocolChangedException(winningCommitSummary.commitInfo)
}
}
}
// When the winning transaction does not change the protocol but the losing txn is
// a protocol downgrade, we re-validate the invariants of the removed feature.
Expand Down
46 changes: 46 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,15 @@ sealed abstract class TableFeature(
*/
def isRemovable: Boolean = this.isInstanceOf[RemovableFeature]

/**
 * True if adding this feature to a table's protocol must fail concurrent transactions,
 * i.e. transactions that read the table before the protocol upgrade committed. This is
 * desirable for features that are implicitly enabled simply by being present in the
 * protocol and that impose write-time requirements which every writer must respect once
 * the upgrade lands — there is no feature-specific conflict-resolution logic that could
 * make an in-flight transaction satisfy those requirements on the fly. Features that do
 * perform reconciliation at conflict-checking time (e.g. RowTrackingFeature) should
 * override this to return false.
 */
def failConcurrentTransactionsAtUpgrade: Boolean = true

/**
* Set of table features that this table feature depends on. I.e. the set of features that need
* to be enabled if this table feature is enabled.
Expand Down Expand Up @@ -455,6 +464,18 @@ object TableFeature {
oldFeatures -- newFeatures
}

/**
 * Extracts the added features by comparing two protocols: the features supported
 * (implicitly or explicitly) by `newProtocol` but not by `oldProtocol`.
 * Returns an empty set if there are no added features.
 */
def getAddedFeatures(
    newProtocol: Protocol,
    oldProtocol: Protocol): Set[TableFeature] = {
  val newFeatures = newProtocol.implicitlyAndExplicitlySupportedFeatures
  val oldFeatures = oldProtocol.implicitlyAndExplicitlySupportedFeatures
  newFeatures -- oldFeatures
}

/** Identifies whether there was any feature removal between two protocols. */
def isProtocolRemovingFeatures(newProtocol: Protocol, oldProtocol: Protocol): Boolean = {
getDroppedFeatures(newProtocol = newProtocol, oldProtocol = oldProtocol).nonEmpty
Expand Down Expand Up @@ -520,6 +541,7 @@ object AppendOnlyTableFeature
spark: SparkSession): Boolean = {
DeltaConfigs.IS_APPEND_ONLY.fromMetaData(metadata)
}
override def failConcurrentTransactionsAtUpgrade: Boolean = false
}

object InvariantsTableFeature
Expand Down Expand Up @@ -565,6 +587,7 @@ object ChangeDataFeedTableFeature
spark: SparkSession): Boolean = {
DeltaConfigs.CHANGE_DATA_FEED.fromMetaData(metadata)
}
override def failConcurrentTransactionsAtUpgrade: Boolean = false
}

object GeneratedColumnsTableFeature
Expand Down Expand Up @@ -595,6 +618,8 @@ object ColumnMappingTableFeature
}
}

override def failConcurrentTransactionsAtUpgrade: Boolean = false

override def validateDropInvariants(table: DeltaTableV2, snapshot: Snapshot): Boolean = {
val schemaHasNoColumnMappingMetadata =
!DeltaColumnMapping.schemaHasColumnMappingMetadata(snapshot.schema)
Expand Down Expand Up @@ -631,6 +656,7 @@ object TimestampNTZTableFeature extends ReaderWriterFeature(name = "timestampNtz
protocol: Protocol, metadata: Metadata, spark: SparkSession): Boolean = {
SchemaUtils.checkForTimestampNTZColumnsRecursively(metadata.schema)
}
override def failConcurrentTransactionsAtUpgrade: Boolean = false
}

object RedirectReaderWriterFeature
Expand Down Expand Up @@ -795,6 +821,8 @@ object DeletionVectorsTableFeature
DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.fromMetaData(metadata)
}

override def failConcurrentTransactionsAtUpgrade: Boolean = false

/**
* Validate whether all deletion vector traces are removed from the snapshot.
*
Expand Down Expand Up @@ -838,6 +866,8 @@ object RowTrackingFeature extends WriterFeature(name = "rowTracking")

override def requiredFeatures: Set[TableFeature] = Set(DomainMetadataTableFeature)

override def failConcurrentTransactionsAtUpgrade: Boolean = false

/**
* When dropping row tracking we remove all relevant properties at downgrade commit.
* This is because concurrent transactions may still use them while the feature exists in the
Expand Down Expand Up @@ -949,6 +979,8 @@ object DomainMetadataTableFeature
snapshot.domainMetadata.isEmpty
}

override def failConcurrentTransactionsAtUpgrade: Boolean = false

override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand = {
DomainMetadataPreDowngradeCommand(table)
}
Expand All @@ -962,6 +994,8 @@ object IcebergCompatV1TableFeature extends WriterFeature(name = "icebergCompatV1

override def automaticallyUpdateProtocolOfExistingTables: Boolean = true

override def failConcurrentTransactionsAtUpgrade: Boolean = false

override def metadataRequiresFeatureToBeEnabled(
protocol: Protocol,
metadata: Metadata,
Expand All @@ -975,6 +1009,8 @@ object IcebergCompatV2TableFeature extends WriterFeature(name = "icebergCompatV2

override def automaticallyUpdateProtocolOfExistingTables: Boolean = true

override def failConcurrentTransactionsAtUpgrade: Boolean = false

override def metadataRequiresFeatureToBeEnabled(
protocol: Protocol,
metadata: Metadata,
Expand Down Expand Up @@ -1034,6 +1070,13 @@ object MaterializePartitionColumnsTableFeature

override def automaticallyUpdateProtocolOfExistingTables: Boolean = true

/**
* MaterializePartitionColumnsTableFeature is always enabled when present in the protocol.
* The Delta protocol does not require any metadata or domain metadata configs for this
* feature to be effective.
*/
override def failConcurrentTransactionsAtUpgrade: Boolean = true

override def metadataRequiresFeatureToBeEnabled(
protocol: Protocol,
metadata: Metadata,
Expand Down Expand Up @@ -1176,6 +1219,8 @@ abstract class TypeWideningTableFeatureBase(name: String) extends ReaderWriterFe

override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand =
TypeWideningPreDowngradeCommand(table)

override def failConcurrentTransactionsAtUpgrade: Boolean = false
}

/**
Expand Down Expand Up @@ -1227,6 +1272,7 @@ object InCommitTimestampTableFeature
override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand =
InCommitTimestampsPreDowngradeCommand(table)

override def failConcurrentTransactionsAtUpgrade: Boolean = false

/**
* As per the spec, we can disable ICT by just setting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,15 @@ trait DeltaSQLConfBase extends DeltaSQLConfUtils {
.checkValue(_ >= 0, "maxNonConflictCommitAttempts has to be positive")
.createWithDefault(10)

// Feature flag guarding the conflict-checker validation that fails a losing transaction
// when the winning transaction added a table feature whose
// `failConcurrentTransactionsAtUpgrade` flag is true. Disabled by default, so the new
// behavior is opt-in.
val DELTA_CONFLICT_CHECKER_ENFORCE_FEATURE_ENABLEMENT_VALIDATION =
  buildConf("conflictChecker.enforceConcurrentFeatureEnablement.enabled")
    .internal()
    .doc("When enabled, the conflict checker will enforce that features that are marked " +
      "as failing concurrent transactions at upgrade, will fail any conflicting commits with " +
      "their enablement protocol changes.")
    .booleanConf
    .createWithDefault(false)

val FEATURE_ENABLEMENT_CONFLICT_RESOLUTION_ENABLED =
buildConf("featureEnablement.conflictResolution.enabled")
.internal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,42 @@ class OptimisticTransactionSuite
}
}

test("concurrent feature enablement with failConcurrentTransactionsAtUpgrade should conflict") {
  withSQLConf(
      DeltaSQLConf.DELTA_CONFLICT_CHECKER_ENFORCE_FEATURE_ENABLEMENT_VALIDATION.key -> "true") {
    withTempDir { tempDir =>
      val deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))

      // Initialize the table with a single partition column and a table-features protocol.
      val tableMetadata = Metadata(
        schemaString = new StructType().add("x", IntegerType).json,
        partitionColumns = Seq("x"))
      deltaLog.startTransaction().commit(Seq(tableMetadata, Protocol(3, 7)), ManualUpdate)

      // This transaction reads the table before any feature upgrade lands; it will lose.
      val losingTxn = deltaLog.startTransaction()

      // A concurrent writer enables MaterializePartitionColumnsTableFeature, which
      // declares failConcurrentTransactionsAtUpgrade = true.
      val upgradedProtocol = losingTxn.snapshot.protocol.withFeatures(
        Set(MaterializePartitionColumnsTableFeature))
      deltaLog.startTransaction().commit(Seq(upgradedProtocol), ManualUpdate)

      // Committing the older transaction must now be rejected as a protocol conflict,
      // because a feature with failConcurrentTransactionsAtUpgrade = true was added.
      val ex = intercept[io.delta.exceptions.ProtocolChangedException] {
        losingTxn.commit(
          Seq(AddFile("test", Map("x" -> "1"), 1, 1, dataChange = true)),
          ManualUpdate)
      }

      // Sanity-check the surfaced error message.
      assert(ex.getMessage.contains("The protocol version of the Delta table has been changed"))
    }
  }
}

test("AddFile with different partition schema compared to metadata should fail") {
withTempDir { tempDir =>
val log = DeltaLog.forTable(spark, new Path(tempDir.getAbsolutePath))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.delta.rowtracking
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.DeltaOperations.{ManualUpdate, Truncate}
import org.apache.spark.sql.delta.actions.{Action, AddFile}
import org.apache.spark.sql.delta.actions.{Metadata, RemoveFile}
import org.apache.spark.sql.delta.actions.{Metadata, Protocol, RemoveFile}
import org.apache.spark.sql.delta.commands.backfill.{BackfillCommandStats, RowTrackingBackfillExecutor}
import org.apache.spark.sql.delta.deletionvectors.RoaringBitmapArray
import org.apache.spark.sql.delta.rowid.RowIdTestUtils
Expand Down Expand Up @@ -75,8 +75,9 @@ class RowTrackingConflictResolutionSuite extends QueryTest
/** Add Row tracking table feature support. */
private def activateRowTracking(): Unit = {
require(!latestSnapshot.protocol.isFeatureSupported(RowTrackingFeature))
deltaLog.upgradeProtocol(Action.supportedProtocolVersion(
featuresToExclude = Seq(CatalogOwnedTableFeature)))
val protocolWithRowTracking = Protocol(3, 7).withFeature(RowTrackingFeature)
deltaLog.upgradeProtocol(
None, latestSnapshot, latestSnapshot.protocol.merge(protocolWithRowTracking))
}

// Add 'numRecords' records to the table.
Expand Down Expand Up @@ -116,9 +117,10 @@ class RowTrackingConflictResolutionSuite extends QueryTest
val filePath = "file_path"

val txn = deltaLog.startTransaction()
val protocolWithRowTracking = Protocol(3, 7).withFeature(RowTrackingFeature)
deltaLog.startTransaction().commit(
Seq(
Action.supportedProtocolVersion(featuresToExclude = Seq(CatalogOwnedTableFeature)),
latestSnapshot.protocol.merge(protocolWithRowTracking),
addFile("other_path")
), DeltaOperations.ManualUpdate)
txn.commit(Seq(addFile(filePath)), DeltaOperations.ManualUpdate)
Expand Down
Loading