Skip to content

Commit 56fda82

Browse files
authored
feat!(catalog): adding support for purge_table (#2232)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #2133 ## What changes are included in this PR? - Add catalog/utils.rs to provide helpers to delete table data using file_io and table_metadata - Add new API `purge_table` to `Catalog` trait and add default implementation - Implement purge_table for S3TableCatalog and RestCatalog <!-- Provide a summary of the modifications in this PR. List the main changes such as new features, bug fixes, refactoring, or any other updates. --> ## Are these changes tested? Added new tests in table_suite <!-- Specify what test covers (unit test, integration test, etc.). If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? -->
1 parent fea5906 commit 56fda82

File tree

12 files changed

+337
-29
lines changed

12 files changed

+337
-29
lines changed

crates/catalog/glue/src/catalog.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -659,6 +659,17 @@ impl Catalog for GlueCatalog {
659659
Ok(())
660660
}
661661

662+
async fn purge_table(&self, table: &TableIdent) -> Result<()> {
663+
let table_info = self.load_table(table).await?;
664+
self.drop_table(table).await?;
665+
iceberg::drop_table_data(
666+
table_info.file_io(),
667+
table_info.metadata(),
668+
table_info.metadata_location(),
669+
)
670+
.await
671+
}
672+
662673
/// Asynchronously checks the existence of a specified table
663674
/// in the database.
664675
///

crates/catalog/hms/src/catalog.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,6 +604,17 @@ impl Catalog for HmsCatalog {
604604
Ok(())
605605
}
606606

607+
async fn purge_table(&self, table: &TableIdent) -> Result<()> {
608+
let table_info = self.load_table(table).await?;
609+
self.drop_table(table).await?;
610+
iceberg::drop_table_data(
611+
table_info.file_io(),
612+
table_info.metadata(),
613+
table_info.metadata_location(),
614+
)
615+
.await
616+
}
617+
607618
/// Asynchronously checks the existence of a specified table
608619
/// in the database.
609620
///

crates/catalog/loader/tests/common/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ pub fn assert_map_contains(expected: &HashMap<String, String>, actual: &HashMap<
335335
pub async fn cleanup_namespace_dyn(catalog: &dyn Catalog, namespace: &NamespaceIdent) {
336336
if let Ok(tables) = catalog.list_tables(namespace).await {
337337
for table in tables {
338-
let _ = catalog.drop_table(&table).await;
338+
let _ = catalog.purge_table(&table).await;
339339
}
340340
}
341341
let _ = catalog.drop_namespace(namespace).await;

crates/catalog/loader/tests/table_suite.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,3 +274,80 @@ async fn test_catalog_drop_table_missing_errors(#[case] kind: CatalogKind) -> Re
274274
assert!(catalog.drop_table(&table_ident).await.is_err());
275275
Ok(())
276276
}
277+
278+
// Common behavior: purge_table removes the table from the catalog.
279+
#[rstest]
280+
#[case::rest_catalog(CatalogKind::Rest)]
281+
#[case::glue_catalog(CatalogKind::Glue)]
282+
#[case::hms_catalog(CatalogKind::Hms)]
283+
#[case::sql_catalog(CatalogKind::Sql)]
284+
#[case::s3tables_catalog(CatalogKind::S3Tables)]
285+
#[case::memory_catalog(CatalogKind::Memory)]
286+
#[tokio::test]
287+
async fn test_catalog_purge_table(#[case] kind: CatalogKind) -> Result<()> {
288+
let Some(harness) = load_catalog(kind).await else {
289+
return Ok(());
290+
};
291+
let catalog = harness.catalog;
292+
let namespace = NamespaceIdent::new(normalize_test_name_with_parts!(
293+
"catalog_purge_table",
294+
harness.label
295+
));
296+
297+
cleanup_namespace_dyn(catalog.as_ref(), &namespace).await;
298+
catalog.create_namespace(&namespace, HashMap::new()).await?;
299+
300+
let table_name = normalize_test_name_with_parts!("catalog_purge_table", harness.label, "table");
301+
let table = catalog
302+
.create_table(&namespace, table_creation(table_name))
303+
.await?;
304+
let ident = table.identifier().clone();
305+
306+
assert!(catalog.table_exists(&ident).await?);
307+
308+
// Capture metadata location and file_io before purge so we can verify
309+
// that the underlying files are actually deleted.
310+
let metadata_location = table.metadata_location().map(|s| s.to_string());
311+
let file_io = table.file_io().clone();
312+
313+
catalog.purge_table(&ident).await?;
314+
assert!(!catalog.table_exists(&ident).await?);
315+
316+
if let Some(location) = &metadata_location {
317+
assert!(
318+
!file_io.exists(location).await?,
319+
"Metadata file should have been deleted after purge"
320+
);
321+
}
322+
323+
catalog.drop_namespace(&namespace).await?;
324+
325+
Ok(())
326+
}
327+
328+
// Common behavior: purging a missing table should error.
329+
#[rstest]
330+
#[case::rest_catalog(CatalogKind::Rest)]
331+
#[case::glue_catalog(CatalogKind::Glue)]
332+
#[case::hms_catalog(CatalogKind::Hms)]
333+
#[case::sql_catalog(CatalogKind::Sql)]
334+
#[case::s3tables_catalog(CatalogKind::S3Tables)]
335+
#[case::memory_catalog(CatalogKind::Memory)]
336+
#[tokio::test]
337+
async fn test_catalog_purge_table_missing_errors(#[case] kind: CatalogKind) -> Result<()> {
338+
let Some(harness) = load_catalog(kind).await else {
339+
return Ok(());
340+
};
341+
let catalog = harness.catalog;
342+
let namespace = NamespaceIdent::new(normalize_test_name_with_parts!(
343+
"catalog_purge_table_missing_errors",
344+
harness.label
345+
));
346+
347+
cleanup_namespace_dyn(catalog.as_ref(), &namespace).await;
348+
catalog.create_namespace(&namespace, HashMap::new()).await?;
349+
350+
let table_ident = TableIdent::new(namespace.clone(), "missing".to_string());
351+
assert!(catalog.purge_table(&table_ident).await.is_err());
352+
Ok(())
353+
}

crates/catalog/rest/src/catalog.rs

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,35 @@ impl RestCatalog {
363363
}
364364
}
365365

366+
/// Sends a DELETE request for the given table, optionally requesting purge.
367+
async fn delete_table(&self, table: &TableIdent, purge: bool) -> Result<()> {
368+
let context = self.context().await?;
369+
370+
let mut request_builder = context
371+
.client
372+
.request(Method::DELETE, context.config.table_endpoint(table));
373+
374+
if purge {
375+
request_builder = request_builder.query(&[("purgeRequested", "true")]);
376+
}
377+
378+
let request = request_builder.build()?;
379+
let http_response = context.client.query_catalog(request).await?;
380+
381+
match http_response.status() {
382+
StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
383+
StatusCode::NOT_FOUND => Err(Error::new(
384+
ErrorKind::TableNotFound,
385+
"Tried to drop a table that does not exist",
386+
)),
387+
_ => Err(deserialize_unexpected_catalog_error(
388+
http_response,
389+
context.client.disable_header_redaction(),
390+
)
391+
.await),
392+
}
393+
}
394+
366395
/// Gets the [`RestContext`] from the catalog.
367396
async fn context(&self) -> Result<&RestContext> {
368397
self.ctx
@@ -828,27 +857,13 @@ impl Catalog for RestCatalog {
828857

829858
/// Drop a table from the catalog.
830859
async fn drop_table(&self, table: &TableIdent) -> Result<()> {
831-
let context = self.context().await?;
832-
833-
let request = context
834-
.client
835-
.request(Method::DELETE, context.config.table_endpoint(table))
836-
.build()?;
837-
838-
let http_response = context.client.query_catalog(request).await?;
860+
self.delete_table(table, false).await
861+
}
839862

840-
match http_response.status() {
841-
StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
842-
StatusCode::NOT_FOUND => Err(Error::new(
843-
ErrorKind::TableNotFound,
844-
"Tried to drop a table that does not exist",
845-
)),
846-
_ => Err(deserialize_unexpected_catalog_error(
847-
http_response,
848-
context.client.disable_header_redaction(),
849-
)
850-
.await),
851-
}
863+
/// Drop a table from the catalog and purge its data by sending
864+
/// `purgeRequested=true` to the REST server.
865+
async fn purge_table(&self, table: &TableIdent) -> Result<()> {
866+
self.delete_table(table, true).await
852867
}
853868

854869
/// Check if a table exists in the catalog.

crates/catalog/s3tables/src/catalog.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -562,15 +562,18 @@ impl Catalog for S3TablesCatalog {
562562
Ok(self.load_table_with_version_token(table_ident).await?.0)
563563
}
564564

565-
/// Drops an existing table from the s3tables catalog.
565+
/// Not supported for S3Tables. Use `purge_table` instead.
566566
///
567-
/// Validates the table identifier and then deletes the corresponding
568-
/// table from the s3tables catalog.
569-
///
570-
/// This function can return an error in the following situations:
571-
/// - Errors from the underlying database deletion process, converted using
572-
/// `from_aws_sdk_error`.
573-
async fn drop_table(&self, table: &TableIdent) -> Result<()> {
567+
/// S3 Tables doesn't support soft delete, so dropping a table will permanently remove it from the catalog.
568+
async fn drop_table(&self, _table: &TableIdent) -> Result<()> {
569+
Err(Error::new(
570+
ErrorKind::FeatureUnsupported,
571+
"drop_table is not supported for S3Tables; use purge_table instead",
572+
))
573+
}
574+
575+
/// Purge a table from the S3 Tables catalog.
576+
async fn purge_table(&self, table: &TableIdent) -> Result<()> {
574577
let req = self
575578
.s3tables_client
576579
.delete_table()

crates/catalog/sql/src/catalog.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,17 @@ impl Catalog for SqlCatalog {
757757
Ok(())
758758
}
759759

760+
async fn purge_table(&self, table: &TableIdent) -> Result<()> {
761+
let table_info = self.load_table(table).await?;
762+
self.drop_table(table).await?;
763+
iceberg::drop_table_data(
764+
table_info.file_io(),
765+
table_info.metadata(),
766+
table_info.metadata_location(),
767+
)
768+
.await
769+
}
770+
760771
async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
761772
if !self.table_exists(identifier).await? {
762773
return no_such_table_err(identifier);

crates/iceberg/src/catalog/memory/catalog.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,17 @@ impl Catalog for MemoryCatalog {
326326
Ok(())
327327
}
328328

329+
async fn purge_table(&self, table_ident: &TableIdent) -> Result<()> {
330+
let table_info = self.load_table(table_ident).await?;
331+
self.drop_table(table_ident).await?;
332+
crate::catalog::utils::drop_table_data(
333+
table_info.file_io(),
334+
table_info.metadata(),
335+
table_info.metadata_location(),
336+
)
337+
.await
338+
}
339+
329340
/// Check if a table exists in the catalog.
330341
async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
331342
let root_namespace_state = self.root_namespace_state.lock().await;

crates/iceberg/src/catalog/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
2020
pub mod memory;
2121
mod metadata_location;
22+
pub(crate) mod utils;
2223

2324
use std::collections::HashMap;
2425
use std::fmt::{Debug, Display};
@@ -98,6 +99,14 @@ pub trait Catalog: Debug + Sync + Send {
9899
/// Drop a table from the catalog, or returns error if it doesn't exist.
99100
async fn drop_table(&self, table: &TableIdent) -> Result<()>;
100101

102+
/// Drop a table from the catalog and delete the underlying table data.
103+
///
104+
/// Implementations should load the table metadata, drop the table
105+
/// from the catalog, then delete all associated data and metadata files.
106+
/// The [`drop_table_data`](utils::drop_table_data) utility function can
107+
/// be used for the file cleanup step.
108+
async fn purge_table(&self, table: &TableIdent) -> Result<()>;
109+
101110
/// Check if a table exists in the catalog.
102111
async fn table_exists(&self, table: &TableIdent) -> Result<bool>;
103112

0 commit comments

Comments
 (0)