Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 80 additions & 1 deletion kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::table_features::{
FeatureType, IntoTableFeature, TableFeature, MIN_VALID_RW_VERSION,
TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION,
};
use crate::table_properties::TableProperties;
use crate::table_properties::{TableProperties, COLUMN_MAPPING_MAX_COLUMN_ID};
use crate::utils::require;
use crate::{
DeltaResult, Engine, EngineData, Error, EvaluationHandlerExtension as _, FileMeta,
Expand Down Expand Up @@ -360,6 +360,40 @@ impl Metadata {
})
}

/// Consumes this `Metadata` and hands it back with `key` mapped to `value` in its
/// configuration.
///
/// An entry already stored under `key` is replaced; all other configuration entries and
/// every other metadata field are carried over untouched.
pub(crate) fn with_configuration_entry(
    mut self,
    key: impl Into<String>,
    value: impl Into<String>,
) -> Self {
    let (owned_key, owned_value) = (key.into(), value.into());
    self.configuration.insert(owned_key, owned_value);
    self
}

/// Parse `delta.columnMapping.maxColumnId` from the table configuration, if present.
///
/// Returns `Ok(None)` when the property is absent from the configuration and `Ok(Some(id))`
/// when its value parses as a non-negative `i64`.
///
/// # Errors
///
/// Returns [`Error::InvalidProtocol`] if the property is present but not a non-negative
/// integer.
pub(crate) fn max_column_id(&self) -> DeltaResult<Option<i64>> {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we have this already as a TableProperty? It should parse it for us already, though I'm not sure it does the non-negative check.

// An absent property is not an error; it is reported to the caller as `None`.
let Some(raw) = self.configuration.get(COLUMN_MAPPING_MAX_COLUMN_ID) else {
return Ok(None);
};
// `parse::<i64>` also fails for digit strings outside the `i64` range, so those values are
// reported with the same "must be an integer" message as non-numeric text.
let parsed = raw.parse::<i64>().map_err(|_| {
Error::invalid_protocol(format!(
"Invalid {COLUMN_MAPPING_MAX_COLUMN_ID} value '{raw}': must be an integer"
))
})?;
// Zero is accepted; only strictly negative values are rejected.
if parsed < 0 {
return Err(Error::invalid_protocol(format!(
"Invalid {COLUMN_MAPPING_MAX_COLUMN_ID} value '{raw}': must be non-negative"
)));
}
Ok(Some(parsed))
}

#[cfg(test)]
#[allow(clippy::too_many_arguments)]
pub(crate) fn new_unchecked(
Expand Down Expand Up @@ -2147,4 +2181,49 @@ mod tests {
Some(&Some("1677811178336000".to_string()))
);
}

fn metadata_with_config(entries: &[(&str, &str)]) -> Metadata {
let mut config = HashMap::new();
for (k, v) in entries {
config.insert(k.to_string(), v.to_string());
}
Metadata {
configuration: config,
..Default::default()
}
}

#[test]
fn max_column_id_missing_returns_none() {
    // No configuration entries at all, so the property is absent.
    let metadata = metadata_with_config(&[]);
    let parsed = metadata.max_column_id().unwrap();
    assert_eq!(parsed, None);
}

#[test]
fn max_column_id_valid_nonnegative_parses() {
    // A plain positive integer string is returned as its numeric value.
    let config = [("delta.columnMapping.maxColumnId", "7")];
    let metadata = metadata_with_config(&config);
    let parsed = metadata.max_column_id().unwrap();
    assert_eq!(parsed, Some(7));
}

#[test]
fn max_column_id_zero_is_ok() {
    // Zero sits on the boundary of the non-negative check and must be accepted.
    let config = [("delta.columnMapping.maxColumnId", "0")];
    let metadata = metadata_with_config(&config);
    let parsed = metadata.max_column_id().unwrap();
    assert_eq!(parsed, Some(0));
}

#[test]
fn max_column_id_non_integer_is_invalid_protocol() {
    // Text that is not a number must surface as an invalid-protocol error naming the property.
    let config = [("delta.columnMapping.maxColumnId", "not_a_number")];
    let failure = metadata_with_config(&config).max_column_id().unwrap_err();
    assert!(matches!(failure, Error::InvalidProtocol(_)));
    let rendered = failure.to_string();
    assert!(rendered.contains("maxColumnId"));
}

