diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index 942161bfde4eb..1edda6fb859a3 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -1356,10 +1356,106 @@ impl RocksStoreDetails for RocksMetaStoreDetails { } } +pub struct CachedTables { + tables: Mutex>>>, +} + +impl CachedTables { + pub fn new() -> Self { + Self { + tables: Mutex::new(None), + } + } + + pub fn reset(&self) { + *self.tables.lock().unwrap() = None; + } + + pub fn get(&self) -> Option>> { + self.tables.lock().unwrap().clone() + } + + pub fn set(&self, tables: Arc>) { + *self.tables.lock().unwrap() = Some(tables); + } + + /// Surgically update a single entry in the cache by table ID. + /// The closure receives a mutable reference to the entry for in-place mutation. + /// If the entry is not found or the cache is not populated, resets the cache. + #[inline(always)] + pub fn update_table_by_id_or_reset(&self, table_id: u64, f: F) + where + F: FnOnce(&mut TablePath), + { + let mut guard = self.tables.lock().unwrap(); + let Some(cached) = guard.as_mut() else { + return; + }; + + // Check existence on the immutable Arc first to avoid a wasted + // deep-clone from make_mut when the entry is absent. + let Some(idx) = cached.iter().position(|tp| tp.table.get_id() == table_id) else { + log::warn!( + "Table with id: {} not found in cache, completely resetting cache", + table_id + ); + + *guard = None; + return; + }; + + let tables = Arc::make_mut(cached); + f(&mut tables[idx]); + + // Remove entry if it's no longer ready (cache only stores ready tables) + if !tables[idx].table.get_row().is_ready() { + tables.swap_remove(idx); + } + } + + pub fn upsert_table_by_id(&self, table_id: u64, entry: TablePath) { + let mut guard = self.tables.lock().unwrap(); + let Some(cached) = guard.as_mut() else { + return; + }; + + let tables = Arc::make_mut(cached); + if let Some(idx) = tables.iter().position(|tp| tp.table.get_id() == table_id) { + tables.remove(idx); + } + + // Paranoid check + if entry.table.get_row().is_ready() { + tables.push(entry); + } else { + debug_assert!( + false, + "upsert_table_by_id called with non-ready table (id: {})", + table_id + ); + } + } + + pub fn remove_by_table_id_or_reset(&self, table_id: u64) { + let mut guard = self.tables.lock().unwrap(); + let Some(cached) = guard.as_mut() else { + return; + }; + + let tables = Arc::make_mut(cached); + let Some(idx) = tables.iter().position(|tp| tp.table.get_id() == table_id) else { + *guard = None; + return; + }; + + tables.remove(idx); + } +} + #[derive(Clone)] pub struct RocksMetaStore { store: Arc, - cached_tables: Arc>>>>, + cached_tables: Arc, disk_space_cache: Arc, SystemTime)>>>, upload_loop: Arc, } @@ -1382,14 +1478,14 @@ impl RocksMetaStore { fn new_from_store(store: Arc) -> Arc { Arc::new(Self { store, - cached_tables: Arc::new(Mutex::new(None)), + cached_tables: Arc::new(CachedTables::new()), disk_space_cache: Arc::new(RwLock::new(None)), upload_loop: Arc::new(WorkerLoop::new("Metastore upload")), }) } pub fn reset_cached_tables(&self) { - *self.cached_tables.lock().unwrap() = None; + self.cached_tables.reset(); } pub async fn load_from_dump( @@ -1917,7 +2013,7 @@ impl MetaStore for RocksMetaStore { ) -> Result, CubeError> { self.write_operation("create_schema", move |db_ref, batch_pipe| { batch_pipe.set_post_commit_callback(|metastore| { - *metastore.cached_tables.lock().unwrap() = None; + metastore.cached_tables.reset(); }); let table = SchemaRocksTable::new(db_ref.clone()); if if_not_exists { @@ -1980,7 +2076,7 @@ impl MetaStore for RocksMetaStore { ) -> Result, CubeError> { self.write_operation("rename_schema", move |db_ref, batch_pipe| { batch_pipe.set_post_commit_callback(|metastore| { - *metastore.cached_tables.lock().unwrap() = None; + metastore.cached_tables.reset(); }); let table = SchemaRocksTable::new(db_ref.clone()); let existing_keys = @@ -2006,7 +2102,7 @@ impl MetaStore for RocksMetaStore { ) -> Result, CubeError> { self.write_operation("rename_schema_by_id", move |db_ref, batch_pipe| { batch_pipe.set_post_commit_callback(|metastore| { - *metastore.cached_tables.lock().unwrap() = None; + metastore.cached_tables.reset(); }); let table = SchemaRocksTable::new(db_ref.clone()); @@ -2024,7 +2120,7 @@ impl MetaStore for RocksMetaStore { async fn delete_schema(&self, schema_name: String) -> Result<(), CubeError> { self.write_operation("delete_schema", move |db_ref, batch_pipe| { batch_pipe.set_post_commit_callback(|metastore| { - *metastore.cached_tables.lock().unwrap() = None; + metastore.cached_tables.reset(); }); let table = SchemaRocksTable::new(db_ref.clone()); let existing_keys = @@ -2053,7 +2149,7 @@ impl MetaStore for RocksMetaStore { async fn delete_schema_by_id(&self, schema_id: u64) -> Result<(), CubeError> { self.write_operation("delete_schema_by_id", move |db_ref, batch_pipe| { batch_pipe.set_post_commit_callback(|metastore| { - *metastore.cached_tables.lock().unwrap() = None; + metastore.cached_tables.reset(); }); let tables = TableRocksTable::new(db_ref.clone()).all_rows()?; if tables @@ -2120,13 +2216,15 @@ impl MetaStore for RocksMetaStore { ) -> Result, CubeError> { self.write_operation("create_table", move |db_ref, batch_pipe| { batch_pipe.set_post_commit_callback(|metastore| { - *metastore.cached_tables.lock().unwrap() = None; + metastore.cached_tables.reset(); }); + if drop_if_exists { if let Ok(exists_table) = get_table_impl(db_ref.clone(), schema_name.clone(), table_name.clone()) { RocksMetaStore::drop_table_impl(exists_table.get_id(), db_ref.clone(), batch_pipe)?; } } + let rocks_table = TableRocksTable::new(db_ref.clone()); let rocks_index = IndexRocksTable::new(db_ref.clone()); let rocks_schema = SchemaRocksTable::new(db_ref.clone()); @@ -2316,11 +2414,25 @@ impl MetaStore for RocksMetaStore { #[tracing::instrument(level = "trace", skip(self))] async fn table_ready(&self, id: u64, is_ready: bool) -> Result, CubeError> { self.write_operation("table_ready", move |db_ref, batch_pipe| { - batch_pipe.set_post_commit_callback(|metastore| { - *metastore.cached_tables.lock().unwrap() = None; - }); let rocks_table = TableRocksTable::new(db_ref.clone()); - Ok(rocks_table.update_with_fn(id, |r| r.update_is_ready(is_ready), batch_pipe)?) + let entry = + rocks_table.update_with_fn(id, |r| r.update_is_ready(is_ready), batch_pipe)?; + + if is_ready { + let schema = SchemaRocksTable::new(db_ref) + .get_row_or_not_found(entry.get_row().get_schema_id())?; + let table_path = TablePath::new(Arc::new(schema), entry.clone()); + + batch_pipe.set_post_commit_callback(move |metastore| { + metastore.cached_tables.upsert_table_by_id(id, table_path); + }); + } else { + batch_pipe.set_post_commit_callback(move |metastore| { + metastore.cached_tables.remove_by_table_id_or_reset(id); + }); + } + + Ok(entry) }) .await } @@ -2328,11 +2440,19 @@ impl MetaStore for RocksMetaStore { #[tracing::instrument(level = "trace", skip(self))] async fn seal_table(&self, id: u64) -> Result, CubeError> { self.write_operation("seal_table", move |db_ref, batch_pipe| { - batch_pipe.set_post_commit_callback(|metastore| { - *metastore.cached_tables.lock().unwrap() = None; - }); let rocks_table = TableRocksTable::new(db_ref.clone()); - Ok(rocks_table.update_with_fn(id, |r| r.update_sealed(true), batch_pipe)?) + let entry = rocks_table.update_with_fn(id, |r| r.update_sealed(true), batch_pipe)?; + + let table_to_move = entry.get_row().clone(); + batch_pipe.set_post_commit_callback(move |metastore| { + metastore + .cached_tables + .update_table_by_id_or_reset(id, |tp| { + tp.table = IdRow::new(tp.table.get_id(), table_to_move); + }); + }); + + Ok(entry) }) .await } @@ -2359,16 +2479,24 @@ impl MetaStore for RocksMetaStore { self.write_operation( "update_location_download_size", move |db_ref, batch_pipe| { - batch_pipe.set_post_commit_callback(|metastore| { - *metastore.cached_tables.lock().unwrap() = None; - }); - let rocks_table = TableRocksTable::new(db_ref.clone()); - rocks_table.update_with_res_fn( + let entry = rocks_table.update_with_res_fn( id, |r| r.update_location_download_size(&location, download_size), batch_pipe, - ) + )?; + + let table_to_move = entry.get_row().clone(); + + batch_pipe.set_post_commit_callback(move |metastore| { + metastore + .cached_tables + .update_table_by_id_or_reset(id, |tp| { + tp.table = IdRow::new(tp.table.get_id(), table_to_move); + }); + }); + + Ok(entry) }, ) .await @@ -2423,18 +2551,17 @@ impl MetaStore for RocksMetaStore { } else { let cache = self.cached_tables.clone(); - if let Some(t) = cube_ext::spawn_blocking(move || cache.lock().unwrap().clone()).await? - { + if let Some(t) = cube_ext::spawn_blocking(move || cache.get()).await? { return Ok(t); } let cache = self.cached_tables.clone(); // Can't do read_operation_out_of_queue as we need to update cache on the same thread where it's dropped self.read_operation("get_tables_with_path", move |db_ref| { - let cached_tables = { cache.lock().unwrap().clone() }; - if let Some(t) = cached_tables { + if let Some(t) = cache.get() { return Ok(t); } + let table_rocks_table = TableRocksTable::new(db_ref.clone()); let mut tables = Vec::new(); for t in table_rocks_table.scan_all_rows()? { @@ -2450,8 +2577,7 @@ impl MetaStore for RocksMetaStore { |table, schema| TablePath::new(schema, table), )?); - let to_cache = tables.clone(); - *cache.lock().unwrap() = Some(to_cache); + cache.set(tables.clone()); Ok(tables) }) @@ -2493,9 +2619,12 @@ impl MetaStore for RocksMetaStore { #[tracing::instrument(level = "trace", skip(self))] async fn drop_table(&self, table_id: u64) -> Result, CubeError> { self.write_operation("drop_table", move |db_ref, batch_pipe| { - batch_pipe.set_post_commit_callback(|metastore| { - *metastore.cached_tables.lock().unwrap() = None; + batch_pipe.set_post_commit_callback(move |metastore| { + metastore + .cached_tables + .remove_by_table_id_or_reset(table_id); }); + RocksMetaStore::drop_table_impl(table_id, db_ref, batch_pipe) }) .await