Skip to content

Commit eb4f572

Browse files
committed
numrecord validator
1 parent 766ace4 commit eb4f572

4 files changed

Lines changed: 163 additions & 21 deletions

File tree

kernel/src/engine/default/stats.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ fn compute_column_stats(
439439

440440
// When min/max is None (all nulls or unsupported type), emit a null-valued
441441
// single-element array to keep the field present in the stats struct. This
442-
// allows downstream consumers (like StatsVerifier) to find the column and
442+
// allows downstream consumers (like StatsColumnVerifier) to find the column and
443443
// check nullCount == numRecords. The JSON serializer omits null fields, so
444444
// the on-disk format still matches Spark's ignoreNullFields behavior.
445445
let null_fallback = || -> ArrayRef { Arc::new(new_null_array(column.data_type(), 1)) };
@@ -778,7 +778,7 @@ mod tests {
778778
assert_eq!(value_null_count.value(0), 3);
779779

780780
// All-null columns are present in minValues/maxValues but with null values.
781-
// The field must exist so that StatsVerifier can find it via visit_rows and
781+
// The field must exist so that StatsColumnVerifier can find it via visit_rows and
782782
// check nullCount == numRecords. The JSON serializer omits null fields, so
783783
// the on-disk format still matches Spark's ignoreNullFields behavior.
784784
let min_values = stats

kernel/src/transaction/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ mod stats_verifier;
6464
mod update;
6565
mod write_context;
6666

67-
use stats_verifier::StatsVerifier;
67+
use stats_verifier::StatsColumnVerifier;
6868
use write_context::SharedWriteState;
6969
pub use write_context::WriteContext;
7070

@@ -989,6 +989,13 @@ impl<S> Transaction<S> {
989989
if add_files.is_empty() {
990990
return Ok(());
991991
}
992+
// IcebergCompatV3 requires every AddFile to carry `stats.numRecords`.
993+
if self
994+
.effective_table_config
995+
.is_feature_enabled(&TableFeature::IcebergCompatV3)
996+
{
997+
stats_verifier::verify_num_records_present(add_files)?;
998+
}
992999
if let Some(ref clustering_cols) = self.physical_clustering_columns {
9931000
if !clustering_cols.is_empty() {
9941001
let physical_schema = self.effective_table_config.physical_schema();
@@ -1007,7 +1014,7 @@ impl<S> Transaction<S> {
10071014
Ok((col.clone(), data_type))
10081015
})
10091016
.collect::<DeltaResult<_>>()?;
1010-
let verifier = StatsVerifier::new(columns_with_types);
1017+
let verifier = StatsColumnVerifier::new(columns_with_types);
10111018
verifier.verify(add_files)?;
10121019
}
10131020
}

kernel/src/transaction/stats_verifier.rs

Lines changed: 150 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ use crate::DeltaResult;
1919
/// For each required column, validates that `nullCount` is present (non-null) and that
2020
/// `minValues` and `maxValues` are present unless the column is all-null
2121
/// (`nullCount == numRecords`).
22-
pub(crate) struct StatsVerifier {
22+
pub(crate) struct StatsColumnVerifier {
2323
required_columns: Vec<(ColumnName, DataType)>,
2424
}
2525

26-
impl StatsVerifier {
26+
impl StatsColumnVerifier {
2727
/// Create a new verifier that checks statistics for the given required columns and types.
2828
pub(crate) fn new(required_columns: Vec<(ColumnName, DataType)>) -> Self {
2929
Self { required_columns }
@@ -163,6 +163,14 @@ static COL_TYPES_DECIMAL: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| {
163163
(names, types).into()
164164
});
165165

166+
/// [`ColumnNamesAndTypes`] for [`NumRecordsValidator`]. The `names` entries are
167+
/// just placeholders; the actual column names are provided via `EngineData::visit_rows`.
168+
static NUM_RECORDS_TYPES: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| {
169+
let names = vec![column_name!("path"), column_name!("nr")];
170+
let types = vec![DataType::STRING, DataType::LONG];
171+
(names, types).into()
172+
});
173+
166174
/// Select the predefined static type array for a given column data type.
167175
fn column_types_for(dt: &DataType) -> DeltaResult<&'static ColumnNamesAndTypes> {
168176
match dt {
@@ -265,6 +273,62 @@ impl RowVisitor for ColumnStatsVisitor<'_> {
265273
}
266274
}
267275

276+
/// Verifies that every `add` action has `stats.numRecords` populated. Short-circuits on the
277+
/// first violation and returns an error containing the `add.path`.
278+
pub(crate) fn verify_num_records_present(
279+
add_files: &[Box<dyn crate::EngineData>],
280+
) -> DeltaResult<()> {
281+
let column_names = vec![
282+
ColumnName::new(["path"]),
283+
ColumnName::new(["stats", "numRecords"]),
284+
];
285+
let mut first_missing: Option<String> = None;
286+
for batch in add_files {
287+
let mut visitor = NumRecordsValidator {
288+
first_missing: &mut first_missing,
289+
};
290+
batch.visit_rows(&column_names, &mut visitor)?;
291+
if first_missing.is_some() {
292+
break;
293+
}
294+
}
295+
if let Some(path) = first_missing {
296+
return Err(Error::stats_validation(format!(
297+
"'stats.numRecords' is required by icebergCompatV3, but is missing for file '{path}'",
298+
)));
299+
}
300+
Ok(())
301+
}
302+
303+
/// Visitor that validates that every `add` action has `stats.numRecords` populated.
304+
/// Stops at the first row whose `numRecords` is null and records that row's `add.path`.
305+
struct NumRecordsValidator<'a> {
306+
first_missing: &'a mut Option<String>,
307+
}
308+
309+
impl RowVisitor for NumRecordsValidator<'_> {
310+
fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
311+
NUM_RECORDS_TYPES.as_ref()
312+
}
313+
314+
fn visit<'b>(&mut self, row_count: usize, getters: &[&'b dyn GetData<'b>]) -> DeltaResult<()> {
315+
require!(
316+
getters.len() == 2,
317+
Error::internal_error(format!(
318+
"Expected 2 getters for numRecords validation, got {}",
319+
getters.len()
320+
))
321+
);
322+
for row_idx in 0..row_count {
323+
if getters[1].get_long(row_idx, "numRecords")?.is_none() {
324+
*self.first_missing = Some(getters[0].get(row_idx, "path")?);
325+
return Ok(());
326+
}
327+
}
328+
Ok(())
329+
}
330+
}
331+
268332
#[cfg(test)]
269333
mod tests {
270334
use std::sync::Arc;
@@ -365,7 +429,7 @@ mod tests {
365429
#[test]
366430
fn test_verifier_with_empty_add_files() {
367431
let columns = vec![(ColumnName::new(["col"]), DataType::LONG)];
368-
let verifier = StatsVerifier::new(columns);
432+
let verifier = StatsColumnVerifier::new(columns);
369433
let result = verifier.verify(&[]);
370434
assert!(result.is_ok());
371435
}
@@ -381,7 +445,7 @@ mod tests {
381445
);
382446

383447
let columns = vec![(ColumnName::new(["col"]), DataType::LONG)];
384-
let verifier = StatsVerifier::new(columns);
448+
let verifier = StatsColumnVerifier::new(columns);
385449
let result = verifier.verify(&[batch]);
386450
assert!(result.is_ok());
387451
}
@@ -401,7 +465,8 @@ mod tests {
401465
min_values,
402466
max_values,
403467
);
404-
let verifier = StatsVerifier::new(vec![(ColumnName::new(["col"]), DataType::LONG)]);
468+
let verifier =
469+
StatsColumnVerifier::new(vec![(ColumnName::new(["col"]), DataType::LONG)]);
405470
let err_msg = verifier.verify(&[batch]).unwrap_err().to_string();
406471
assert!(err_msg.contains("file1.parquet"), "case: {category}");
407472
assert!(err_msg.contains(category), "case: {category}");
@@ -426,7 +491,7 @@ mod tests {
426491
);
427492

428493
let columns = vec![(ColumnName::new(["col"]), DataType::LONG)];
429-
let verifier = StatsVerifier::new(columns);
494+
let verifier = StatsColumnVerifier::new(columns);
430495
let result = verifier.verify(&[batch1, batch2]);
431496

432497
assert!(result.is_err());
@@ -445,7 +510,7 @@ mod tests {
445510
vec![None],
446511
);
447512

448-
let verifier = StatsVerifier::new(vec![]);
513+
let verifier = StatsColumnVerifier::new(vec![]);
449514
let result = verifier.verify(&[batch]);
450515
assert!(result.is_ok());
451516
}
@@ -462,7 +527,7 @@ mod tests {
462527
);
463528