#[test]
fn max_column_id_negative_is_invalid_protocol() {
    // A well-formed but negative integer parses, then fails the non-negative check.
    let config = [("delta.columnMapping.maxColumnId", "-5")];
    let failure = metadata_with_config(&config).max_column_id().unwrap_err();
    assert!(matches!(failure, Error::InvalidProtocol(_)));
    let rendered = failure.to_string();
    assert!(rendered.contains("non-negative"));
}
}
50 changes: 50 additions & 0 deletions kernel/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,56 @@ impl StructType {
self.fields.into_values()
}

/// Gets a mutable reference to the underlying field map.
pub(crate) fn field_map_mut(&mut self) -> &mut IndexMap<String, StructField> {
&mut self.fields
}

/// Walk a pre-segmented column path through this schema and return the leaf field.
///
/// `path` holds the path's individual name segments (one per nesting level), already split
/// by the caller. Lookup at each level is case-insensitive and returns the first field whose
/// lowercased name matches. The Delta protocol uses `.` as the dotted-path separator at the
/// API surface, but `field_at_path` itself does not split -- callers typically pass
/// [`ColumnName::path()`](crate::expressions::ColumnName::path), which yields the segments
/// directly.
///
/// Intended for use in test assertions, so every failure is a panic rather than an error.
///
/// # Panics
///
/// - If `path` is empty.
/// - If a segment does not name a field at its nesting level.
/// - If a field that still has segments below it is not a struct. The message names that
///   field as well as the segment that could not be resolved beneath it.
///
/// # Example
///
/// ```ignore
/// // Schema:
/// //   id: INTEGER not null
/// //   address: STRUCT { city: STRING not null, zip: STRING }
/// let path = vec!["address".to_string(), "city".to_string()];
/// let city = schema.field_at_path(&path);
/// assert_eq!(city.name(), "city");
/// assert!(!city.is_nullable());
/// ```
#[cfg(any(test, feature = "test-utils"))]
// Panicking is the intended failure mode of this test-only helper.
#[allow(clippy::panic, clippy::expect_used)]
pub fn field_at_path<'a>(&'a self, path: &[String]) -> &'a StructField {
    // Returns the first field whose name equals `name` ignoring case; panics if none does.
    fn find_ci<'a>(
        mut fields: impl Iterator<Item = &'a StructField>,
        name: &str,
    ) -> &'a StructField {
        let lowered = name.to_lowercase();
        fields
            .find(|f| f.name().to_lowercase() == lowered)
            .unwrap_or_else(|| panic!("field '{name}' not found"))
    }

    let (first, rest) = path.split_first().expect("non-empty path");
    let mut field = find_ci(self.fields(), first);
    for seg in rest {
        // `field` is the parent we are about to descend into, so it is the one that must be
        // a struct. Name it in the message instead of only the child segment `seg`, which
        // for a two-segment path is the leaf and not an intermediate segment at all.
        let DataType::Struct(s) = field.data_type() else {
            panic!(
                "expected struct at intermediate segment '{}' while resolving '{seg}'",
                field.name()
            );
        };
        field = find_ci(s.fields(), seg);
    }
    field
}

