
feat: enforce and test non-null column invariant on writes #2483

Merged

sanujbasu merged 1 commit into delta-io:main from sanujbasu:stack/enforce-non-null-columns on May 2, 2026

Conversation

sanujbasu (Collaborator) commented Apr 28, 2026

What changes are proposed in this pull request?

Fixes #2465.

Closes the write-time enforcement gap left by #2418, which auto-enables the invariants writer feature on schemas with non-null columns but does not itself reject nulls on writes. Kernel now rejects Scalar::Null partition values for partition columns with nullable: false in partitioned_write_context, matching Delta-Spark's DELTA_NOT_NULL_CONSTRAINT_VIOLATED (SQLSTATE 23502).

The check sits in validate_types alongside the existing type checks. The inner type carried by Scalar::Null continues to be ignored, so connectors can keep constructing null partition values without first looking up the schema; only the schema field's nullability is enforced. This matches Catalyst's NullType permissiveness and the type-erased on-wire format (partitionValues serializes nulls as JSON null, with no type information).
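A minimal, self-contained sketch of the nullability arm (StructField and Scalar here are simplified stand-ins for the kernel types, and the error type is a plain String; the real check lives in kernel/src/partition/validation.rs and is visible in the range-diff hunks below):

```rust
// Sketch only: simplified stand-ins for the kernel's `StructField`/`Scalar`.
// The real arm sits inside `validate_types`, next to the existing per-column
// type checks.
struct StructField {
    name: String,
    nullable: bool,
}

enum Scalar {
    String(String),
    Null, // the kernel's `Scalar::Null` carries a `DataType`; it is ignored here
}

fn check_partition_nullability(field: &StructField, value: &Scalar) -> Result<(), String> {
    if matches!(value, Scalar::Null) && !field.nullable {
        // Mirrors Delta-Spark's DELTA_NOT_NULL_CONSTRAINT_VIOLATED (SQLSTATE 23502).
        return Err(format!(
            "partition column '{}' is not nullable but received a null value",
            field.name
        ));
    }
    // Null scalars still skip the value type check: the on-wire form is just
    // `"partitionValues": {"p": null}`, so no type information survives anyway.
    Ok(())
}
```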

How was this change tested?

Adds integration tests covering all three non-null write paths:

1/ Non-materialized partitioned writes (default): kernel rejects the null Scalar at partitioned_write_context time, before serialization. Covered across all column-mapping modes via the kernel create_table builder.

2/ Materialized partitioned writes (materializePartitionColumns): kernel delegates to the engine's ParquetHandler. The default engine inherits the guarantee from arrow-rs, whose RecordBatch::try_new rejects nulls in fields with nullable: false (see the standalone sketch after this list). The table for this case is set up via the kernel create_table builder with delta.feature.materializePartitionColumns=supported (allow-listed by #2481).

3/ Plain non-null data columns: same engine-delegation seam, exercised across a representative primitive type matrix (integer, long, string, binary, boolean, timestamp, decimal). The table is created via the kernel create_table builder so the non-null schema drives the auto-enablement of the invariants writer feature end-to-end; the test asserts both the auto-enable and the downstream RecordBatch::try_new rejection. Pins the full chain documented on maybe_enable_invariants in transaction/builder/create_table.rs.
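For paths 2/ and 3/, the enforcement seam is arrow-rs itself. A standalone sketch of that guarantee, using the arrow crate directly (the kernel re-exports the same types under delta_kernel::arrow; this assumes arrow is available as a dev-dependency):

```rust
// Demonstrates the arrow-rs behavior paths 2/ and 3/ rely on:
// `RecordBatch::try_new` refuses a batch whose non-nullable column contains nulls.
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    // `nullable: false`, as produced from a kernel `StructField::not_null`.
    let schema = Arc::new(Schema::new(vec![Field::new("c", DataType::Int32, false)]));

    // One row, holding a null.
    let col: ArrayRef = Arc::new(Int32Array::from(vec![None as Option<i32>]));

    // The rejection happens here, before any `engine.write_parquet` call.
    let result = RecordBatch::try_new(schema, vec![col]);
    assert!(result.is_err(), "null in non-nullable field must be rejected");
}
```

The integration tests exercise this same seam through the kernel write-context setup (snapshot + transaction + write context) rather than through arrow directly.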

Local verification:

1/ cargo +nightly fmt --check
2/ cargo clippy --workspace --benches --tests --all-features -- -D warnings
3/ cargo doc --workspace --all-features --no-deps
4/ cargo test -p delta_kernel --all-features for affected paths

@sanujbasu sanujbasu changed the title from "test creation" to "feat: enforce and test non-null column invariant on writes" on Apr 28, 2026
@sanujbasu sanujbasu force-pushed the stack/enforce-non-null-columns branch from 5e327f6 to 26a8b68 on April 28, 2026 at 05:56
@sanujbasu sanujbasu marked this pull request as ready for review April 28, 2026 05:59
codecov bot commented Apr 28, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 88.50%. Comparing base (b61bf96) to head (063dda3).
⚠️ Report is 7 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2483      +/-   ##
==========================================
+ Coverage   88.47%   88.50%   +0.03%     
==========================================
  Files         178      179       +1     
  Lines       58807    59031     +224     
  Branches    58807    59031     +224     
==========================================
+ Hits        52028    52245     +217     
+ Misses       4793     4785       -8     
- Partials     1986     2001      +15     

☔ View full report in Codecov by Sentry.

@sanujbasu sanujbasu force-pushed the stack/enforce-non-null-columns branch from 26a8b68 to eb70829 on April 28, 2026 at 06:20
scottsand-db (Collaborator) left a comment


Looks great! Minor comments overall -- core production logic is sound.

Note: Claude also identified:

  • Transaction::partitioned_write_context --- its docs say "null is valid for any primitive partition column". After this PR that is no longer true. The companion docstring on validate_types was updated; the public-API surface should match.

Comment thread kernel/src/partition/validation.rs Outdated
Comment thread kernel/src/partition/validation.rs Outdated
Comment thread kernel/src/partition/validation.rs Outdated
Comment thread kernel/tests/integration/write/types.rs Outdated
Comment thread kernel/tests/integration/write/types.rs Outdated
Comment thread kernel/tests/integration/write/partitioned.rs Outdated
Comment thread kernel/tests/integration/write/partitioned.rs Outdated
Comment thread kernel/tests/integration/write/partitioned.rs Outdated
Comment thread kernel/tests/integration/write/types.rs Outdated
@sanujbasu sanujbasu force-pushed the stack/enforce-non-null-columns branch 3 times, most recently from 7c554cc to fb4cfa6 on April 29, 2026 at 00:54
sanujbasu (Collaborator, Author) commented:

Range-diff: stack/materialize-partition-columns-cleanup (7c554cc -> fb4cfa6)
kernel/tests/integration/write/partitioned.rs
@@ -0,0 +1,175 @@
+diff --git a/kernel/tests/integration/write/partitioned.rs b/kernel/tests/integration/write/partitioned.rs
+--- a/kernel/tests/integration/write/partitioned.rs
++++ b/kernel/tests/integration/write/partitioned.rs
+ 
+     Ok(())
+ }
++
++// ==============================================================================
++// NOT NULL partition column tests
++// ==============================================================================
++//
++// See [#2465](https://github.com/delta-io/delta-kernel-rs/issues/2465). These tests pin the
++// kernel contract that mirrors Delta-Spark's `DELTA_NOT_NULL_CONSTRAINT_VIOLATED` (SQLSTATE
++// 23502): a NULL written into a `NOT NULL` partition column must be rejected on the default
++// engine. The two arms exercise the two enforcement seams:
++//
++// 1. **Non-materialized** (default): `Transaction::generate_logical_to_physical` drops partition
++//    columns from the batch before Parquet write, so the partition-value map is the authority. The
++//    fix in `validate_types` rejects null `Scalar`s for `nullable: false` partition columns at
++//    `partitioned_write_context` time.
++// 2. **Materialized** (`materializePartitionColumns` writer feature): partition columns stay in the
++//    batch through the transform, so a null in the NOT NULL column is rejected by `arrow-rs` at
++//    `RecordBatch::try_new`, matching the data-column NOT NULL contract.
++
++/// Non-materialized: a null partition `Scalar` for a `NOT NULL` partition column is rejected
++/// at `partitioned_write_context` (i.e. before `engine.write_parquet` is called and before
++/// any path encoding). This is the partition-map authority described in the issue plan.
++///
++/// The CM axis is orthogonal to nullability validation -- `validate_types` runs against
++/// logical column names and field nullability before serialization. Unit tests in
++/// `kernel/src/partition/validation.rs` cover the type matrix; this test only confirms the
++/// e2e wiring through `Transaction`.
++#[rstest]
++#[case::cm_none(ColumnMappingMode::None)]
++#[case::cm_name(ColumnMappingMode::Name)]
++#[case::cm_id(ColumnMappingMode::Id)]
++#[tokio::test(flavor = "multi_thread")]
++async fn test_partition_not_null_rejects_null_scalar_non_materialized(
++    #[case] cm_mode: ColumnMappingMode,
++) -> Result<(), Box<dyn std::error::Error>> {
++    let _ = tracing_subscriber::fmt::try_init();
++
++    // Schema: a nullable data column + a NOT NULL string partition column.
++    let schema = Arc::new(StructType::try_new(vec![
++        StructField::nullable("value", DataType::INTEGER),
++        StructField::not_null("p", DataType::STRING),
++    ])?);
++    let partition_cols = ["p"];
++
++    let (_tmp_dir, table_path, engine) = test_table_setup_mt()?;
++    let snapshot = create_partitioned_table(
++        &table_path,
++        engine.as_ref(),
++        schema,
++        &partition_cols,
++        cm_mode,
++    )?;
++
++    // The transform drops the partition column from the batch, so the partition-value map
++    // is the only place a null can show up for `p`. Driving `partitioned_write_context` with
++    // `Scalar::Null` must surface kernel's NOT NULL rejection from `validate_types`.
++    let txn = snapshot
++        .transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
++        .with_engine_info("default engine");
++    let result = txn.partitioned_write_context(HashMap::from([(
++        "p".to_string(),
++        Scalar::Null(DataType::STRING),
++    )]));
++
++    let err = result
++        .err()
++        .ok_or("expected partitioned_write_context to error for null into NOT NULL partition")?
++        .to_string();
++    assert!(err.contains("not nullable"), "{err}");
++    assert!(err.contains("'p'"), "{err}");
++
++    Ok(())
++}
++
++/// Non-materialized counterpart: a non-null partition `Scalar` for a `NOT NULL` partition
++/// column is accepted, and the partition-map authority test does not regress the legal
++/// path. Confirms our error case in
++/// [`test_partition_not_null_rejects_null_scalar_non_materialized`] is shape-driven, not a
++/// blanket rejection.
++#[tokio::test(flavor = "multi_thread")]
++async fn test_partition_not_null_accepts_non_null_scalar_non_materialized(
++) -> Result<(), Box<dyn std::error::Error>> {
++    let _ = tracing_subscriber::fmt::try_init();
++
++    let schema = Arc::new(StructType::try_new(vec![
++        StructField::nullable("value", DataType::INTEGER),
++        StructField::not_null("p", DataType::STRING),
++    ])?);
++    let (_tmp_dir, table_path, engine) = test_table_setup_mt()?;
++    let snapshot = create_partitioned_table(
++        &table_path,
++        engine.as_ref(),
++        schema,
++        &["p"],
++        ColumnMappingMode::None,
++    )?;
++
++    let txn = snapshot
++        .transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
++        .with_engine_info("default engine");
++    txn.partitioned_write_context(HashMap::from([(
++        "p".to_string(),
++        Scalar::String("a".into()),
++    )]))?;
++
++    Ok(())
++}
++
++/// Materialized arm: with `materializePartitionColumns` enabled, partition columns stay in
++/// the logical batch (`Transaction::generate_logical_to_physical` does not drop them). The
++/// kernel-derived Arrow schema preserves `nullable: false`, and `arrow-rs` rejects a null
++/// in the NOT NULL partition column at `RecordBatch::try_new` -- the same enforcement seam
++/// as a NOT NULL data column. The feature is enabled at create time via the explicit
++/// feature signal `delta.feature.materializePartitionColumns=supported`.
++#[tokio::test(flavor = "multi_thread")]
++async fn test_partition_not_null_rejects_null_in_batch_materialized(
++) -> Result<(), Box<dyn std::error::Error>> {
++    let _ = tracing_subscriber::fmt::try_init();
++
++    let schema = Arc::new(StructType::try_new(vec![
++        StructField::nullable("value", DataType::INTEGER),
++        StructField::not_null("p", DataType::STRING),
++    ])?);
++
++    let (_tmp_dir, table_path, engine) = test_table_setup_mt()?;
++    let _ = create_table(&table_path, schema.clone(), "test/1.0")
++        .with_data_layout(DataLayout::partitioned(["p"]))
++        .with_table_properties([("delta.feature.materializePartitionColumns", "supported")])
++        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
++        .commit(engine.as_ref())?;
++
++    let snapshot = Snapshot::builder_for(&table_path).build(engine.as_ref())?;
++    assert!(
++        snapshot.table_configuration().is_feature_supported(
++            &delta_kernel::table_features::TableFeature::MaterializePartitionColumns
++        ),
++        "test setup must enable materializePartitionColumns"
++    );
++
++    // Partitioned write context with a non-null partition value. The materialize feature
++    // means the partition column stays in the logical batch; constructing that batch with a
++    // null in the NOT NULL `p` column is the failure surface we care about.
++    let txn = snapshot
++        .transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
++        .with_engine_info("default engine");
++    let _write_context = txn.partitioned_write_context(HashMap::from([(
++        "p".to_string(),
++        Scalar::String("a".into()),
++    )]))?;
++
++    let arrow_schema: ArrowSchema = schema.as_ref().try_into_arrow()?;
++    assert!(
++        !arrow_schema.field_with_name("p")?.is_nullable(),
++        "kernel `not_null` partition field must produce Arrow `nullable: false`",
++    );
++
++    let result = RecordBatch::try_new(
++        Arc::new(arrow_schema),
++        vec![
++            Arc::new(Int32Array::from(vec![Some(1)])),
++            Arc::new(StringArray::from(vec![None as Option<&str>])),
++        ],
++    );
++    assert!(
++        result.is_err(),
++        "RecordBatch::try_new should reject null in NOT NULL materialized partition column; got: {result:?}",
++    );
++
++    Ok(())
++}
\ No newline at end of file
kernel/tests/integration/write/types.rs
@@ -0,0 +1,159 @@
+diff --git a/kernel/tests/integration/write/types.rs b/kernel/tests/integration/write/types.rs
+--- a/kernel/tests/integration/write/types.rs
++++ b/kernel/tests/integration/write/types.rs
+ use std::sync::Arc;
+ 
+ use delta_kernel::arrow::array::{
+-    ArrayRef, BinaryArray, Int32Array, StructArray, TimestampMicrosecondArray,
++    ArrayRef, BinaryArray, BooleanArray, Decimal128Array, Int32Array, Int64Array, StringArray,
++    StructArray, TimestampMicrosecondArray,
+ };
+ use delta_kernel::arrow::buffer::NullBuffer;
+ use delta_kernel::arrow::datatypes::{DataType as ArrowDataType, Field};
+ use delta_kernel::object_store::path::Path;
+ use delta_kernel::object_store::ObjectStoreExt as _;
+ use delta_kernel::schema::{DataType, StructField, StructType};
++use delta_kernel::transaction::create_table::create_table as kernel_create_table;
+ use delta_kernel::{Engine, Error as KernelError, Snapshot};
+ use itertools::Itertools;
++use rstest::rstest;
+ use serde_json::Deserializer;
+ use tempfile::tempdir;
+-use test_utils::{create_table, engine_store_setup, test_read};
++use test_utils::{create_table, engine_store_setup, test_read, test_table_setup};
+ use url::Url;
+ 
+ #[tokio::test]
+ 
+     Ok(())
+ }
++
++// =============================================================================
++// NOT NULL data column tests (default engine)
++// =============================================================================
++//
++// These tests pin the contract documented on `maybe_enable_invariants` (see
++// `kernel/src/transaction/builder/create_table.rs`): kernel does not enforce
++// nullability at write time -- it relies on the engine's `ParquetHandler` to
++// do so. The default engine inherits the guarantee from `arrow-rs`, which
++// rejects null values for fields with `nullable: false` at
++// `RecordBatch::try_new`.
++//
++// Each case creates the table via the kernel `create_table` builder so the
++// non-null schema drives the auto-enablement of the `invariants` writer
++// feature end-to-end (the protocol is not hand-crafted). The test then opens
++// the standard write path (snapshot + transaction + `unpartitioned_write_context`)
++// and asserts that the input `RecordBatch` for `engine.write_parquet` cannot
++// be constructed with a null in the NOT NULL column. The failure surfaces
++// before the engine is ever invoked.
++
++fn null_array_int32() -> ArrayRef {
++    Arc::new(Int32Array::from(vec![None as Option<i32>]))
++}
++
++fn null_array_int64() -> ArrayRef {
++    Arc::new(Int64Array::from(vec![None as Option<i64>]))
++}
++
++fn null_array_string() -> ArrayRef {
++    Arc::new(StringArray::from(vec![None as Option<&str>]))
++}
++
++fn null_array_binary() -> ArrayRef {
++    Arc::new(BinaryArray::from(vec![None as Option<&[u8]>]))
++}
++
++fn null_array_boolean() -> ArrayRef {
++    Arc::new(BooleanArray::from(vec![None as Option<bool>]))
++}
++
++fn null_array_timestamp_utc() -> ArrayRef {
++    Arc::new(TimestampMicrosecondArray::from(vec![None as Option<i64>]).with_timezone("UTC"))
++}
++
++fn null_array_decimal_10_2() -> ArrayRef {
++    Arc::new(
++        Decimal128Array::from(vec![None as Option<i128>])
++            .with_precision_and_scale(10, 2)
++            .unwrap(),
++    )
++}
++
++/// Verifies the default-engine NOT NULL contract for non-partition columns
++/// across a representative set of primitive types. The contract:
++///
++/// 1. `nullable: false` propagates from kernel's `StructField` into the Arrow logical schema
++///    produced by `try_into_arrow`.
++/// 2. Constructing the Arrow `RecordBatch` an engine would hand to `engine.write_parquet` with a
++///    null in the NOT NULL column fails at `RecordBatch::try_new`, before the engine is invoked.
++///
++/// See [#2465](https://github.com/delta-io/delta-kernel-rs/issues/2465).
++#[rstest]
++#[case::integer(DataType::INTEGER, null_array_int32 as fn() -> ArrayRef)]
++#[case::long(DataType::LONG, null_array_int64 as fn() -> ArrayRef)]
++#[case::string(DataType::STRING, null_array_string as fn() -> ArrayRef)]
++#[case::binary(DataType::BINARY, null_array_binary as fn() -> ArrayRef)]
++#[case::boolean(DataType::BOOLEAN, null_array_boolean as fn() -> ArrayRef)]
++#[case::timestamp(DataType::TIMESTAMP, null_array_timestamp_utc as fn() -> ArrayRef)]
++#[case::decimal(DataType::decimal(10, 2).unwrap(), null_array_decimal_10_2 as fn() -> ArrayRef)]
++#[tokio::test]
++async fn test_not_null_data_column_rejects_null_in_batch(
++    #[case] data_type: DataType,
++    #[case] null_array_fn: fn() -> ArrayRef,
++) -> Result<(), Box<dyn std::error::Error>> {
++    let _ = tracing_subscriber::fmt::try_init();
++
++    // Step 1: Create the table via the kernel builder. The non-null schema is
++    // the only signal needed -- `maybe_enable_invariants` auto-adds the
++    // `invariants` writer feature. The test exercises the full chain
++    // (schema -> auto-enable -> engine enforcement) rather than hand-crafting
++    // the protocol.
++    let schema = Arc::new(StructType::try_new(vec![StructField::not_null(
++        "c",
++        data_type.clone(),
++    )])?);
++    let (_tmp_dir, table_path, engine) = test_table_setup()?;
++    let _ = kernel_create_table(&table_path, schema.clone(), "test/1.0")
++        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
++        .commit(engine.as_ref())?;
++
++    // Step 2: Confirm the protocol auto-enabled `invariants` -- the upstream
++    // half of the contract this test pins.
++    let snapshot = Snapshot::builder_for(&table_path).build(engine.as_ref())?;
++    assert!(
++        snapshot
++            .table_configuration()
++            .protocol()
++            .writer_features()
++            .is_some_and(|f| f.contains(&delta_kernel::table_features::TableFeature::Invariants)),
++        "non-null schema must auto-enable the `invariants` writer feature",
++    );
++
++    // Step 3: Open the standard default-engine write path. Going through
++    // `Snapshot::transaction` and `unpartitioned_write_context` exercises the
++    // exact setup an engine performs prior to calling `engine.write_parquet`.
++    let txn = snapshot
++        .transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
++        .with_engine_info("default engine");
++    let _write_context = txn.unpartitioned_write_context()?;
++
++    // Step 4: Confirm `nullable: false` propagates from kernel into the
++    // logical Arrow schema engines build batches against.
++    let arrow_schema: delta_kernel::arrow::datatypes::Schema = schema.as_ref().try_into_arrow()?;
++    assert!(
++        !arrow_schema.field(0).is_nullable(),
++        "kernel `not_null` field must produce Arrow `nullable: false`",
++    );
++
++    // Step 5: Building the input `RecordBatch` for `engine.write_parquet` with
++    // a null in the NOT NULL column must fail at `RecordBatch::try_new`,
++    // before the engine is called. This is the default-engine NOT NULL
++    // enforcement seam documented on `maybe_enable_invariants`.
++    let result = RecordBatch::try_new(Arc::new(arrow_schema), vec![null_array_fn()]);
++    assert!(
++        result.is_err(),
++        "RecordBatch::try_new should reject null in NOT NULL column ({data_type:?}); got: {result:?}",
++    );
++
++    Ok(())
++}
\ No newline at end of file
kernel/tests/write/partitioned.rs
@@ -1,175 +0,0 @@
-diff --git a/kernel/tests/write/partitioned.rs b/kernel/tests/write/partitioned.rs
---- a/kernel/tests/write/partitioned.rs
-+++ b/kernel/tests/write/partitioned.rs
- 
-     Ok(())
- }
-+
-+// ==============================================================================
-+// NOT NULL partition column tests
-+// ==============================================================================
-+//
-+// See [#2465](https://github.com/delta-io/delta-kernel-rs/issues/2465). These tests pin the
-+// kernel contract that mirrors Delta-Spark's `DELTA_NOT_NULL_CONSTRAINT_VIOLATED` (SQLSTATE
-+// 23502): a NULL written into a `NOT NULL` partition column must be rejected on the default
-+// engine. The two arms exercise the two enforcement seams:
-+//
-+// 1. **Non-materialized** (default): `Transaction::generate_logical_to_physical` drops partition
-+//    columns from the batch before Parquet write, so the partition-value map is the authority. The
-+//    fix in `validate_types` rejects null `Scalar`s for `nullable: false` partition columns at
-+//    `partitioned_write_context` time.
-+// 2. **Materialized** (`materializePartitionColumns` writer feature): partition columns stay in the
-+//    batch through the transform, so a null in the NOT NULL column is rejected by `arrow-rs` at
-+//    `RecordBatch::try_new`, matching the data-column NOT NULL contract.
-+
-+/// Non-materialized: a null partition `Scalar` for a `NOT NULL` partition column is rejected
-+/// at `partitioned_write_context` (i.e. before `engine.write_parquet` is called and before
-+/// any path encoding). This is the partition-map authority described in the issue plan.
-+///
-+/// The CM axis is orthogonal to nullability validation -- `validate_types` runs against
-+/// logical column names and field nullability before serialization. Unit tests in
-+/// `kernel/src/partition/validation.rs` cover the type matrix; this test only confirms the
-+/// e2e wiring through `Transaction`.
-+#[rstest]
-+#[case::cm_none(ColumnMappingMode::None)]
-+#[case::cm_name(ColumnMappingMode::Name)]
-+#[case::cm_id(ColumnMappingMode::Id)]
-+#[tokio::test(flavor = "multi_thread")]
-+async fn test_partition_not_null_rejects_null_scalar_non_materialized(
-+    #[case] cm_mode: ColumnMappingMode,
-+) -> Result<(), Box<dyn std::error::Error>> {
-+    let _ = tracing_subscriber::fmt::try_init();
-+
-+    // Schema: a nullable data column + a NOT NULL string partition column.
-+    let schema = Arc::new(StructType::try_new(vec![
-+        StructField::nullable("value", DataType::INTEGER),
-+        StructField::not_null("p", DataType::STRING),
-+    ])?);
-+    let partition_cols = ["p"];
-+
-+    let (_tmp_dir, table_path, engine) = test_table_setup_mt()?;
-+    let snapshot = create_partitioned_table(
-+        &table_path,
-+        engine.as_ref(),
-+        schema,
-+        &partition_cols,
-+        cm_mode,
-+    )?;
-+
-+    // The transform drops the partition column from the batch, so the partition-value map
-+    // is the only place a null can show up for `p`. Driving `partitioned_write_context` with
-+    // `Scalar::Null` must surface kernel's NOT NULL rejection from `validate_types`.
-+    let txn = snapshot
-+        .transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
-+        .with_engine_info("default engine");
-+    let result = txn.partitioned_write_context(HashMap::from([(
-+        "p".to_string(),
-+        Scalar::Null(DataType::STRING),
-+    )]));
-+
-+    let err = result
-+        .err()
-+        .ok_or("expected partitioned_write_context to error for null into NOT NULL partition")?
-+        .to_string();
-+    assert!(err.contains("not nullable"), "{err}");
-+    assert!(err.contains("'p'"), "{err}");
-+
-+    Ok(())
-+}
-+
-+/// Non-materialized counterpart: a non-null partition `Scalar` for a `NOT NULL` partition
-+/// column is accepted, and the partition-map authority test does not regress the legal
-+/// path. Confirms our error case in
-+/// [`test_partition_not_null_rejects_null_scalar_non_materialized`] is shape-driven, not a
-+/// blanket rejection.
-+#[tokio::test(flavor = "multi_thread")]
-+async fn test_partition_not_null_accepts_non_null_scalar_non_materialized(
-+) -> Result<(), Box<dyn std::error::Error>> {
-+    let _ = tracing_subscriber::fmt::try_init();
-+
-+    let schema = Arc::new(StructType::try_new(vec![
-+        StructField::nullable("value", DataType::INTEGER),
-+        StructField::not_null("p", DataType::STRING),
-+    ])?);
-+    let (_tmp_dir, table_path, engine) = test_table_setup_mt()?;
-+    let snapshot = create_partitioned_table(
-+        &table_path,
-+        engine.as_ref(),
-+        schema,
-+        &["p"],
-+        ColumnMappingMode::None,
-+    )?;
-+
-+    let txn = snapshot
-+        .transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
-+        .with_engine_info("default engine");
-+    txn.partitioned_write_context(HashMap::from([(
-+        "p".to_string(),
-+        Scalar::String("a".into()),
-+    )]))?;
-+
-+    Ok(())
-+}
-+
-+/// Materialized arm: with `materializePartitionColumns` enabled, partition columns stay in
-+/// the logical batch (`Transaction::generate_logical_to_physical` does not drop them). The
-+/// kernel-derived Arrow schema preserves `nullable: false`, and `arrow-rs` rejects a null
-+/// in the NOT NULL partition column at `RecordBatch::try_new` -- the same enforcement seam
-+/// as a NOT NULL data column. The feature is enabled at create time via the explicit
-+/// feature signal `delta.feature.materializePartitionColumns=supported`.
-+#[tokio::test(flavor = "multi_thread")]
-+async fn test_partition_not_null_rejects_null_in_batch_materialized(
-+) -> Result<(), Box<dyn std::error::Error>> {
-+    let _ = tracing_subscriber::fmt::try_init();
-+
-+    let schema = Arc::new(StructType::try_new(vec![
-+        StructField::nullable("value", DataType::INTEGER),
-+        StructField::not_null("p", DataType::STRING),
-+    ])?);
-+
-+    let (_tmp_dir, table_path, engine) = test_table_setup_mt()?;
-+    let _ = create_table(&table_path, schema.clone(), "test/1.0")
-+        .with_data_layout(DataLayout::partitioned(["p"]))
-+        .with_table_properties([("delta.feature.materializePartitionColumns", "supported")])
-+        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
-+        .commit(engine.as_ref())?;
-+
-+    let snapshot = Snapshot::builder_for(&table_path).build(engine.as_ref())?;
-+    assert!(
-+        snapshot.table_configuration().is_feature_supported(
-+            &delta_kernel::table_features::TableFeature::MaterializePartitionColumns
-+        ),
-+        "test setup must enable materializePartitionColumns"
-+    );
-+
-+    // Partitioned write context with a non-null partition value. The materialize feature
-+    // means the partition column stays in the logical batch; constructing that batch with a
-+    // null in the NOT NULL `p` column is the failure surface we care about.
-+    let txn = snapshot
-+        .transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
-+        .with_engine_info("default engine");
-+    let _write_context = txn.partitioned_write_context(HashMap::from([(
-+        "p".to_string(),
-+        Scalar::String("a".into()),
-+    )]))?;
-+
-+    let arrow_schema: ArrowSchema = schema.as_ref().try_into_arrow()?;
-+    assert!(
-+        !arrow_schema.field_with_name("p")?.is_nullable(),
-+        "kernel `not_null` partition field must produce Arrow `nullable: false`",
-+    );
-+
-+    let result = RecordBatch::try_new(
-+        Arc::new(arrow_schema),
-+        vec![
-+            Arc::new(Int32Array::from(vec![Some(1)])),
-+            Arc::new(StringArray::from(vec![None as Option<&str>])),
-+        ],
-+    );
-+    assert!(
-+        result.is_err(),
-+        "RecordBatch::try_new should reject null in NOT NULL materialized partition column; got: {result:?}",
-+    );
-+
-+    Ok(())
-+}
\ No newline at end of file
kernel/tests/write/types.rs
@@ -1,159 +0,0 @@
-diff --git a/kernel/tests/write/types.rs b/kernel/tests/write/types.rs
---- a/kernel/tests/write/types.rs
-+++ b/kernel/tests/write/types.rs
- use std::sync::Arc;
- 
- use delta_kernel::arrow::array::{
--    ArrayRef, BinaryArray, Int32Array, StructArray, TimestampMicrosecondArray,
-+    ArrayRef, BinaryArray, BooleanArray, Decimal128Array, Int32Array, Int64Array, StringArray,
-+    StructArray, TimestampMicrosecondArray,
- };
- use delta_kernel::arrow::buffer::NullBuffer;
- use delta_kernel::arrow::datatypes::{DataType as ArrowDataType, Field};
- use delta_kernel::object_store::path::Path;
- use delta_kernel::object_store::ObjectStoreExt as _;
- use delta_kernel::schema::{DataType, StructField, StructType};
-+use delta_kernel::transaction::create_table::create_table as kernel_create_table;
- use delta_kernel::{Engine, Error as KernelError, Snapshot};
- use itertools::Itertools;
-+use rstest::rstest;
- use serde_json::Deserializer;
- use tempfile::tempdir;
--use test_utils::{create_table, engine_store_setup, test_read};
-+use test_utils::{create_table, engine_store_setup, test_read, test_table_setup};
- use url::Url;
- 
- #[tokio::test]
- 
-     Ok(())
- }
-+
-+// =============================================================================
-+// NOT NULL data column tests (default engine)
-+// =============================================================================
-+//
-+// These tests pin the contract documented on `maybe_enable_invariants` (see
-+// `kernel/src/transaction/builder/create_table.rs`): kernel does not enforce
-+// nullability at write time -- it relies on the engine's `ParquetHandler` to
-+// do so. The default engine inherits the guarantee from `arrow-rs`, which
-+// rejects null values for fields with `nullable: false` at
-+// `RecordBatch::try_new`.
-+//
-+// Each case creates the table via the kernel `create_table` builder so the
-+// non-null schema drives the auto-enablement of the `invariants` writer
-+// feature end-to-end (the protocol is not hand-crafted). The test then opens
-+// the standard write path (snapshot + transaction + `unpartitioned_write_context`)
-+// and asserts that the input `RecordBatch` for `engine.write_parquet` cannot
-+// be constructed with a null in the NOT NULL column. The failure surfaces
-+// before the engine is ever invoked.
-+
-+fn null_array_int32() -> ArrayRef {
-+    Arc::new(Int32Array::from(vec![None as Option<i32>]))
-+}
-+
-+fn null_array_int64() -> ArrayRef {
-+    Arc::new(Int64Array::from(vec![None as Option<i64>]))
-+}
-+
-+fn null_array_string() -> ArrayRef {
-+    Arc::new(StringArray::from(vec![None as Option<&str>]))
-+}
-+
-+fn null_array_binary() -> ArrayRef {
-+    Arc::new(BinaryArray::from(vec![None as Option<&[u8]>]))
-+}
-+
-+fn null_array_boolean() -> ArrayRef {
-+    Arc::new(BooleanArray::from(vec![None as Option<bool>]))
-+}
-+
-+fn null_array_timestamp_utc() -> ArrayRef {
-+    Arc::new(TimestampMicrosecondArray::from(vec![None as Option<i64>]).with_timezone("UTC"))
-+}
-+
-+fn null_array_decimal_10_2() -> ArrayRef {
-+    Arc::new(
-+        Decimal128Array::from(vec![None as Option<i128>])
-+            .with_precision_and_scale(10, 2)
-+            .unwrap(),
-+    )
-+}
-+
-+/// Verifies the default-engine NOT NULL contract for non-partition columns
-+/// across a representative set of primitive types. The contract:
-+///
-+/// 1. `nullable: false` propagates from kernel's `StructField` into the Arrow logical schema
-+///    produced by `try_into_arrow`.
-+/// 2. Constructing the Arrow `RecordBatch` an engine would hand to `engine.write_parquet` with a
-+///    null in the NOT NULL column fails at `RecordBatch::try_new`, before the engine is invoked.
-+///
-+/// See [#2465](https://github.com/delta-io/delta-kernel-rs/issues/2465).
-+#[rstest]
-+#[case::integer(DataType::INTEGER, null_array_int32 as fn() -> ArrayRef)]
-+#[case::long(DataType::LONG, null_array_int64 as fn() -> ArrayRef)]
-+#[case::string(DataType::STRING, null_array_string as fn() -> ArrayRef)]
-+#[case::binary(DataType::BINARY, null_array_binary as fn() -> ArrayRef)]
-+#[case::boolean(DataType::BOOLEAN, null_array_boolean as fn() -> ArrayRef)]
-+#[case::timestamp(DataType::TIMESTAMP, null_array_timestamp_utc as fn() -> ArrayRef)]
-+#[case::decimal(DataType::decimal(10, 2).unwrap(), null_array_decimal_10_2 as fn() -> ArrayRef)]
-+#[tokio::test]
-+async fn test_not_null_data_column_rejects_null_in_batch(
-+    #[case] data_type: DataType,
-+    #[case] null_array_fn: fn() -> ArrayRef,
-+) -> Result<(), Box<dyn std::error::Error>> {
-+    let _ = tracing_subscriber::fmt::try_init();
-+
-+    // Step 1: Create the table via the kernel builder. The non-null schema is
-+    // the only signal needed -- `maybe_enable_invariants` auto-adds the
-+    // `invariants` writer feature. The test exercises the full chain
-+    // (schema -> auto-enable -> engine enforcement) rather than hand-crafting
-+    // the protocol.
-+    let schema = Arc::new(StructType::try_new(vec![StructField::not_null(
-+        "c",
-+        data_type.clone(),
-+    )])?);
-+    let (_tmp_dir, table_path, engine) = test_table_setup()?;
-+    let _ = kernel_create_table(&table_path, schema.clone(), "test/1.0")
-+        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
-+        .commit(engine.as_ref())?;
-+
-+    // Step 2: Confirm the protocol auto-enabled `invariants` -- the upstream
-+    // half of the contract this test pins.
-+    let snapshot = Snapshot::builder_for(&table_path).build(engine.as_ref())?;
-+    assert!(
-+        snapshot
-+            .table_configuration()
-+            .protocol()
-+            .writer_features()
-+            .is_some_and(|f| f.contains(&delta_kernel::table_features::TableFeature::Invariants)),
-+        "non-null schema must auto-enable the `invariants` writer feature",
-+    );
-+
-+    // Step 3: Open the standard default-engine write path. Going through
-+    // `Snapshot::transaction` and `unpartitioned_write_context` exercises the
-+    // exact setup an engine performs prior to calling `engine.write_parquet`.
-+    let txn = snapshot
-+        .transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
-+        .with_engine_info("default engine");
-+    let _write_context = txn.unpartitioned_write_context()?;
-+
-+    // Step 4: Confirm `nullable: false` propagates from kernel into the
-+    // logical Arrow schema engines build batches against.
-+    let arrow_schema: delta_kernel::arrow::datatypes::Schema = schema.as_ref().try_into_arrow()?;
-+    assert!(
-+        !arrow_schema.field(0).is_nullable(),
-+        "kernel `not_null` field must produce Arrow `nullable: false`",
-+    );
-+
-+    // Step 5: Building the input `RecordBatch` for `engine.write_parquet` with
-+    // a null in the NOT NULL column must fail at `RecordBatch::try_new`,
-+    // before the engine is called. This is the default-engine NOT NULL
-+    // enforcement seam documented on `maybe_enable_invariants`.
-+    let result = RecordBatch::try_new(Arc::new(arrow_schema), vec![null_array_fn()]);
-+    assert!(
-+        result.is_err(),
-+        "RecordBatch::try_new should reject null in NOT NULL column ({data_type:?}); got: {result:?}",
-+    );
-+
-+    Ok(())
-+}
\ No newline at end of file

Reproduce locally: git range-diff 32eead5..7c554cc a97b11b..fb4cfa6 | Disable: git config gitstack.push-range-diff false

sanujbasu (Collaborator, Author) commented:

Range-diff: stack/materialize-partition-columns-cleanup (fb4cfa6 -> e7c821d)
CLAUDE.md
@@ -0,0 +1,76 @@
+diff --git a/CLAUDE.md b/CLAUDE.md
+--- a/CLAUDE.md
++++ b/CLAUDE.md
+   whether a new test duplicates the flow of an existing nearby test and should be
+   merged into it as a new `#[case]`. A common pattern is toggling a feature (e.g.
+   column mapping on/off) and asserting success vs. error.
+-- Reuse helpers from `test_utils` instead of writing custom ones when possible.
++- Reuse helpers from `test_utils` and the integration-test fixtures instead of writing
++  custom ones when possible. See **Common test helpers** below for a curated starter list.
+ - **Committing in tests:** Use `txn.commit(engine)?.unwrap_committed()` to assert a
+   successful commit and get the `CommittedTransaction`. Do NOT use `match` + `panic!`
+   for this -- `unwrap_committed()` provides a clear error message on failure. Available
+   both `add_commit` (writing log files) and `snapshot`/`Snapshot::try_new` (reading the
+   table). Always include a trailing slash in directory URLs to ensure correct path joining.
+ 
++### Common test helpers
++
++Before writing a custom helper, check this curated list and the locations below.
++This list is non-exhaustive -- when in doubt, browse the source files directly
++(`test-utils/src/lib.rs`, `kernel/tests/integration/common/`,
++`kernel/tests/integration/<topic>/mod.rs`).
++
++**Arrow construction (from `delta_kernel::arrow`)**
++
++- `arrow::array::new_null_array(&arrow_type, n)` -- Arrow array of `n` nulls of any Arrow
++  type. Prefer this over per-type `Int32Array::from(vec![None as Option<i32>])` builders.
++- `engine::arrow_conversion::TryIntoArrow`:
++  `(&kernel_data_type).try_into_arrow()` for `DataType`,
++  `(&kernel_struct_type).try_into_arrow()` for `StructType` -> Arrow `Schema`.
++
++**Engine + table setup (from `test_utils`)**
++
++- `test_table_setup()` / `test_table_setup_mt()` -- engine + temp table path. Use the `_mt`
++  variant under `#[tokio::test(flavor = "multi_thread")]`.
++- `engine_store_setup(name, opts)` -- returns `(store, engine, table_location)` when a test
++  needs direct object-store access.
++- `setup_test_tables(...)` -- multiple pre-built tables for read/scan tests.
++
++**Table creation in tests**
++
++- Prefer the kernel `create_table` builder
++  (`delta_kernel::transaction::create_table::create_table`). It exercises the same path
++  connectors use and auto-derives the protocol from the schema and feature flags.
++- `test_utils::create_table` (a JSON helper that hand-rolls protocol + metadata) is older
++  but still needed when the kernel builder cannot enable a particular feature combination.
++
++**Schema fixtures**
++
++- `test_utils`: `nested_schema`, `schema_with_type`, `nested_schema_with_type`,
++  `multi_schema_with_type`, `top_level_ntz_schema` / `nested_ntz_schema` /
++  `multiple_ntz_schema`, `top_level_variant_schema` / `nested_variant_schema` /
++  `multiple_variant_schema`.
++- `kernel/tests/integration/create_table/mod.rs`: `simple_schema`, `partition_test_schema`.
++
++**Commit + read helpers (from `test_utils`)**
++
++- `add_commit`, `add_staged_commit` -- write a JSON commit at a given version.
++- `read_actions_from_commit` -- read raw JSON actions from a specific commit. Use this
++  instead of hand-rolled `serde_json` parsing.
++- `test_read` -- full-scan read of a table; use for round-trip assertions.
++- `into_record_batch` -- convert `Box<dyn EngineData>` to Arrow `RecordBatch`.
++
++**Assertion helpers (from `test_utils`)**
++
++- `assert_schema_has_field(schema, &["a", "b"])` -- assert a (possibly nested) field path.
++- `assert_result_error_with_message(result, "needle")` -- assert an error contains a
++  substring.
++
++**If a name here doesn't match what's in code:** the list may have drifted from a rename.
++Run `rg '^pub (fn|async fn)' test-utils/src/lib.rs` to discover the current public surface,
++and update this section in your PR. The same pattern works for
++`kernel/tests/integration/common/write_utils.rs`.
++
+ ## Protocol TLDR
+ 
+ The [Delta protocol spec](https://raw.githubusercontent.com/delta-io/delta/master/PROTOCOL.md)
\ No newline at end of file
kernel/src/partition/validation.rs
@@ -5,7 +5,7 @@
  //!   case, detects post-normalization duplicates).
  //! - [`validate_types`]: checks that each `Scalar`'s type matches the partition column's schema
 -//!   type. Null scalars skip the type check.
-+//!   type, and that null scalars are only used for nullable partition columns. Null scalars skip
++//!   type, and that non-null partition columns are never assigned null scalars. Null scalars skip
 +//!   the value type check; the column's nullability is still enforced.
  
  use std::collections::HashMap;
@@ -16,9 +16,7 @@
 -/// partition column's type in the table schema. Null scalars skip the type check
 -/// (null is valid for any partition column type).
 +/// partition column's type in the table schema. Null scalars skip the value type check, but a
-+/// null is only legal when the schema field is nullable. This matches Delta-Spark's
-+/// `DELTA_NOT_NULL_CONSTRAINT_VIOLATED` rejection of NULL writes into a NOT NULL partition
-+/// column.
++/// null is only legal when the schema field is nullable.
  ///
  /// # Parameters
  /// - `logical_schema`: the table's logical schema.
@@ -32,8 +30,6 @@
              )));
          }
          if value.is_null() {
-+            // Null partition values are only valid for nullable partition columns. This
-+            // mirrors Spark `DELTA_NOT_NULL_CONSTRAINT_VIOLATED` (SQLSTATE 23502).
 +            if !field.nullable {
 +                return Err(Error::invalid_partition_values(format!(
 +                    "partition column '{col_name}' is not nullable but received a null value"
kernel/src/transaction/mod.rs
@@ -0,0 +1,12 @@
+diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs
+--- a/kernel/src/transaction/mod.rs
++++ b/kernel/src/transaction/mod.rs
+     /// - **Type checking**: rejects non-primitive partition column types (struct, array, map) and
+     ///   validates that each non-null `Scalar`'s type matches the partition column's schema type.
+     ///   For example, passing `Scalar::String("2024")` for an `INTEGER` column returns an error.
+-    ///   Null scalars skip the value type check (null is valid for any primitive partition column).
++    ///   Null scalars skip the value type check, but a null is only legal when the partition column
++    ///   is nullable; passing a null for a `nullable: false` partition column returns an error.
+     ///
+     /// - **Value serialization**: serializes each `Scalar` to a protocol-compliant string per the
+     ///   Delta protocol's "Partition Value Serialization" rules. `Scalar::Null(...)` becomes `None`
\ No newline at end of file
kernel/tests/integration/write/partitioned.rs
@@ -14,29 +14,46 @@
 +// 23502): a NULL written into a `NOT NULL` partition column must be rejected on the default
 +// engine. The two arms exercise the two enforcement seams:
 +//
-+// 1. **Non-materialized** (default): `Transaction::generate_logical_to_physical` drops partition
++// 1. Non-materialized (default): `Transaction::generate_logical_to_physical` drops partition
 +//    columns from the batch before Parquet write, so the partition-value map is the authority. The
 +//    fix in `validate_types` rejects null `Scalar`s for `nullable: false` partition columns at
 +//    `partitioned_write_context` time.
-+// 2. **Materialized** (`materializePartitionColumns` writer feature): partition columns stay in the
++// 2. Materialized (`materializePartitionColumns` writer feature): partition columns stay in the
 +//    batch through the transform, so a null in the NOT NULL column is rejected by `arrow-rs` at
 +//    `RecordBatch::try_new`, matching the data-column NOT NULL contract.
 +
-+/// Non-materialized: a null partition `Scalar` for a `NOT NULL` partition column is rejected
-+/// at `partitioned_write_context` (i.e. before `engine.write_parquet` is called and before
-+/// any path encoding). This is the partition-map authority described in the issue plan.
-+///
-+/// The CM axis is orthogonal to nullability validation -- `validate_types` runs against
-+/// logical column names and field nullability before serialization. Unit tests in
-+/// `kernel/src/partition/validation.rs` cover the type matrix; this test only confirms the
-+/// e2e wiring through `Transaction`.
++/// `validate_types` enforces the NOT NULL contract on partition values for the non-materialized
++/// path: nulls into a `nullable: false` partition column are rejected before serialization, and
++/// non-null values pass through unchanged. The CM axis is orthogonal -- validation runs against
++/// logical names and field nullability before any column-mapping renaming. Unit tests in
++/// `kernel/src/partition/validation.rs` cover the full type matrix; this test pins the e2e
++/// wiring through `Transaction::partitioned_write_context`.
 +#[rstest]
-+#[case::cm_none(ColumnMappingMode::None)]
-+#[case::cm_name(ColumnMappingMode::Name)]
-+#[case::cm_id(ColumnMappingMode::Id)]
++// Reject: null Scalar into NOT NULL partition column, all CM modes.
++#[case::cm_none_null(
++    ColumnMappingMode::None,
++    Scalar::Null(DataType::STRING),
++    Some("not nullable")
++)]
++#[case::cm_name_null(
++    ColumnMappingMode::Name,
++    Scalar::Null(DataType::STRING),
++    Some("not nullable")
++)]
++#[case::cm_id_null(
++    ColumnMappingMode::Id,
++    Scalar::Null(DataType::STRING),
++    Some("not nullable")
++)]
++// Accept: non-null Scalar into NOT NULL partition column, all CM modes.
++#[case::cm_none_non_null(ColumnMappingMode::None, Scalar::String("a".into()), None)]
++#[case::cm_name_non_null(ColumnMappingMode::Name, Scalar::String("a".into()), None)]
++#[case::cm_id_non_null(ColumnMappingMode::Id, Scalar::String("a".into()), None)]
 +#[tokio::test(flavor = "multi_thread")]
-+async fn test_partition_not_null_rejects_null_scalar_non_materialized(
++async fn test_partition_null_validation_non_materialized(
 +    #[case] cm_mode: ColumnMappingMode,
++    #[case] value: Scalar,
++    #[case] expected_err: Option<&str>,
 +) -> Result<(), Box<dyn std::error::Error>> {
 +    let _ = tracing_subscriber::fmt::try_init();
 +
@@ -45,69 +62,31 @@
 +        StructField::nullable("value", DataType::INTEGER),
 +        StructField::not_null("p", DataType::STRING),
 +    ])?);
-+    let partition_cols = ["p"];
 +
 +    let (_tmp_dir, table_path, engine) = test_table_setup_mt()?;
-+    let snapshot = create_partitioned_table(
-+        &table_path,
-+        engine.as_ref(),
-+        schema,
-+        &partition_cols,
-+        cm_mode,
-+    )?;
++    let snapshot = create_partitioned_table(&table_path, engine.as_ref(), schema, &["p"], cm_mode)?;
 +
-+    // The transform drops the partition column from the batch, so the partition-value map
-+    // is the only place a null can show up for `p`. Driving `partitioned_write_context` with
-+    // `Scalar::Null` must surface kernel's NOT NULL rejection from `validate_types`.
 +    let txn = snapshot
 +        .transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
 +        .with_engine_info("default engine");
-+    let result = txn.partitioned_write_context(HashMap::from([(
-+        "p".to_string(),
-+        Scalar::Null(DataType::STRING),
-+    )]));
++    let result = txn.partitioned_write_context(HashMap::from([("p".to_string(), value)]));
 +
-+    let err = result
-+        .err()
-+        .ok_or("expected partitioned_write_context to error for null into NOT NULL partition")?
-+        .to_string();
-+    assert!(err.contains("not nullable"), "{err}");
-+    assert!(err.contains("'p'"), "{err}");
-+
-+    Ok(())
-+}
-+
-+/// Non-materialized counterpart: a non-null partition `Scalar` for a `NOT NULL` partition
-+/// column is accepted, and the partition-map authority test does not regress the legal
-+/// path. Confirms our error case in
-+/// [`test_partition_not_null_rejects_null_scalar_non_materialized`] is shape-driven, not a
-+/// blanket rejection.
-+#[tokio::test(flavor = "multi_thread")]
-+async fn test_partition_not_null_accepts_non_null_scalar_non_materialized(
-+) -> Result<(), Box<dyn std::error::Error>> {
-+    let _ = tracing_subscriber::fmt::try_init();
-+
-+    let schema = Arc::new(StructType::try_new(vec![
-+        StructField::nullable("value", DataType::INTEGER),
-+        StructField::not_null("p", DataType::STRING),
-+    ])?);
-+    let (_tmp_dir, table_path, engine) = test_table_setup_mt()?;
-+    let snapshot = create_partitioned_table(
-+        &table_path,
-+        engine.as_ref(),
-+        schema,
-+        &["p"],
-+        ColumnMappingMode::None,
-+    )?;
++    match expected_err {
++        Some(needle) => {
++            let err = result
++                .err()
++                .ok_or(
++                    "expected partitioned_write_context to error for null into NOT NULL partition",
++                )?
++                .to_string();
++            assert!(err.contains(needle), "{err}");
++            assert!(err.contains("'p'"), "{err}");
++        }
++        None => {
++            result?;
++        }
++    }
 +
-+    let txn = snapshot
-+        .transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
-+        .with_engine_info("default engine");
-+    txn.partitioned_write_context(HashMap::from([(
-+        "p".to_string(),
-+        Scalar::String("a".into()),
-+    )]))?;
-+
 +    Ok(())
 +}
 +
@@ -118,7 +97,7 @@
 +/// as a NOT NULL data column. The feature is enabled at create time via the explicit
 +/// feature signal `delta.feature.materializePartitionColumns=supported`.
 +#[tokio::test(flavor = "multi_thread")]
-+async fn test_partition_not_null_rejects_null_in_batch_materialized(
++async fn test_partition_null_validation_in_batch_materialized(
 +) -> Result<(), Box<dyn std::error::Error>> {
 +    let _ = tracing_subscriber::fmt::try_init();
 +
@@ -136,7 +115,7 @@
 +
 +    let snapshot = Snapshot::builder_for(&table_path).build(engine.as_ref())?;
 +    assert!(
-+        snapshot.table_configuration().is_feature_supported(
++        snapshot.table_configuration().is_feature_enabled(
 +            &delta_kernel::table_features::TableFeature::MaterializePartitionColumns
 +        ),
 +        "test setup must enable materializePartitionColumns"
kernel/tests/integration/write/types.rs
@@ -5,8 +5,7 @@
  
  use delta_kernel::arrow::array::{
 -    ArrayRef, BinaryArray, Int32Array, StructArray, TimestampMicrosecondArray,
-+    ArrayRef, BinaryArray, BooleanArray, Decimal128Array, Int32Array, Int64Array, StringArray,
-+    StructArray, TimestampMicrosecondArray,
++    new_null_array, ArrayRef, BinaryArray, Int32Array, StructArray, TimestampMicrosecondArray,
  };
  use delta_kernel::arrow::buffer::NullBuffer;
  use delta_kernel::arrow::datatypes::{DataType as ArrowDataType, Field};
@@ -46,68 +45,34 @@
 +// and asserts that the input `RecordBatch` for `engine.write_parquet` cannot
 +// be constructed with a null in the NOT NULL column. The failure surfaces
 +// before the engine is ever invoked.
-+
-+fn null_array_int32() -> ArrayRef {
-+    Arc::new(Int32Array::from(vec![None as Option<i32>]))
-+}
-+
-+fn null_array_int64() -> ArrayRef {
-+    Arc::new(Int64Array::from(vec![None as Option<i64>]))
-+}
-+
-+fn null_array_string() -> ArrayRef {
-+    Arc::new(StringArray::from(vec![None as Option<&str>]))
-+}
-+
-+fn null_array_binary() -> ArrayRef {
-+    Arc::new(BinaryArray::from(vec![None as Option<&[u8]>]))
-+}
-+
-+fn null_array_boolean() -> ArrayRef {
-+    Arc::new(BooleanArray::from(vec![None as Option<bool>]))
-+}
-+
-+fn null_array_timestamp_utc() -> ArrayRef {
-+    Arc::new(TimestampMicrosecondArray::from(vec![None as Option<i64>]).with_timezone("UTC"))
-+}
 +
-+fn null_array_decimal_10_2() -> ArrayRef {
-+    Arc::new(
-+        Decimal128Array::from(vec![None as Option<i128>])
-+            .with_precision_and_scale(10, 2)
-+            .unwrap(),
-+    )
-+}
-+
 +/// Verifies the default-engine NOT NULL contract for non-partition columns
 +/// across a representative set of primitive types. The contract:
 +///
-+/// 1. `nullable: false` propagates from kernel's `StructField` into the Arrow logical schema
-+///    produced by `try_into_arrow`.
++/// 1. `nullable: false` propagates from kernel's `StructField` through the connector-facing
++///    physical schema (`WriteContext::physical_schema`) and into the Arrow logical schema engines
++///    build batches against.
 +/// 2. Constructing the Arrow `RecordBatch` an engine would hand to `engine.write_parquet` with a
 +///    null in the NOT NULL column fails at `RecordBatch::try_new`, before the engine is invoked.
 +///
 +/// See [#2465](https://github.com/delta-io/delta-kernel-rs/issues/2465).
 +#[rstest]
-+#[case::integer(DataType::INTEGER, null_array_int32 as fn() -> ArrayRef)]
-+#[case::long(DataType::LONG, null_array_int64 as fn() -> ArrayRef)]
-+#[case::string(DataType::STRING, null_array_string as fn() -> ArrayRef)]
-+#[case::binary(DataType::BINARY, null_array_binary as fn() -> ArrayRef)]
-+#[case::boolean(DataType::BOOLEAN, null_array_boolean as fn() -> ArrayRef)]
-+#[case::timestamp(DataType::TIMESTAMP, null_array_timestamp_utc as fn() -> ArrayRef)]
-+#[case::decimal(DataType::decimal(10, 2).unwrap(), null_array_decimal_10_2 as fn() -> ArrayRef)]
++#[case::integer(DataType::INTEGER)]
++#[case::long(DataType::LONG)]
++#[case::string(DataType::STRING)]
++#[case::binary(DataType::BINARY)]
++#[case::boolean(DataType::BOOLEAN)]
++#[case::timestamp(DataType::TIMESTAMP)]
++#[case::decimal(DataType::decimal(10, 2).unwrap())]
 +#[tokio::test]
 +async fn test_not_null_data_column_rejects_null_in_batch(
 +    #[case] data_type: DataType,
-+    #[case] null_array_fn: fn() -> ArrayRef,
 +) -> Result<(), Box<dyn std::error::Error>> {
 +    let _ = tracing_subscriber::fmt::try_init();
 +
-+    // Step 1: Create the table via the kernel builder. The non-null schema is
-+    // the only signal needed -- `maybe_enable_invariants` auto-adds the
-+    // `invariants` writer feature. The test exercises the full chain
-+    // (schema -> auto-enable -> engine enforcement) rather than hand-crafting
-+    // the protocol.
++    // Step 1: Create the table via the kernel builder. The non-null schema is the only signal
++    // needed: kernel auto-adds the `invariants` writer feature. The test exercises the full
++    // chain (schema -> auto-enable -> engine enforcement) rather than hand-crafting the protocol.
 +    let schema = Arc::new(StructType::try_new(vec![StructField::not_null(
 +        "c",
 +        data_type.clone(),
@@ -117,8 +82,8 @@
 +        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
 +        .commit(engine.as_ref())?;
 +
-+    // Step 2: Confirm the protocol auto-enabled `invariants` -- the upstream
-+    // half of the contract this test pins.
++    // Step 2: Confirm the protocol auto-enabled `invariants` -- the upstream half of the
++    // contract this test pins.
 +    let snapshot = Snapshot::builder_for(&table_path).build(engine.as_ref())?;
 +    assert!(
 +        snapshot
@@ -129,27 +94,37 @@
 +        "non-null schema must auto-enable the `invariants` writer feature",
 +    );
 +
-+    // Step 3: Open the standard default-engine write path. Going through
-+    // `Snapshot::transaction` and `unpartitioned_write_context` exercises the
-+    // exact setup an engine performs prior to calling `engine.write_parquet`.
++    // Step 3: Open the standard default-engine write path. Going through `Snapshot::transaction`
++    // and `unpartitioned_write_context` exercises the exact setup an engine performs prior to
++    // calling `engine.write_parquet`.
 +    let txn = snapshot
 +        .transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
 +        .with_engine_info("default engine");
-+    let _write_context = txn.unpartitioned_write_context()?;
++    let write_context = txn.unpartitioned_write_context()?;
 +
-+    // Step 4: Confirm `nullable: false` propagates from kernel into the
-+    // logical Arrow schema engines build batches against.
-+    let arrow_schema: delta_kernel::arrow::datatypes::Schema = schema.as_ref().try_into_arrow()?;
++    // Step 4: Confirm the connector-facing physical schema preserves `nullable: false`. This is
++    // the schema engines must use to build batches; using the logical schema in tests would hide
++    // bugs in the logical->physical mapping.
++    let physical = write_context.physical_schema();
++    let physical_field = physical
++        .field("c")
++        .expect("physical schema must contain column 'c'");
++    assert!(
++        !physical_field.nullable,
++        "physical schema must preserve `nullable: false` for column 'c'",
++    );
++    let arrow_schema: delta_kernel::arrow::datatypes::Schema =
++        physical.as_ref().try_into_arrow()?;
 +    assert!(
 +        !arrow_schema.field(0).is_nullable(),
-+        "kernel `not_null` field must produce Arrow `nullable: false`",
++        "Arrow conversion of physical schema must preserve `nullable: false`",
 +    );
 +
-+    // Step 5: Building the input `RecordBatch` for `engine.write_parquet` with
-+    // a null in the NOT NULL column must fail at `RecordBatch::try_new`,
-+    // before the engine is called. This is the default-engine NOT NULL
-+    // enforcement seam documented on `maybe_enable_invariants`.
-+    let result = RecordBatch::try_new(Arc::new(arrow_schema), vec![null_array_fn()]);
++    // Step 5: Building the input `RecordBatch` for `engine.write_parquet` with a null in the
++    // NOT NULL column must fail at `RecordBatch::try_new`, before the engine is called. This
++    // is the default-engine NOT NULL enforcement seam.
++    let arrow_type = arrow_schema.field(0).data_type().clone();
++    let result = RecordBatch::try_new(Arc::new(arrow_schema), vec![new_null_array(&arrow_type, 1)]);
 +    assert!(
 +        result.is_err(),
 +        "RecordBatch::try_new should reject null in NOT NULL column ({data_type:?}); got: {result:?}",

Reproduce locally: git range-diff a97b11b..fb4cfa6 b61bf96..e7c821d | Disable: git config gitstack.push-range-diff false

@sanujbasu sanujbasu force-pushed the stack/enforce-non-null-columns branch from fb4cfa6 to e7c821d Compare April 29, 2026 21:55
@sanujbasu
Collaborator Author

Range-diff: stack/materialize-partition-columns-cleanup (e7c821d -> c59a39d)
kernel/src/partition/serialization.rs
@@ -0,0 +1,111 @@
+diff --git a/kernel/src/partition/serialization.rs b/kernel/src/partition/serialization.rs
+--- a/kernel/src/partition/serialization.rs
++++ b/kernel/src/partition/serialization.rs
+ // Binary            Raw UTF-8: String::from_utf8            strict, error on invalid
+ // Struct/Array/Map  Error                                   not valid partition types
+ 
++/// Whether a partition-value [`Scalar`] is equivalent to null under the Delta protocol's
++/// partition value serialization rules.
++///
++/// The protocol collapses three Scalar shapes onto JSON null in `AddFile.partitionValues`:
++/// `Scalar::Null(_)`, empty `Scalar::String`, and empty `Scalar::Binary`. Treating all
++/// three as null on write keeps the on-wire format type-erased (connectors can construct
++/// partition values without inspecting the schema) and matches Delta-Spark's pre-write
++/// SQL-layer normalization.
++///
++/// Read paths do not need this predicate: the on-wire collapse to JSON null is undone at
++/// JSON deserialization (null map entries are dropped before kernel sees a Scalar), so a
++/// "null" partition value never appears in Scalar form on reads.
++pub(crate) fn would_serialize_to_null(value: &Scalar) -> bool {
++    match value {
++        Scalar::Null(_) => true,
++        Scalar::String(s) if s.is_empty() => true,
++        Scalar::Binary(b) if b.is_empty() => true,
++        _ => false,
++    }
++}
++
+ /// Serializes a [`Scalar`] partition value to a protocol-compliant string for the
+ /// `partitionValues` map in Add actions.
+ ///
+-/// Returns `Ok(None)` for null values (regardless of the null's data type), empty strings,
+-/// and empty binary (the Delta protocol treats all three as null partition values). Returns
+-/// `Err` for non-null values of types that cannot be partition columns (Struct, Array, Map)
+-/// or for binary values that are not valid UTF-8.
++/// Returns `Ok(None)` for any value the Delta protocol treats as null in `partitionValues`:
++/// `Scalar::Null`, empty `Scalar::String`, and empty `Scalar::Binary`. Collapsing the three
++/// onto JSON null on write keeps the on-wire format type-erased and matches Delta-Spark's
++/// pre-write SQL-layer normalization. Returns `Err` for non-null values of types that
++/// cannot be partition columns (Struct, Array, Map) or for binary values that are not valid
++/// UTF-8.
+ ///
+ /// The inverse of [`PrimitiveType::parse_scalar`].
+ ///
+ /// [`PrimitiveType::parse_scalar`]: crate::schema::PrimitiveType::parse_scalar
+ pub fn serialize_partition_value(value: &Scalar) -> DeltaResult<Option<String>> {
++    if would_serialize_to_null(value) {
++        return Ok(None);
++    }
++    // Null/empty-string/empty-binary are checked up front,
++    // so the remaining arms only see non-null payloads.
+     match value {
+-        Scalar::Null(_) => Ok(None),
+-        Scalar::String(s) => Ok(if s.is_empty() { None } else { Some(s.clone()) }),
++        Scalar::Null(_) => unreachable!("nulls are checked up front"),
++        Scalar::String(s) => Ok(Some(s.clone())),
+         Scalar::Boolean(v) => Ok(Some(v.to_string())),
+         Scalar::Byte(v) => Ok(Some(v.to_string())),
+         Scalar::Short(v) => Ok(Some(v.to_string())),
+         Scalar::Timestamp(us) => Ok(Some(format_timestamp(*us)?)),
+         Scalar::TimestampNtz(us) => Ok(Some(format_timestamp_ntz(*us)?)),
+         Scalar::Decimal(d) => Ok(Some(format_decimal(d))),
+-        Scalar::Binary(b) => {
+-            if b.is_empty() {
+-                Ok(None)
+-            } else {
+-                Ok(Some(format_binary(b)?))
+-            }
+-        }
++        Scalar::Binary(b) => Ok(Some(format_binary(b)?)),
+         Scalar::Struct(_) | Scalar::Array(_) | Scalar::Map(_) => Err(Error::generic(format!(
+             "cannot serialize partition value: type {:?} is not a valid partition column type",
+             value.data_type()
+         assert_eq!(serialize_partition_value(&input).unwrap(), None);
+     }
+ 
++    /// Tests the canonical null-equivalence rule: `would_serialize_to_null` returns true for
++    /// exactly the three Scalar shapes that collapse to JSON null (`Scalar::Null`, empty
++    /// `Scalar::String`, empty `Scalar::Binary`) and false for everything else, including
++    /// non-empty strings/binary and primitive scalars.
++    #[rstest]
++    // Null-equivalent: all three shapes return true.
++    #[case::null_int(Scalar::Null(DataType::INTEGER), true)]
++    #[case::null_string(Scalar::Null(DataType::STRING), true)]
++    #[case::null_binary(Scalar::Null(DataType::BINARY), true)]
++    #[case::empty_string(Scalar::String(String::new()), true)]
++    #[case::empty_binary(Scalar::Binary(Vec::new()), true)]
++    // Non-empty strings/binary and primitive scalars return false.
++    #[case::nonempty_string(Scalar::String("a".into()), false)]
++    #[case::single_space(Scalar::String(" ".into()), false)]
++    #[case::nonempty_binary(Scalar::Binary(vec![0x00]), false)]
++    #[case::integer(Scalar::Integer(0), false)]
++    #[case::boolean_false(Scalar::Boolean(false), false)]
++    fn test_would_serialize_to_null(#[case] value: Scalar, #[case] expected: bool) {
++        assert_eq!(would_serialize_to_null(&value), expected);
++    }
++
++    /// Cross-check: any value where `would_serialize_to_null` is true must also serialize
++    /// to `Ok(None)`, and vice versa for the cases above. Locks the predicate to the actual
++    /// serialization outcome so the helper cannot drift from `serialize_partition_value`.
++    #[rstest]
++    #[case(Scalar::Null(DataType::INTEGER))]
++    #[case(Scalar::String(String::new()))]
++    #[case(Scalar::Binary(Vec::new()))]
++    fn test_would_serialize_to_null_matches_serialize_result(#[case] value: Scalar) {
++        assert!(would_serialize_to_null(&value));
++        assert_eq!(serialize_partition_value(&value).unwrap(), None);
++    }
++
+     // ============================================================================
+     // Spark reference: non-null serialization
+     // ============================================================================
\ No newline at end of file
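
A minimal usage sketch of the null-equivalence rule above, condensed from the unit tests in this diff (module paths are assumptions; `serialize_partition_value` is shown as `pub` in kernel/src/partition/serialization.rs): the three null-equivalent Scalar shapes serialize to `Ok(None)`, while ordinary values, including a single space, round-trip as `Some(...)`.

use delta_kernel::expressions::Scalar;
use delta_kernel::schema::DataType;

// The three shapes the protocol collapses to JSON null all serialize to None.
for v in [
    Scalar::Null(DataType::STRING), // explicit null; the inner type is ignored
    Scalar::String(String::new()),  // empty string (row 65)
    Scalar::Binary(Vec::new()),     // empty binary (row 49)
] {
    assert_eq!(serialize_partition_value(&v).unwrap(), None);
}
// A single space is not null-equivalent and serializes verbatim (row 67).
assert_eq!(
    serialize_partition_value(&Scalar::String(" ".into())).unwrap(),
    Some(" ".to_string()),
);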
kernel/src/partition/validation.rs
@@ -5,34 +5,50 @@
  //!   case, detects post-normalization duplicates).
  //! - [`validate_types`]: checks that each `Scalar`'s type matches the partition column's schema
 -//!   type. Null scalars skip the type check.
-+//!   type, and that non-null partition columns are never assigned null scalars. Null scalars skip
-+//!   the value type check; the column's nullability is still enforced.
- 
+-
++//!   type, and that non-null partition columns are never assigned a value that would serialize to a
++//!   null partition value.
  use std::collections::HashMap;
  
+ use crate::expressions::Scalar;
++use crate::partition::serialization::would_serialize_to_null;
+ use crate::schema::{DataType, StructType};
+ use crate::{DeltaResult, Error};
+ 
  }
  
  /// Validates that each [`Scalar`] value's type is compatible with the corresponding
 -/// partition column's type in the table schema. Null scalars skip the type check
 -/// (null is valid for any partition column type).
-+/// partition column's type in the table schema. Null scalars skip the value type check, but a
-+/// null is only legal when the schema field is nullable.
++/// partition column's type in the table schema. Values that would serialize to a null
++/// partition value skip the value type check, but a
++/// null partition value is only legal when the schema field is nullable.
++///
++/// Nullability is enforced before any non-null type check, so the rejection treats null
++/// scalars and their serialization-equivalent forms (empty strings, empty binary)
++/// uniformly -- otherwise an empty `Scalar::String` for a `nullable: false` column would
++/// pass validation, then collapse to JSON null at serialization, landing a null value in
++/// a NOT NULL column.
  ///
  /// # Parameters
  /// - `logical_schema`: the table's logical schema.
  /// # Errors
  /// - A partition column is not found in the table schema
  /// - A partition column's schema type is not a primitive (struct, array, map are rejected)
-+/// - A null value is provided for a non-nullable partition column
++/// - A null-equivalent value (null scalar, empty string, or empty binary) is provided for a
++///   non-nullable partition column
  /// - A non-null value's type does not match the partition column's schema type
  fn validate_types(
      logical_schema: &StructType,
+                  Partition columns must be primitive types."
              )));
          }
-         if value.is_null() {
+-        if value.is_null() {
++        if would_serialize_to_null(value) {
 +            if !field.nullable {
 +                return Err(Error::invalid_partition_values(format!(
-+                    "partition column '{col_name}' is not nullable but received a null value"
++                    "partition column '{col_name}' is not nullable but received a value that \
++                     serializes to null (null scalar, empty string, or empty binary)"
 +                )));
 +            }
              continue;
@@ -52,13 +68,36 @@
      // ============================================================================
      // validate_keys
      // ============================================================================
+     /// Every non-null row from the "String and binary encoding" table in
+     /// `partition/mod.rs`. Row numbers reference that table.
+     #[rstest]
+-    // STRING rows
++    // STRING rows. Rows 49 (empty binary) and 65 (empty string) are null-equivalent under
++    // the protocol and are covered by the null-into-nullable / null-into-non-nullable tests
++    // further down -- they are intentionally absent from this "type-match passes" group.
+     #[case(DataType::STRING, Scalar::String("\x00".into()))] // row 47
+     #[case(DataType::STRING, Scalar::String("before\x00after".into()))] // row 48
+     #[case(DataType::STRING, Scalar::String("a{b".into()))] // row 54
+     #[case(DataType::STRING, Scalar::String("a&b+c$d;e,f".into()))] // row 62
+     #[case(DataType::STRING, Scalar::String("Serbia/srb%".into()))] // row 63
+     #[case(DataType::STRING, Scalar::String("100%25".into()))] // row 64
+-    #[case(DataType::STRING, Scalar::String("".into()))] // row 65
+     #[case(DataType::STRING, Scalar::String(" ".into()))] // row 67
+     #[case(DataType::STRING, Scalar::String("  ".into()))] // row 68
+     // BINARY rows
+-    #[case(DataType::BINARY, Scalar::Binary(vec![]))] // row 49
+     #[case(DataType::BINARY, Scalar::Binary(vec![0xDE, 0xAD, 0xBE, 0xEF]))] // row 50
+     #[case(DataType::BINARY, Scalar::Binary(vec![0x48, 0x45, 0x4C, 0x4C, 0x4F]))] // row 51
+     #[case(DataType::BINARY, Scalar::Binary(vec![0x00, 0xFF]))] // row 52
          assert_type_ok(data_type, value);
      }
  
 -    /// NULL rows from the encoding table. Null is valid for every partition column type,
 -    /// regardless of the Scalar::Null inner type.
-+    /// NULL rows from the encoding table. Null is valid for every partition column type when
-+    /// the schema field is nullable, regardless of the `Scalar::Null` inner type.
++    /// Null-equivalent rows from the encoding table. Under the Delta protocol, three Scalar
++    /// shapes serialize to JSON null in `partitionValues`: `Scalar::Null`, empty `Scalar::String`,
++    /// and empty `Scalar::Binary`. All three are valid for every partition column type when the
++    /// schema field is nullable, regardless of the `Scalar::Null` inner type.
      #[rstest]
      #[case(DataType::INTEGER, Scalar::Null(DataType::INTEGER))] // row 5
      #[case(DataType::LONG, Scalar::Null(DataType::LONG))] // row 8
@@ -67,6 +106,8 @@
      #[case(DataType::BINARY, Scalar::Null(DataType::BINARY))] // (binary NULL)
 -    fn test_validate_types_null_returns_ok(#[case] data_type: DataType, #[case] value: Scalar) {
 -        assert_type_ok(data_type, value);
++    #[case(DataType::STRING, Scalar::String(String::new()))] // row 65: empty string
++    #[case(DataType::BINARY, Scalar::Binary(Vec::new()))] // row 49: empty binary
 +    fn test_validate_types_null_into_nullable_returns_ok(
 +        #[case] data_type: DataType,
 +        #[case] value: Scalar,
@@ -86,10 +127,8 @@
 +        assert_type_ok_nullable(DataType::INTEGER, Scalar::Null(DataType::STRING));
 +    }
 +
-+    /// A null partition value for a non-nullable partition column is rejected, matching
-+    /// Delta-Spark's `DELTA_NOT_NULL_CONSTRAINT_VIOLATED` (SQLSTATE 23502) behavior. The
-+    /// `Scalar::Null` inner type is irrelevant -- the rejection is driven entirely by the
-+    /// schema field's nullability.
++    /// A null-equivalent partition value (null scalar, empty string, or empty binary) for
++    /// a non-nullable partition column is rejected.
 +    #[rstest]
 +    #[case(DataType::INTEGER, Scalar::Null(DataType::INTEGER))]
 +    #[case(DataType::LONG, Scalar::Null(DataType::LONG))]
@@ -100,9 +139,14 @@
 +    #[case(DataType::TIMESTAMP, Scalar::Null(DataType::TIMESTAMP))]
 +    #[case(DataType::TIMESTAMP_NTZ, Scalar::Null(DataType::TIMESTAMP_NTZ))]
 +    #[case(DataType::decimal(10, 2).unwrap(), Scalar::Null(DataType::decimal(10, 2).unwrap()))]
-+    // Null with mismatched inner type is also rejected -- the schema's nullability is the
++    // Null with mismatched inner type is also rejected -- the schema's nullability is the
 +    // only thing checked for null scalars.
 +    #[case(DataType::INTEGER, Scalar::Null(DataType::STRING))]
++    // Empty string and empty binary serialize to JSON null per the Delta protocol; they
++    // must be rejected for NOT NULL partition columns alongside `Scalar::Null` so a
++    // serialization-equivalent value cannot bypass the nullability check.
++    #[case(DataType::STRING, Scalar::String(String::new()))]
++    #[case(DataType::BINARY, Scalar::Binary(Vec::new()))]
 +    fn test_validate_types_null_into_non_nullable_returns_error(
 +        #[case] data_type: DataType,
 +        #[case] value: Scalar,
kernel/src/transaction/mod.rs
@@ -5,8 +5,10 @@
      ///   validates that each non-null `Scalar`'s type matches the partition column's schema type.
      ///   For example, passing `Scalar::String("2024")` for an `INTEGER` column returns an error.
 -    ///   Null scalars skip the value type check (null is valid for any primitive partition column).
-+    ///   Null scalars skip the value type check, but a null is only legal when the partition column
-+    ///   is nullable; passing a null for a `nullable: false` partition column returns an error.
++    ///   Null-equivalent scalars (null scalars, empty strings, and empty binary, all of which
++    ///   collapse to JSON null in `partitionValues`) skip the value type check, but they are only
++    ///   legal when the partition column is nullable; passing any of these for a `nullable: false`
++    ///   partition column returns an error.
      ///
      /// - **Value serialization**: serializes each `Scalar` to a protocol-compliant string per the
      ///   Delta protocol's "Partition Value Serialization" rules. `Scalar::Null(...)` becomes `None`
\ No newline at end of file
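
From the connector's side, the contract above surfaces as an early error from `partitioned_write_context`. A minimal sketch, condensed from the integration tests below; the setup helpers (`test_table_setup_mt`, `create_partitioned_table`, `FileSystemCommitter`) are the tests' own, and exact paths and signatures are assumptions:

use std::collections::HashMap;
use std::sync::Arc;

// A nullable data column plus a NOT NULL string partition column.
let schema = Arc::new(StructType::try_new(vec![
    StructField::nullable("value", DataType::INTEGER),
    StructField::not_null("p", DataType::STRING),
])?);
let (_tmp_dir, table_path, engine) = test_table_setup_mt()?;
let snapshot =
    create_partitioned_table(&table_path, engine.as_ref(), schema, &["p"], ColumnMappingMode::None)?;

// Any null-equivalent partition value is rejected before serialization;
// Scalar::String(String::new()) and Scalar::Binary(Vec::new()) fail the same way.
let err = snapshot
    .transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
    .with_engine_info("default engine")
    .partitioned_write_context(HashMap::from([(
        "p".to_string(),
        Scalar::Null(DataType::STRING),
    )]))
    .unwrap_err()
    .to_string();
assert!(err.contains("not nullable"), "{err}");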
kernel/tests/integration/write/partitioned.rs
@@ -14,48 +14,44 @@
 +// 23502): a NULL written into a `NOT NULL` partition column must be rejected on the default
 +// engine. The two arms exercise the two enforcement seams:
 +//
-+// 1. Non-materialized (default): `Transaction::generate_logical_to_physical` drops partition
-+//    columns from the batch before Parquet write, so the partition-value map is the authority. The
-+//    fix in `validate_types` rejects null `Scalar`s for `nullable: false` partition columns at
-+//    `partitioned_write_context` time.
-+// 2. Materialized (`materializePartitionColumns` writer feature): partition columns stay in the
-+//    batch through the transform, so a null in the NOT NULL column is rejected by `arrow-rs` at
-+//    `RecordBatch::try_new`, matching the data-column NOT NULL contract.
++// 1. Non-materialized (default): partition columns are stripped from the batch before the
++//    Parquet write, so the partition-value map is the authority. Kernel validation rejects
++//    null-equivalent values for `nullable: false` partition columns up front.
++// 2. Materialized (`materializePartitionColumns` writer feature): partition columns stay in
++//    the batch on the way to the engine, so a null in the NOT NULL column is rejected by the
++//    engine (arrow-rs) at batch construction, matching the data-column NOT NULL contract.
 +
-+/// `validate_types` enforces the NOT NULL contract on partition values for the non-materialized
-+/// path: nulls into a `nullable: false` partition column are rejected before serialization, and
-+/// non-null values pass through unchanged. The CM axis is orthogonal -- validation runs against
-+/// logical names and field nullability before any column-mapping renaming. Unit tests in
-+/// `kernel/src/partition/validation.rs` cover the full type matrix; this test pins the e2e
-+/// wiring through `Transaction::partitioned_write_context`.
++/// Validates the e2e NOT NULL contract on partition values for the non-materialized path: a
++/// null-equivalent value into a `nullable: false` partition column is rejected before
++/// serialization, and ordinary non-null values pass through unchanged. The column-mapping
++/// axis is orthogonal. Validation runs against logical names and field nullability before
++/// any column-mapping renaming, so the same outcome is expected under all three CM modes.
++///
++/// Three value cases cross-multiplied against three column-mapping modes:
++///
++/// - `Scalar::Null(STRING)` -- explicit null, must be rejected.
++/// - `Scalar::String("a")`  -- ordinary non-null value, must be accepted.
++/// - `Scalar::String("")`   -- empty string, which the Delta protocol treats as JSON null in
++///   `partitionValues`. It must be rejected for the same reason as `Scalar::Null` -- otherwise
++///   it would slip past the nullability check and land a null value in a NOT NULL column.
 +#[rstest]
-+// Reject: null Scalar into NOT NULL partition column, all CM modes.
-+#[case::cm_none_null(
-+    ColumnMappingMode::None,
-+    Scalar::Null(DataType::STRING),
-+    Some("not nullable")
-+)]
-+#[case::cm_name_null(
-+    ColumnMappingMode::Name,
-+    Scalar::Null(DataType::STRING),
-+    Some("not nullable")
-+)]
-+#[case::cm_id_null(
-+    ColumnMappingMode::Id,
-+    Scalar::Null(DataType::STRING),
-+    Some("not nullable")
-+)]
-+// Accept: non-null Scalar into NOT NULL partition column, all CM modes.
-+#[case::cm_none_non_null(ColumnMappingMode::None, Scalar::String("a".into()), None)]
-+#[case::cm_name_non_null(ColumnMappingMode::Name, Scalar::String("a".into()), None)]
-+#[case::cm_id_non_null(ColumnMappingMode::Id, Scalar::String("a".into()), None)]
 +#[tokio::test(flavor = "multi_thread")]
 +async fn test_partition_null_validation_non_materialized(
-+    #[case] cm_mode: ColumnMappingMode,
-+    #[case] value: Scalar,
-+    #[case] expected_err: Option<&str>,
++    #[values(
++        ColumnMappingMode::None,
++        ColumnMappingMode::Name,
++        ColumnMappingMode::Id
++    )]
++    cm_mode: ColumnMappingMode,
++    #[values(
++        (Scalar::Null(DataType::STRING), Some("not nullable")),
++        (Scalar::String("a".into()), None),
++        (Scalar::String(String::new()), Some("not nullable")),
++    )]
++    case: (Scalar, Option<&'static str>),
 +) -> Result<(), Box<dyn std::error::Error>> {
 +    let _ = tracing_subscriber::fmt::try_init();
++    let (value, expected_err) = case;
 +
 +    // Schema: a nullable data column + a NOT NULL string partition column.
 +    let schema = Arc::new(StructType::try_new(vec![
@@ -66,17 +62,17 @@
 +    let (_tmp_dir, table_path, engine) = test_table_setup_mt()?;
 +    let snapshot = create_partitioned_table(&table_path, engine.as_ref(), schema, &["p"], cm_mode)?;
 +
-+    let txn = snapshot
++    let result = snapshot
 +        .transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
-+        .with_engine_info("default engine");
-+    let result = txn.partitioned_write_context(HashMap::from([("p".to_string(), value)]));
++        .with_engine_info("default engine")
++        .partitioned_write_context(HashMap::from([("p".to_string(), value)]));
 +
 +    match expected_err {
 +        Some(needle) => {
 +            let err = result
 +                .err()
 +                .ok_or(
-+                    "expected partitioned_write_context to error for null into NOT NULL partition",
++                    "expected partitioned_write_context to error for a null-equivalent value into NOT NULL partition",
 +                )?
 +                .to_string();
 +            assert!(err.contains(needle), "{err}");
@@ -90,12 +86,10 @@
 +    Ok(())
 +}
 +
-+/// Materialized arm: with `materializePartitionColumns` enabled, partition columns stay in
-+/// the logical batch (`Transaction::generate_logical_to_physical` does not drop them). The
-+/// kernel-derived Arrow schema preserves `nullable: false`, and `arrow-rs` rejects a null
-+/// in the NOT NULL partition column at `RecordBatch::try_new` -- the same enforcement seam
-+/// as a NOT NULL data column. The feature is enabled at create time via the explicit
-+/// feature signal `delta.feature.materializePartitionColumns=supported`.
++/// Materialized arm: with the `materializePartitionColumns` writer feature enabled, partition
++/// columns stay in the engine-bound Arrow batch, so the NOT NULL enforcement seam is the same
++/// as for data columns where the engine rejects a null in a `nullable: false` field at batch
++/// construction.
 +#[tokio::test(flavor = "multi_thread")]
 +async fn test_partition_null_validation_in_batch_materialized(
 +) -> Result<(), Box<dyn std::error::Error>> {
@@ -121,9 +115,12 @@
 +        "test setup must enable materializePartitionColumns"
 +    );
 +
-+    // Partitioned write context with a non-null partition value. The materialize feature
-+    // means the partition column stays in the logical batch; constructing that batch with a
-+    // null in the NOT NULL `p` column is the failure surface we care about.
++    // The partition-value map below is a benign mock: it satisfies the kernel write API
++    // but is decoupled from this test's assertion. With the materialize feature enabled,
++    // the partition column stays in the Arrow batch on the way to the engine (rather than
++    // being filtered out), so the NOT NULL enforcement seam moves into the engine: a null
++    // in a `nullable: false` field is rejected at batch construction below, independent of
++    // whatever value the mock map carries here.
 +    let txn = snapshot
 +        .transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
 +        .with_engine_info("default engine");
kernel/tests/integration/write/types.rs
@@ -31,29 +31,18 @@
 +// NOT NULL data column tests (default engine)
 +// =============================================================================
 +//
-+// These tests pin the contract documented on `maybe_enable_invariants` (see
-+// `kernel/src/transaction/builder/create_table.rs`): kernel does not enforce
-+// nullability at write time -- it relies on the engine's `ParquetHandler` to
-+// do so. The default engine inherits the guarantee from `arrow-rs`, which
-+// rejects null values for fields with `nullable: false` at
-+// `RecordBatch::try_new`.
-+//
-+// Each case creates the table via the kernel `create_table` builder so the
-+// non-null schema drives the auto-enablement of the `invariants` writer
-+// feature end-to-end (the protocol is not hand-crafted). The test then opens
-+// the standard write path (snapshot + transaction + `unpartitioned_write_context`)
-+// and asserts that the input `RecordBatch` for `engine.write_parquet` cannot
-+// be constructed with a null in the NOT NULL column. The failure surfaces
-+// before the engine is ever invoked.
++// These tests validate the kernel<->engine NOT NULL contract: kernel does not enforce
++// nullability at write time. It relies on the engine to do so. The default engine
++// inherits the guarantee from arrow-rs, which rejects nulls in `nullable: false`
++// fields at batch construction.
 +
 +/// Verifies the default-engine NOT NULL contract for non-partition columns
 +/// across a representative set of primitive types. The contract:
 +///
-+/// 1. `nullable: false` propagates from kernel's `StructField` through the connector-facing
-+///    physical schema (`WriteContext::physical_schema`) and into the Arrow logical schema engines
-+///    build batches against.
-+/// 2. Constructing the Arrow `RecordBatch` an engine would hand to `engine.write_parquet` with a
-+///    null in the NOT NULL column fails at `RecordBatch::try_new`, before the engine is invoked.
++/// 1. `nullable: false` propagates from kernel's logical schema through the connector-facing
++///    physical schema and into the Arrow schema engines build batches against.
++/// 2. Constructing the Arrow batch an engine would hand to the kernel write API with a null in the
++///    NOT NULL column fails at batch construction, before the engine is invoked.
 +///
 +/// See [#2465](https://github.com/delta-io/delta-kernel-rs/issues/2465).
 +#[rstest]
@@ -70,9 +59,7 @@
 +) -> Result<(), Box<dyn std::error::Error>> {
 +    let _ = tracing_subscriber::fmt::try_init();
 +
-+    // Step 1: Create the table via the kernel builder. The non-null schema is the only signal
-+    // needed: kernel auto-adds the `invariants` writer feature. The test exercises the full
-+    // chain (schema -> auto-enable -> engine enforcement) rather than hand-crafting the protocol.
++    // Create a table with a NOT NULL column.
 +    let schema = Arc::new(StructType::try_new(vec![StructField::not_null(
 +        "c",
 +        data_type.clone(),
@@ -82,9 +69,8 @@
 +        .build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?
 +        .commit(engine.as_ref())?;
 +
-+    // Step 2: Confirm the protocol auto-enabled `invariants` -- the upstream half of the
-+    // contract this test pins.
 +    let snapshot = Snapshot::builder_for(&table_path).build(engine.as_ref())?;
++    // The non-null schema auto-enables the `invariants` writer feature.
 +    assert!(
 +        snapshot
 +            .table_configuration()
@@ -94,17 +80,13 @@
 +        "non-null schema must auto-enable the `invariants` writer feature",
 +    );
 +
-+    // Step 3: Open the standard default-engine write path. Going through `Snapshot::transaction`
-+    // and `unpartitioned_write_context` exercises the exact setup an engine performs prior to
-+    // calling `engine.write_parquet`.
-+    let txn = snapshot
++    let write_context = snapshot
 +        .transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
-+        .with_engine_info("default engine");
-+    let write_context = txn.unpartitioned_write_context()?;
++        .with_engine_info("default engine")
++        .unpartitioned_write_context()?;
 +
-+    // Step 4: Confirm the connector-facing physical schema preserves `nullable: false`. This is
-+    // the schema engines must use to build batches; using the logical schema in tests would hide
-+    // bugs in the logical->physical mapping.
++    // Use the connector-facing physical schema (not the logical one); the logical schema
++    // would hide bugs in the logical->physical mapping that engines hit in production.
 +    let physical = write_context.physical_schema();
 +    let physical_field = physical
 +        .field("c")
@@ -120,14 +102,11 @@
 +        "Arrow conversion of physical schema must preserve `nullable: false`",
 +    );
 +
-+    // Step 5: Building the input `RecordBatch` for `engine.write_parquet` with a null in the
-+    // NOT NULL column must fail at `RecordBatch::try_new`, before the engine is called. This
-+    // is the default-engine NOT NULL enforcement seam.
 +    let arrow_type = arrow_schema.field(0).data_type().clone();
 +    let result = RecordBatch::try_new(Arc::new(arrow_schema), vec![new_null_array(&arrow_type, 1)]);
 +    assert!(
 +        result.is_err(),
-+        "RecordBatch::try_new should reject null in NOT NULL column ({data_type:?}); got: {result:?}",
++        "batch construction should reject null in NOT NULL column ({data_type:?}); got: {result:?}",
 +    );
 +
 +    Ok(())
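
The engine-side seam in isolation: a sketch of the arrow-rs behavior the test above relies on, written against the `delta_kernel::arrow` re-exports the tests import (exact re-export paths are an assumption). `RecordBatch::try_new` checks each column's null count against the field's nullability and errors before anything reaches a writer.

use std::sync::Arc;
use delta_kernel::arrow::array::{new_null_array, RecordBatch};
use delta_kernel::arrow::datatypes::{DataType as ArrowType, Field, Schema};

// A non-nullable Int32 field, as converted from the kernel physical schema.
let schema = Arc::new(Schema::new(vec![Field::new("c", ArrowType::Int32, false)]));
// A one-row all-null column; try_new rejects it for the non-nullable field.
let result = RecordBatch::try_new(schema, vec![new_null_array(&ArrowType::Int32, 1)]);
assert!(result.is_err());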

Reproduce locally: git range-diff b61bf96..e7c821d b61bf96..c59a39d | Disable: git config gitstack.push-range-diff false

@sanujbasu sanujbasu force-pushed the stack/enforce-non-null-columns branch from e7c821d to c59a39d Compare April 30, 2026 23:19
@sanujbasu sanujbasu requested a review from scottsand-db April 30, 2026 23:21
Collaborator

@scottsand-db scottsand-db left a comment

Looks good but some feedback on some of the comment blocks

Collaborator

@DrakeLin DrakeLin left a comment

let's add a test with a table with multiple partition columns with mixed nullability

@sanujbasu sanujbasu force-pushed the stack/enforce-non-null-columns branch from c59a39d to 897cd26 Compare May 2, 2026 00:08
@sanujbasu
Collaborator Author

Range-diff: stack/materialize-partition-columns-cleanup (897cd26 -> 063dda3)
kernel/src/partition/serialization.rs
@@ -8,10 +8,10 @@
 +/// partition value serialization rules.
 +///
 +/// The protocol collapses three Scalar shapes onto JSON null in `AddFile.partitionValues`:
-+/// `Scalar::Null(_)`, empty `Scalar::String`, and empty `Scalar::Binary`. Treating all
-+/// three as null on write keeps the on-wire format type-erased (connectors can construct
-+/// partition values without inspecting the schema) and matches Delta-Spark's pre-write
-+/// SQL-layer normalization.
++/// `Scalar::Null(_)`, empty `Scalar::String`, and empty `Scalar::Binary`. Partition
++/// values are serialized according to protocol rules on write and interpreted against
++/// the table schema on read, so all three forms must be treated as null before enforcing
++/// NOT NULL constraints.
 +///
 +/// Read paths do not need this predicate: the on-wire collapse to JSON null is undone at
 +/// JSON deserialization (null map entries are dropped before kernel sees a Scalar), so a
@@ -33,25 +33,18 @@
 -/// `Err` for non-null values of types that cannot be partition columns (Struct, Array, Map)
 -/// or for binary values that are not valid UTF-8.
 +/// Returns `Ok(None)` for any value the Delta protocol treats as null in `partitionValues`:
-+/// `Scalar::Null`, empty `Scalar::String`, and empty `Scalar::Binary`. Collapsing the three
-+/// onto JSON null on write keeps the on-wire format type-erased and matches Delta-Spark's
-+/// pre-write SQL-layer normalization. Returns `Err` for non-null values of types that
-+/// cannot be partition columns (Struct, Array, Map) or for binary values that are not valid
-+/// UTF-8.
++/// `Scalar::Null`, empty `Scalar::String`, and empty `Scalar::Binary`. Non-null partition
++/// values are serialized according to protocol rules; readers interpret the stored value
++/// against the table schema. Returns `Err` for non-null values of types that cannot be
++/// partition columns (Struct, Array, Map) or for binary values that are not valid UTF-8.
  ///
  /// The inverse of [`PrimitiveType::parse_scalar`].
  ///
- /// [`PrimitiveType::parse_scalar`]: crate::schema::PrimitiveType::parse_scalar
  pub fn serialize_partition_value(value: &Scalar) -> DeltaResult<Option<String>> {
-+    if would_serialize_to_null(value) {
-+        return Ok(None);
-+    }
-+    // Null/empty-string/empty-binary are checked up front,
-+    // so the remaining arms only see non-null payloads.
      match value {
--        Scalar::Null(_) => Ok(None),
+         Scalar::Null(_) => Ok(None),
 -        Scalar::String(s) => Ok(if s.is_empty() { None } else { Some(s.clone()) }),
-+        Scalar::Null(_) => unreachable!("nulls are checked up front"),
++        Scalar::String(s) if s.is_empty() => Ok(None),
 +        Scalar::String(s) => Ok(Some(s.clone())),
          Scalar::Boolean(v) => Ok(Some(v.to_string())),
          Scalar::Byte(v) => Ok(Some(v.to_string())),
@@ -66,6 +59,7 @@
 -                Ok(Some(format_binary(b)?))
 -            }
 -        }
++        Scalar::Binary(b) if b.is_empty() => Ok(None),
 +        Scalar::Binary(b) => Ok(Some(format_binary(b)?)),
          Scalar::Struct(_) | Scalar::Array(_) | Scalar::Map(_) => Err(Error::generic(format!(
              "cannot serialize partition value: type {:?} is not a valid partition column type",
kernel/tests/integration/write/partitioned.rs
@@ -14,12 +14,12 @@
 +// 23502): a NULL written into a `NOT NULL` partition column must be rejected on the default
 +// engine. The two arms exercise the two enforcement seams:
 +//
-+// 1. Non-materialized (default): partition columns are stripped from the batch before the
-+//    Parquet write, so the partition-value map is the authority. Kernel validation rejects
-+//    null-equivalent values for `nullable: false` partition columns up front.
-+// 2. Materialized (`materializePartitionColumns` writer feature): partition columns stay in
-+//    the batch on the way to the engine, so a null in the NOT NULL column is rejected by the
-+//    engine (arrow-rs) at batch construction, matching the data-column NOT NULL contract.
++// 1. Non-materialized (default): partition columns are stripped from the batch before the Parquet
++//    write, so the partition-value map is the authority. Kernel validation rejects null-equivalent
++//    values for `nullable: false` partition columns up front.
++// 2. Materialized (`materializePartitionColumns` writer feature): partition columns stay in the
++//    batch on the way to the engine, so a null in the NOT NULL column is rejected by the engine
++//    (arrow-rs) at batch construction, matching the data-column NOT NULL contract.
 +
 +/// Validates the e2e NOT NULL contract on partition values for the non-materialized path: a
 +/// null-equivalent value into a `nullable: false` partition column is rejected before
@@ -32,8 +32,8 @@
 +/// - `Scalar::Null(STRING)` -- explicit null, must be rejected.
 +/// - `Scalar::String("a")`  -- ordinary non-null value, must be accepted.
 +/// - `Scalar::String("")`   -- empty string, which the Delta protocol treats as JSON null in
-+///   `partitionValues`. It must be rejected for the same reason as `Scalar::Null` -- otherwise
-+///   it would slip past the nullability check and land a null value in a NOT NULL column.
++///   `partitionValues`. It must be rejected for the same reason as `Scalar::Null` -- otherwise it
++///   would slip past the nullability check and land a null value in a NOT NULL column.
 +#[rstest]
 +#[tokio::test(flavor = "multi_thread")]
 +async fn test_partition_null_validation_non_materialized(
@@ -86,6 +86,69 @@
 +    Ok(())
 +}
 +
++/// Verifies mixed-nullability partition schemas enforce each partition column's own contract:
++/// null-equivalent values are accepted for nullable partition columns and rejected for NOT NULL
++/// partition columns.
++#[rstest]
++#[tokio::test(flavor = "multi_thread")]
++async fn test_partition_null_validation_mixed_nullability(
++    #[values(
++        ColumnMappingMode::None,
++        ColumnMappingMode::Name,
++        ColumnMappingMode::Id
++    )]
++    cm_mode: ColumnMappingMode,
++) -> Result<(), Box<dyn std::error::Error>> {
++    let _ = tracing_subscriber::fmt::try_init();
++
++    let schema = Arc::new(StructType::try_new(vec![
++        StructField::nullable("value", DataType::INTEGER),
++        StructField::not_null("p_required", DataType::STRING),
++        StructField::nullable("p_optional", DataType::STRING),
++    ])?);
++
++    let (_tmp_dir, table_path, engine) = test_table_setup_mt()?;
++    let snapshot = create_partitioned_table(
++        &table_path,
++        engine.as_ref(),
++        schema,
++        &["p_required", "p_optional"],
++        cm_mode,
++    )?;
++
++    snapshot
++        .clone()
++        .transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
++        .with_engine_info("default engine")
++        .partitioned_write_context(HashMap::from([
++            ("p_required".to_string(), Scalar::String("a".into())),
++            ("p_optional".to_string(), Scalar::Null(DataType::STRING)),
++        ]))?;
++
++    snapshot
++        .clone()
++        .transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
++        .with_engine_info("default engine")
++        .partitioned_write_context(HashMap::from([
++            ("p_required".to_string(), Scalar::String("a".into())),
++            ("p_optional".to_string(), Scalar::String(String::new())),
++        ]))?;
++
++    let err = snapshot
++        .transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?
++        .with_engine_info("default engine")
++        .partitioned_write_context(HashMap::from([
++            ("p_required".to_string(), Scalar::Null(DataType::STRING)),
++            ("p_optional".to_string(), Scalar::String("b".into())),
++        ]))
++        .unwrap_err()
++        .to_string();
++    assert!(err.contains("not nullable"), "{err}");
++    assert!(err.contains("'p_required'"), "{err}");
++
++    Ok(())
++}
++
 +/// Materialized arm: with the `materializePartitionColumns` writer feature enabled, partition
 +/// columns stay in the engine-bound Arrow batch, so the NOT NULL enforcement seam is the same
 +/// as for data columns where the engine rejects a null in a `nullable: false` field at batch
kernel/tests/integration/write/types.rs
@@ -31,10 +31,9 @@
 +// NOT NULL data column tests (default engine)
 +// =============================================================================
 +//
-+// These tests validate the kernel<->engine NOT NULL contract: kernel does not enforce
-+// nullability at write time. It relies on the engine to do so. The default engine
-+// inherits the guarantee from arrow-rs, which rejects nulls in `nullable: false`
-+// fields at batch construction.
++// These tests validate data-column nullability on the default engine. Kernel preserves
++// `nullable: false` in the connector-facing schema and relies on the engine to reject
++// nulls before writing.
 +
 +/// Verifies the default-engine NOT NULL contract for non-partition columns
 +/// across a representative set of primitive types. The contract:

Reproduce locally: git range-diff b61bf96..897cd26 b61bf96..063dda3 | Disable: git config gitstack.push-range-diff false

@sanujbasu sanujbasu force-pushed the stack/enforce-non-null-columns branch from 897cd26 to 063dda3 Compare May 2, 2026 00:15
@sanujbasu sanujbasu enabled auto-merge May 2, 2026 00:23
@sanujbasu sanujbasu added this pull request to the merge queue May 2, 2026
Merged via the queue into delta-io:main with commit 78670eb May 2, 2026
25 of 26 checks passed
@sanujbasu sanujbasu deleted the stack/enforce-non-null-columns branch May 2, 2026 00:53
Development

Successfully merging this pull request may close these issues.

Test gap: NOT NULL column enforcement on writes (default engine)
