Skip to content

Commit 95c9f2e

Browse files
committed
feat(cubestore): Add upsert_table_by_id for table_ready cache sync
1 parent 84b5ea0 commit 95c9f2e

1 file changed

Lines changed: 37 additions & 8 deletions

File tree

  • rust/cubestore/cubestore/src/metastore

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1411,6 +1411,29 @@ impl CachedTables {
14111411
}
14121412
}
14131413

1414+
pub fn upsert_table_by_id(&self, table_id: u64, entry: TablePath) {
1415+
let mut guard = self.tables.lock().unwrap();
1416+
let Some(cached) = guard.as_mut() else {
1417+
return;
1418+
};
1419+
1420+
let tables = Arc::make_mut(cached);
1421+
if let Some(idx) = tables.iter().position(|tp| tp.table.get_id() == table_id) {
1422+
tables.remove(idx);
1423+
}
1424+
1425+
// Paranoid check
1426+
if entry.table.get_row().is_ready() {
1427+
tables.push(entry);
1428+
} else {
1429+
debug_assert!(
1430+
false,
1431+
"upsert_table_by_id called with non-ready table (id: {})",
1432+
table_id
1433+
);
1434+
}
1435+
}
1436+
14141437
pub fn remove_by_table_id_or_reset(&self, table_id: u64) {
14151438
let mut guard = self.tables.lock().unwrap();
14161439
let Some(cached) = guard.as_mut() else {
@@ -2199,6 +2222,7 @@ impl MetaStore for RocksMetaStore {
21992222
RocksMetaStore::drop_table_impl(exists_table.get_id(), db_ref.clone(), batch_pipe)?;
22002223
}
22012224
}
2225+
22022226
let rocks_table = TableRocksTable::new(db_ref.clone());
22032227
let rocks_index = IndexRocksTable::new(db_ref.clone());
22042228
let rocks_schema = SchemaRocksTable::new(db_ref.clone());
@@ -2392,14 +2416,19 @@ impl MetaStore for RocksMetaStore {
23922416
let entry =
23932417
rocks_table.update_with_fn(id, |r| r.update_is_ready(is_ready), batch_pipe)?;
23942418

2395-
let table_to_move = entry.get_row().clone();
2396-
batch_pipe.set_post_commit_callback(move |metastore| {
2397-
metastore
2398-
.cached_tables
2399-
.update_table_by_id_or_reset(id, |tp| {
2400-
tp.table = IdRow::new(tp.table.get_id(), table_to_move);
2401-
});
2402-
});
2419+
if is_ready {
2420+
let schema = SchemaRocksTable::new(db_ref)
2421+
.get_row_or_not_found(entry.get_row().get_schema_id())?;
2422+
let table_path = TablePath::new(Arc::new(schema), entry.clone());
2423+
2424+
batch_pipe.set_post_commit_callback(move |metastore| {
2425+
metastore.cached_tables.upsert_table_by_id(id, table_path);
2426+
});
2427+
} else {
2428+
batch_pipe.set_post_commit_callback(move |metastore| {
2429+
metastore.cached_tables.remove_by_table_id_or_reset(id);
2430+
});
2431+
}
24032432

24042433
Ok(entry)
24052434
})

0 commit comments

Comments
 (0)