/// Gets all the field names in this struct type in the order they are defined.
pub fn field_names(&self) -> impl ExactSizeIterator<Item = &String> {
self.fields.keys()
Expand Down
126 changes: 112 additions & 14 deletions kernel/src/table_features/column_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ pub(crate) fn get_column_mapping_mode_from_properties(
/// arrays, and maps. Each field is assigned a new unique ID and physical name.
///
/// Fields with pre-existing column mapping metadata (id or physicalName) are rejected
/// to avoid conflicts. ALTER TABLE will need different handling in the future.
/// to avoid conflicts.
///
/// # Arguments
///
Expand All @@ -268,30 +268,34 @@ pub(crate) fn assign_column_mapping_metadata(
StructType::try_new(new_fields)
}

/// Assigns column mapping metadata to a single field, recursively processing nested types.
///
/// Rejects fields with pre-existing column mapping metadata. Otherwise, assigns a new
/// unique ID and physical name (incrementing `max_id`).
fn assign_field_column_mapping(field: &StructField, max_id: &mut i64) -> DeltaResult<StructField> {
/// Rejects fields that already carry column mapping annotations (`delta.columnMapping.id` or
/// `delta.columnMapping.physicalName`).
pub(crate) fn reject_preexisting_cm_metadata(field: &StructField) -> DeltaResult<()> {
let has_id = field
.get_config_value(&ColumnMetadataKey::ColumnMappingId)
.is_some();
let has_physical_name = field
.get_config_value(&ColumnMetadataKey::ColumnMappingPhysicalName)
.is_some();

// For CREATE TABLE, reject any pre-existing column mapping metadata.
// This avoids conflicts between user-provided IDs/physical names and the ones we assign.
// ALTER TABLE (adding columns) will need different handling in the future.
// TODO: Also check for nested column IDs (`delta.columnMapping.nested.ids`) once
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to add this TODO back? cc @dengsh12 FYI

// Iceberg compatibility (IcebergCompatV2+) is supported. See issue #1125.
if has_id || has_physical_name {
return Err(Error::generic(format!(
"Field '{}' already has column mapping metadata. \
Pre-existing column mapping metadata is not supported for CREATE TABLE.",
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't this just be a loop where we iterate over the two fields, look each one up, and then return a very specific error message for that exact field instead of referring to both fields in the error message?

"Field '{}' already has column mapping metadata; the caller must not pre-populate \
`delta.columnMapping.id` or `delta.columnMapping.physicalName` on input fields.",
field.name
)));
}
Ok(())
}

/// Assigns column mapping metadata to a single field, recursively processing nested types.
///
/// Rejects fields with pre-existing column mapping metadata. Otherwise, assigns a new
/// unique ID and physical name (incrementing `max_id`).
pub(crate) fn assign_field_column_mapping(
field: &StructField,
max_id: &mut i64,
) -> DeltaResult<StructField> {
reject_preexisting_cm_metadata(field)?;

// Start with the existing field and assign new ID
let mut new_field = field.clone();
Expand All @@ -316,6 +320,31 @@ fn assign_field_column_mapping(field: &StructField, max_id: &mut i64) -> DeltaRe
Ok(new_field)
}

/// Returns the largest `delta.columnMapping.id` found anywhere in `schema`, including nested
/// data types. Returns `0` if no field carries a column mapping ID.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this function set the default, or should we return an Option and let the caller set the default to zero? I defer to you.

pub(crate) fn find_max_column_id_in_schema(schema: &StructType) -> i64 {
    // Fold over the fields starting from 0, which is also the result when no ID is found.
    schema.fields().fold(0, |best, field| {
        // Only a numeric column mapping ID on the field itself can raise the running maximum.
        let with_own_id = match field.get_config_value(&ColumnMetadataKey::ColumnMappingId) {
            Some(MetadataValue::Number(id)) => best.max(*id),
            _ => best,
        };
        // IDs may also sit on fields nested inside this field's data type.
        let nested_max = find_max_column_id_in_data_type(field.data_type());
        with_own_id.max(nested_max)
    })
}

// Returns the largest column mapping ID carried by any struct field nested inside
// `data_type`, or 0 when the type contains none.
fn find_max_column_id_in_data_type(data_type: &DataType) -> i64 {
    match data_type {
        // Leaf types have no nested fields to inspect.
        DataType::Primitive(_) | DataType::Variant(_) => 0,
        DataType::Struct(nested) => find_max_column_id_in_schema(nested),
        DataType::Array(array) => find_max_column_id_in_data_type(array.element_type()),
        DataType::Map(map) => {
            let key_max = find_max_column_id_in_data_type(map.key_type());
            let value_max = find_max_column_id_in_data_type(map.value_type());
            key_max.max(value_max)
        }
    }
}

/// Process nested data types to assign column mapping metadata to any nested struct fields.
fn process_nested_data_type(data_type: &DataType, max_id: &mut i64) -> DeltaResult<DataType> {
match data_type {
Expand Down Expand Up @@ -1307,4 +1336,73 @@ mod tests {
.to_string()
.contains("is not a struct type"));
}

// === find_max_column_id_in_schema tests ===

// Test helper: a nullable field named `name` of type `ty` whose metadata carries `id` under
// the column mapping ID key.
fn field_with_id(name: &str, ty: DataType, id: i64) -> StructField {
    let id_key = ColumnMetadataKey::ColumnMappingId.as_ref().to_string();
    let mut field = StructField::nullable(name, ty);
    field.metadata.insert(id_key, MetadataValue::Number(id));
    field
}

#[test]
fn find_max_column_id_empty_schema_is_zero() {
    // The only field carries no column mapping ID, so the result is the fallback of 0.
    let unannotated = StructField::nullable("a", DataType::STRING);
    let schema = StructType::try_new(vec![unannotated]).unwrap();
    let max_id = find_max_column_id_in_schema(&schema);
    assert_eq!(max_id, 0);
}

#[test]
fn find_max_column_id_top_level_only() {
    // The largest ID (3) is on the middle field, so neither the first nor the last wins.
    let fields = vec![
        field_with_id("a", DataType::STRING, 1),
        field_with_id("b", DataType::INTEGER, 3),
        field_with_id("c", DataType::STRING, 2),
    ];
    let schema = StructType::try_new(fields).unwrap();
    let max_id = find_max_column_id_in_schema(&schema);
    assert_eq!(max_id, 3);
}

#[test]
fn find_max_column_id_nested_struct() {
    // The largest ID (7) lives inside the nested struct, below both top-level IDs.
    let nested_fields = vec![
        field_with_id("x", DataType::STRING, 7),
        field_with_id("y", DataType::STRING, 5),
    ];
    let nested = DataType::Struct(Box::new(StructType::try_new(nested_fields).unwrap()));
    let top_level = vec![
        field_with_id("outer", nested, 2),
        field_with_id("sibling", DataType::STRING, 3),
    ];
    let schema = StructType::try_new(top_level).unwrap();
    let max_id = find_max_column_id_in_schema(&schema);
    assert_eq!(max_id, 7);
}

#[test]
fn find_max_column_id_array_and_map_recurse_into_element_types() {
    // Builds a struct type holding one STRING field that carries the given ID.
    let struct_of = |name: &str, id: i64| {
        DataType::Struct(Box::new(
            StructType::try_new(vec![field_with_id(name, DataType::STRING, id)]).unwrap(),
        ))
    };

    // The array's element type hides ID 42; the map's value type hides ID 9.
    let array_of_structs =
        DataType::Array(Box::new(ArrayType::new(struct_of("deep", 42), true)));
    let map_to_struct = DataType::Map(Box::new(MapType::new(
        DataType::STRING,
        struct_of("inside", 9),
        false,
    )));

    let top_level = vec![
        field_with_id("arr", array_of_structs, 1),
        field_with_id("m", map_to_struct, 2),
    ];
    let schema = StructType::try_new(top_level).unwrap();
    let max_id = find_max_column_id_in_schema(&schema);
    assert_eq!(max_id, 42);
}
}
6 changes: 4 additions & 2 deletions kernel/src/table_features/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ pub(crate) use column_mapping::get_any_level_column_physical_name;
pub use column_mapping::validate_schema_column_mapping;
pub use column_mapping::ColumnMappingMode;
pub(crate) use column_mapping::{
assign_column_mapping_metadata, column_mapping_mode, get_column_mapping_mode_from_properties,
get_field_column_mapping_info, physical_to_logical_column_name,
assign_column_mapping_metadata, assign_field_column_mapping, column_mapping_mode,
find_max_column_id_in_schema, get_column_mapping_mode_from_properties,
get_field_column_mapping_info, physical_to_logical_column_name, reject_preexisting_cm_metadata,
};
use delta_kernel_derive::internal_api;
use itertools::Itertools;
Expand All @@ -21,6 +22,7 @@ use crate::schema::derive_macro_utils::ToDataType;
use crate::schema::DataType;
use crate::table_properties::TableProperties;
use crate::{DeltaResult, Error};

mod column_mapping;
mod timestamp_ntz;

Expand Down
Loading
Loading