464529
let columns = vec![(ColumnName::new(["col"]), DataType::LONG)];
465-
let verifier = StatsVerifier::new(columns);
530+
let verifier = StatsColumnVerifier::new(columns);
466531
assert!(verifier.verify(&[batch]).is_ok());
467532
}
468533

@@ -478,7 +543,7 @@ mod tests {
478543
);
479544

480545
let columns = vec![(ColumnName::new(["col"]), DataType::LONG)];
481-
let verifier = StatsVerifier::new(columns);
546+
let verifier = StatsColumnVerifier::new(columns);
482547
let result = verifier.verify(&[batch]);
483548
assert!(matches!(result, Err(Error::StatsValidation(_))));
484549
let err = result.unwrap_err().to_string();
@@ -566,7 +631,7 @@ mod tests {
566631
(ColumnName::new(["col_a"]), DataType::LONG),
567632
(ColumnName::new(["col_b"]), DataType::LONG),
568633
];
569-
assert!(StatsVerifier::new(columns).verify(&[batch]).is_ok());
634+
assert!(StatsColumnVerifier::new(columns).verify(&[batch]).is_ok());
570635

571636
// col_a valid, col_b missing minValues
572637
let batch = create_two_column_batch(
@@ -583,7 +648,7 @@ mod tests {
583648
(ColumnName::new(["col_a"]), DataType::LONG),
584649
(ColumnName::new(["col_b"]), DataType::LONG),
585650
];
586-
let err_msg = StatsVerifier::new(columns)
651+
let err_msg = StatsColumnVerifier::new(columns)
587652
.verify(&[batch])
588653
.unwrap_err()
589654
.to_string();
@@ -594,7 +659,7 @@ mod tests {
594659

595660
/// Verifies that stats collected from non-standard Arrow string representations
596661
/// (LargeUtf8/LargeStringArray, Utf8View/StringViewArray) can be validated by
597-
/// StatsVerifier, which expects Delta's logical STRING type. Engines may use any of
662+
/// StatsColumnVerifier, which expects Delta's logical STRING type. Engines may use any of
598663
/// these representations, and the stats pipeline must handle them without type errors.
599664
#[rstest]
600665
#[case::large_utf8(Arc::new(LargeStringArray::from(vec!["Austin", "Boston", "Chicago"])) as ArrayRef)]
@@ -625,13 +690,14 @@ mod tests {
625690

626691
let engine_data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(add_file_batch));
627692

628-
let verifier = StatsVerifier::new(vec![(ColumnName::new(["city"]), DataType::STRING)]);
693+
let verifier =
694+
StatsColumnVerifier::new(vec![(ColumnName::new(["city"]), DataType::STRING)]);
629695
verifier.verify(&[engine_data]).unwrap();
630696
}
631697

632698
/// Verify collect_stats produces correct stats shape for all-null and empty batches.
633699
/// These cases keep the column in minValues/maxValues with null values (so that
634-
/// StatsVerifier can find the field via visit_rows and check nullCount == numRecords).
700+
/// StatsColumnVerifier can find the field via visit_rows and check nullCount == numRecords).
635701
#[rstest]
636702
#[case::all_null_values(Arc::new(Int64Array::from(vec![None::<i64>, None, None])) as ArrayRef)]
637703
#[case::empty_batch(Arc::new(Int64Array::from(Vec::<Option<i64>>::new())) as ArrayRef)]
@@ -808,7 +874,76 @@ mod tests {
808874

809875
let engine_data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(add_file_batch));
810876

811-
let verifier = StatsVerifier::new(vec![(ColumnName::new(["col"]), dt)]);
877+
let verifier = StatsColumnVerifier::new(vec![(ColumnName::new(["col"]), dt)]);
812878
verifier.verify(&[engine_data]).unwrap();
813879
}
880+
881+
// ============================================================================
882+
// verify_num_records_present tests
883+
// ============================================================================
884+
885+
#[rstest]
886+
#[case::empty_input(vec![], None, vec![])]
887+
#[case::all_present(
888+
vec![vec![("a.parquet", Some(10)), ("b.parquet", Some(20))]],
889+
None/* expected_first_offender */,
890+
vec![]/* later_offenders */,
891+
)]
892+
#[case::first_offender_named_later_offenders_hidden(
893+
vec![vec![("a.parquet", Some(10)), ("b.parquet", None), ("c.parquet", None)]],
894+
Some("b.parquet"),
895+
vec!["c.parquet"],
896+
)]
897+
#[case::short_circuits_across_batches(
898+
vec![
899+
vec![("a.parquet", None), ("b.parquet", Some(20))],
900+
vec![("c.parquet", Some(30)), ("d.parquet", None)],
901+
],
902+
Some("a.parquet"),
903+
vec!["d.parquet"],
904+
)]
905+
fn test_verify_num_records_present(
906+
#[case] batches: Vec<Vec<(&str, Option<i64>)>>,
907+
#[case] expected_first_offender: Option<&str>,
908+
#[case] later_offenders: Vec<&str>,
909+
) {
910+
let batches: Vec<Box<dyn EngineData>> = batches
911+
.into_iter()
912+
.map(|rows| {
913+
let (paths, num_records): (Vec<_>, Vec<_>) = rows.into_iter().unzip();
914+
create_add_file_batch_with_num_records(paths, num_records)
915+
})
916+
.collect();
917+
let result = verify_num_records_present(&batches);
918+
match expected_first_offender {
919+
None => result.unwrap(),
920+
Some(path) => {
921+
let err = result.unwrap_err().to_string();
922+
assert!(
923+
err.contains("'stats.numRecords' is required") && err.contains(path),
924+
"expected error containing '{path}', but got: {err}",
925+
);
926+
for later_offender in &later_offenders {
927+
assert!(
928+
!err.contains(later_offender),
929+
"error should not mention '{later_offender}': {err}",
930+
);
931+
}
932+
}
933+
}
934+
}
935+
936+
fn create_add_file_batch_with_num_records(
937+
paths: Vec<&str>,
938+
num_records: Vec<Option<i64>>,
939+
) -> Box<dyn EngineData> {
940+
let n = paths.len();
941+
create_add_file_batch(
942+
paths,
943+
num_records,
944+
vec![None; n],
945+
vec![None; n],
946+
vec![None; n],
947+
)
948+
}
814949
}

kernel/tests/integration/features/clustering_e2e.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,8 @@ async fn test_clustered_table_write_and_checkpoint(
142142
/// Regression test: writing a batch where a clustering column has ALL null values should succeed.
143143
///
144144
/// `collect_stats` emits null-valued min/max entries for all-null columns, allowing
145-
/// `StatsVerifier` to find the field and confirm `nullCount == numRecords`. The JSON serializer
146-
/// omits null fields on disk, matching Spark's `ignoreNullFields` behavior.
145+
/// `StatsColumnVerifier` to find the field and confirm `nullCount == numRecords`. The JSON
146+
/// serializer omits null fields on disk, matching Spark's `ignoreNullFields` behavior.
147147
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
148148
async fn test_clustered_table_write_all_null_clustering_column() {
149149
let (_temp_dir, table_path, engine) = test_table_setup_mt().unwrap();

0 commit comments

Comments
 (0)