-
Notifications
You must be signed in to change notification settings - Fork 167
feat: add column mapping support for add_column #2389
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -245,7 +245,7 @@ pub(crate) fn get_column_mapping_mode_from_properties( | |
| /// arrays, and maps. Each field is assigned a new unique ID and physical name. | ||
| /// | ||
| /// Fields with pre-existing column mapping metadata (id or physicalName) are rejected | ||
| /// to avoid conflicts. ALTER TABLE will need different handling in the future. | ||
| /// to avoid conflicts. | ||
| /// | ||
| /// # Arguments | ||
| /// | ||
|
|
@@ -268,30 +268,34 @@ pub(crate) fn assign_column_mapping_metadata( | |
| StructType::try_new(new_fields) | ||
| } | ||
|
|
||
| /// Assigns column mapping metadata to a single field, recursively processing nested types. | ||
| /// | ||
| /// Rejects fields with pre-existing column mapping metadata. Otherwise, assigns a new | ||
| /// unique ID and physical name (incrementing `max_id`). | ||
| fn assign_field_column_mapping(field: &StructField, max_id: &mut i64) -> DeltaResult<StructField> { | ||
| /// Rejects fields that already carry column mapping annotations (`delta.columnMapping.id` or | ||
| /// `delta.columnMapping.physicalName`). | ||
| pub(crate) fn reject_preexisting_cm_metadata(field: &StructField) -> DeltaResult<()> { | ||
| let has_id = field | ||
| .get_config_value(&ColumnMetadataKey::ColumnMappingId) | ||
| .is_some(); | ||
| let has_physical_name = field | ||
| .get_config_value(&ColumnMetadataKey::ColumnMappingPhysicalName) | ||
| .is_some(); | ||
|
|
||
| // For CREATE TABLE, reject any pre-existing column mapping metadata. | ||
| // This avoids conflicts between user-provided IDs/physical names and the ones we assign. | ||
| // ALTER TABLE (adding columns) will need different handling in the future. | ||
| // TODO: Also check for nested column IDs (`delta.columnMapping.nested.ids`) once | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we need to add this TODO back? cc @dengsh12 FYI |
||
| // Iceberg compatibility (IcebergCompatV2+) is supported. See issue #1125. | ||
| if has_id || has_physical_name { | ||
| return Err(Error::generic(format!( | ||
| "Field '{}' already has column mapping metadata. \ | ||
| Pre-existing column mapping metadata is not supported for CREATE TABLE.", | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Couldn't this just be a loop where we iterate over the two fields, look each one up, and then return a very specific error message for that exact field instead of referring to both fields in the error message? |
||
| "Field '{}' already has column mapping metadata; the caller must not pre-populate \ | ||
| `delta.columnMapping.id` or `delta.columnMapping.physicalName` on input fields.", | ||
| field.name | ||
| ))); | ||
| } | ||
| Ok(()) | ||
| } | ||
|
|
||
| /// Assigns column mapping metadata to a single field, recursively processing nested types. | ||
| /// | ||
| /// Rejects fields with pre-existing column mapping metadata. Otherwise, assigns a new | ||
| /// unique ID and physical name (incrementing `max_id`). | ||
| pub(crate) fn assign_field_column_mapping( | ||
| field: &StructField, | ||
| max_id: &mut i64, | ||
| ) -> DeltaResult<StructField> { | ||
| reject_preexisting_cm_metadata(field)?; | ||
|
|
||
| // Start with the existing field and assign new ID | ||
| let mut new_field = field.clone(); | ||
|
|
@@ -316,6 +320,31 @@ fn assign_field_column_mapping(field: &StructField, max_id: &mut i64) -> DeltaRe | |
| Ok(new_field) | ||
| } | ||
|
|
||
| /// Returns the largest `delta.columnMapping.id` found anywhere in `schema`, including nested | ||
| /// data types. Returns `0` if no field carries a column mapping ID. | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should this function set the default, or should we return an `Option`? |
||
| pub(crate) fn find_max_column_id_in_schema(schema: &StructType) -> i64 { | ||
| // The running maximum starts at 0, which is also the result when no ID is found. | ||
| let mut largest: i64 = 0; | ||
| for field in schema.fields() { | ||
| // Only IDs stored as `MetadataValue::Number` are considered; anything else counts as 0. | ||
| let own_id = match field.get_config_value(&ColumnMetadataKey::ColumnMappingId) { | ||
| Some(MetadataValue::Number(id)) => *id, | ||
| _ => 0, | ||
| }; | ||
| // IDs may also live on fields nested inside this field's data type. | ||
| let nested_max = find_max_column_id_in_data_type(field.data_type()); | ||
| largest = largest.max(own_id).max(nested_max); | ||
| } | ||
| largest | ||
| } | ||
|
|
||
| fn find_max_column_id_in_data_type(data_type: &DataType) -> i64 { | ||
| match data_type { | ||
| // Leaf types have no nested fields to inspect, so they contribute 0. | ||
| DataType::Primitive(_) | DataType::Variant(_) => 0, | ||
| // A struct is searched the same way as a top-level schema. | ||
| DataType::Struct(nested_schema) => find_max_column_id_in_schema(nested_schema), | ||
| DataType::Array(array) => find_max_column_id_in_data_type(array.element_type()), | ||
| // Both sides of a map are searched; the larger result wins. | ||
| DataType::Map(map) => { | ||
| let key_max = find_max_column_id_in_data_type(map.key_type()); | ||
| let value_max = find_max_column_id_in_data_type(map.value_type()); | ||
| key_max.max(value_max) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Process nested data types to assign column mapping metadata to any nested struct fields. | ||
| fn process_nested_data_type(data_type: &DataType, max_id: &mut i64) -> DeltaResult<DataType> { | ||
| match data_type { | ||
|
|
@@ -1307,4 +1336,73 @@ mod tests { | |
| .to_string() | ||
| .contains("is not a struct type")); | ||
| } | ||
|
|
||
| // === find_max_column_id_in_schema tests === | ||
|
|
||
| fn field_with_id(name: &str, ty: DataType, id: i64) -> StructField { | ||
| // Test helper: builds a nullable field and stores `id` under the column mapping ID key. | ||
| let id_key = ColumnMetadataKey::ColumnMappingId.as_ref().to_string(); | ||
| let mut annotated = StructField::nullable(name, ty); | ||
| annotated | ||
| .metadata | ||
| .insert(id_key, MetadataValue::Number(id)); | ||
| annotated | ||
| } | ||
|
|
||
| #[test] | ||
| fn find_max_column_id_empty_schema_is_zero() { | ||
| // The schema is not literally empty: it holds one field that carries no column | ||
| // mapping ID, so the search is expected to report 0. | ||
| let unannotated = StructField::nullable("a", DataType::STRING); | ||
| let schema = StructType::try_new(vec![unannotated]).unwrap(); | ||
| let max_id = find_max_column_id_in_schema(&schema); | ||
| assert_eq!(max_id, 0); | ||
| } | ||
|
|
||
| #[test] | ||
| fn find_max_column_id_top_level_only() { | ||
| // IDs are listed out of order (1, 3, 2); the largest one sits in the middle. | ||
| let fields = vec![ | ||
| field_with_id("a", DataType::STRING, 1), | ||
| field_with_id("b", DataType::INTEGER, 3), | ||
| field_with_id("c", DataType::STRING, 2), | ||
| ]; | ||
| let schema = StructType::try_new(fields).unwrap(); | ||
| let max_id = find_max_column_id_in_schema(&schema); | ||
| assert_eq!(max_id, 3); | ||
| } | ||
|
|
||
| #[test] | ||
| fn find_max_column_id_nested_struct() { | ||
| // The largest ID (7) is on a field inside the nested struct, not on a top-level field. | ||
| let inner_fields = vec![ | ||
| field_with_id("x", DataType::STRING, 7), | ||
| field_with_id("y", DataType::STRING, 5), | ||
| ]; | ||
| let inner = DataType::Struct(Box::new(StructType::try_new(inner_fields).unwrap())); | ||
| let outer = field_with_id("outer", inner, 2); | ||
| let sibling = field_with_id("sibling", DataType::STRING, 3); | ||
| let schema = StructType::try_new(vec![outer, sibling]).unwrap(); | ||
| let max_id = find_max_column_id_in_schema(&schema); | ||
| assert_eq!(max_id, 7); | ||
| } | ||
|
|
||
| #[test] | ||
| fn find_max_column_id_array_and_map_recurse_into_element_types() { | ||
| // Struct used as the array element; its only field carries the largest ID (42). | ||
| let deep_struct = | ||
| StructType::try_new(vec![field_with_id("deep", DataType::STRING, 42)]).unwrap(); | ||
| let array_elem_struct = DataType::Array(Box::new(ArrayType::new( | ||
| DataType::Struct(Box::new(deep_struct)), | ||
| true, | ||
| ))); | ||
| // Struct used as the map value; its only field carries ID 9. | ||
| let inside_struct = | ||
| StructType::try_new(vec![field_with_id("inside", DataType::STRING, 9)]).unwrap(); | ||
| let map_ty = DataType::Map(Box::new(MapType::new( | ||
| DataType::STRING, | ||
| DataType::Struct(Box::new(inside_struct)), | ||
| false, | ||
| ))); | ||
| let top_level = vec![ | ||
| field_with_id("arr", array_elem_struct, 1), | ||
| field_with_id("m", map_ty, 2), | ||
| ]; | ||
| let schema = StructType::try_new(top_level).unwrap(); | ||
| let max_id = find_max_column_id_in_schema(&schema); | ||
| assert_eq!(max_id, 42); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we have this already as a TableProperty? It should parse it for us already, though I'm not sure it does the non-negative check.