@@ -1356,10 +1356,106 @@ impl RocksStoreDetails for RocksMetaStoreDetails {
13561356 }
13571357}
13581358
1359+ pub struct CachedTables {
1360+ tables : Mutex < Option < Arc < Vec < TablePath > > > > ,
1361+ }
1362+
1363+ impl CachedTables {
1364+ pub fn new ( ) -> Self {
1365+ Self {
1366+ tables : Mutex :: new ( None ) ,
1367+ }
1368+ }
1369+
1370+ pub fn reset ( & self ) {
1371+ * self . tables . lock ( ) . unwrap ( ) = None ;
1372+ }
1373+
1374+ pub fn get ( & self ) -> Option < Arc < Vec < TablePath > > > {
1375+ self . tables . lock ( ) . unwrap ( ) . clone ( )
1376+ }
1377+
1378+ pub fn set ( & self , tables : Arc < Vec < TablePath > > ) {
1379+ * self . tables . lock ( ) . unwrap ( ) = Some ( tables) ;
1380+ }
1381+
1382+ /// Surgically update a single entry in the cache by table ID.
1383+ /// The closure receives a mutable reference to the entry for in-place mutation.
1384+ /// If the entry is not found or the cache is not populated, resets the cache.
1385+ #[ inline( always) ]
1386+ pub fn update_table_by_id_or_reset < F > ( & self , table_id : u64 , f : F )
1387+ where
1388+ F : FnOnce ( & mut TablePath ) ,
1389+ {
1390+ let mut guard = self . tables . lock ( ) . unwrap ( ) ;
1391+ let Some ( cached) = guard. as_mut ( ) else {
1392+ return ;
1393+ } ;
1394+
1395+ // Check existence on the immutable Arc first to avoid a wasted
1396+ // deep-clone from make_mut when the entry is absent.
1397+ let Some ( idx) = cached. iter ( ) . position ( |tp| tp. table . get_id ( ) == table_id) else {
1398+ log:: warn!(
1399+ "Table with id: {} not found in cache, completely resetting cache" ,
1400+ table_id
1401+ ) ;
1402+
1403+ * guard = None ;
1404+ return ;
1405+ } ;
1406+
1407+ let tables = Arc :: make_mut ( cached) ;
1408+ f ( & mut tables[ idx] ) ;
1409+
1410+ // Remove entry if it's no longer ready (cache only stores ready tables)
1411+ if !tables[ idx] . table . get_row ( ) . is_ready ( ) {
1412+ tables. swap_remove ( idx) ;
1413+ }
1414+ }
1415+
1416+ pub fn upsert_table_by_id ( & self , table_id : u64 , entry : TablePath ) {
1417+ let mut guard = self . tables . lock ( ) . unwrap ( ) ;
1418+ let Some ( cached) = guard. as_mut ( ) else {
1419+ return ;
1420+ } ;
1421+
1422+ let tables = Arc :: make_mut ( cached) ;
1423+ if let Some ( idx) = tables. iter ( ) . position ( |tp| tp. table . get_id ( ) == table_id) {
1424+ tables. remove ( idx) ;
1425+ }
1426+
1427+ // Paranoid check
1428+ if entry. table . get_row ( ) . is_ready ( ) {
1429+ tables. push ( entry) ;
1430+ } else {
1431+ debug_assert ! (
1432+ false ,
1433+ "upsert_table_by_id called with non-ready table (id: {})" ,
1434+ table_id
1435+ ) ;
1436+ }
1437+ }
1438+
1439+ pub fn remove_by_table_id_or_reset ( & self , table_id : u64 ) {
1440+ let mut guard = self . tables . lock ( ) . unwrap ( ) ;
1441+ let Some ( cached) = guard. as_mut ( ) else {
1442+ return ;
1443+ } ;
1444+
1445+ let tables = Arc :: make_mut ( cached) ;
1446+ let Some ( idx) = tables. iter ( ) . position ( |tp| tp. table . get_id ( ) == table_id) else {
1447+ * guard = None ;
1448+ return ;
1449+ } ;
1450+
1451+ tables. remove ( idx) ;
1452+ }
1453+ }
1454+
13591455#[ derive( Clone ) ]
13601456pub struct RocksMetaStore {
13611457 store : Arc < RocksStore > ,
1362- cached_tables : Arc < Mutex < Option < Arc < Vec < TablePath > > > > > ,
1458+ cached_tables : Arc < CachedTables > ,
13631459 disk_space_cache : Arc < RwLock < Option < ( HashMap < String , u64 > , SystemTime ) > > > ,
13641460 upload_loop : Arc < WorkerLoop > ,
13651461}
@@ -1382,14 +1478,14 @@ impl RocksMetaStore {
13821478 fn new_from_store ( store : Arc < RocksStore > ) -> Arc < Self > {
13831479 Arc :: new ( Self {
13841480 store,
1385- cached_tables : Arc :: new ( Mutex :: new ( None ) ) ,
1481+ cached_tables : Arc :: new ( CachedTables :: new ( ) ) ,
13861482 disk_space_cache : Arc :: new ( RwLock :: new ( None ) ) ,
13871483 upload_loop : Arc :: new ( WorkerLoop :: new ( "Metastore upload" ) ) ,
13881484 } )
13891485 }
13901486
13911487 pub fn reset_cached_tables ( & self ) {
1392- * self . cached_tables . lock ( ) . unwrap ( ) = None ;
1488+ self . cached_tables . reset ( ) ;
13931489 }
13941490
13951491 pub async fn load_from_dump (
@@ -1917,7 +2013,7 @@ impl MetaStore for RocksMetaStore {
19172013 ) -> Result < IdRow < Schema > , CubeError > {
19182014 self . write_operation ( "create_schema" , move |db_ref, batch_pipe| {
19192015 batch_pipe. set_post_commit_callback ( |metastore| {
1920- * metastore. cached_tables . lock ( ) . unwrap ( ) = None ;
2016+ metastore. cached_tables . reset ( ) ;
19212017 } ) ;
19222018 let table = SchemaRocksTable :: new ( db_ref. clone ( ) ) ;
19232019 if if_not_exists {
@@ -1980,7 +2076,7 @@ impl MetaStore for RocksMetaStore {
19802076 ) -> Result < IdRow < Schema > , CubeError > {
19812077 self . write_operation ( "rename_schema" , move |db_ref, batch_pipe| {
19822078 batch_pipe. set_post_commit_callback ( |metastore| {
1983- * metastore. cached_tables . lock ( ) . unwrap ( ) = None ;
2079+ metastore. cached_tables . reset ( ) ;
19842080 } ) ;
19852081 let table = SchemaRocksTable :: new ( db_ref. clone ( ) ) ;
19862082 let existing_keys =
@@ -2006,7 +2102,7 @@ impl MetaStore for RocksMetaStore {
20062102 ) -> Result < IdRow < Schema > , CubeError > {
20072103 self . write_operation ( "rename_schema_by_id" , move |db_ref, batch_pipe| {
20082104 batch_pipe. set_post_commit_callback ( |metastore| {
2009- * metastore. cached_tables . lock ( ) . unwrap ( ) = None ;
2105+ metastore. cached_tables . reset ( ) ;
20102106 } ) ;
20112107 let table = SchemaRocksTable :: new ( db_ref. clone ( ) ) ;
20122108
@@ -2024,7 +2120,7 @@ impl MetaStore for RocksMetaStore {
20242120 async fn delete_schema ( & self , schema_name : String ) -> Result < ( ) , CubeError > {
20252121 self . write_operation ( "delete_schema" , move |db_ref, batch_pipe| {
20262122 batch_pipe. set_post_commit_callback ( |metastore| {
2027- * metastore. cached_tables . lock ( ) . unwrap ( ) = None ;
2123+ metastore. cached_tables . reset ( ) ;
20282124 } ) ;
20292125 let table = SchemaRocksTable :: new ( db_ref. clone ( ) ) ;
20302126 let existing_keys =
@@ -2053,7 +2149,7 @@ impl MetaStore for RocksMetaStore {
20532149 async fn delete_schema_by_id ( & self , schema_id : u64 ) -> Result < ( ) , CubeError > {
20542150 self . write_operation ( "delete_schema_by_id" , move |db_ref, batch_pipe| {
20552151 batch_pipe. set_post_commit_callback ( |metastore| {
2056- * metastore. cached_tables . lock ( ) . unwrap ( ) = None ;
2152+ metastore. cached_tables . reset ( ) ;
20572153 } ) ;
20582154 let tables = TableRocksTable :: new ( db_ref. clone ( ) ) . all_rows ( ) ?;
20592155 if tables
@@ -2120,13 +2216,15 @@ impl MetaStore for RocksMetaStore {
21202216 ) -> Result < IdRow < Table > , CubeError > {
21212217 self . write_operation ( "create_table" , move |db_ref, batch_pipe| {
21222218 batch_pipe. set_post_commit_callback ( |metastore| {
2123- * metastore. cached_tables . lock ( ) . unwrap ( ) = None ;
2219+ metastore. cached_tables . reset ( ) ;
21242220 } ) ;
2221+
21252222 if drop_if_exists {
21262223 if let Ok ( exists_table) = get_table_impl ( db_ref. clone ( ) , schema_name. clone ( ) , table_name. clone ( ) ) {
21272224 RocksMetaStore :: drop_table_impl ( exists_table. get_id ( ) , db_ref. clone ( ) , batch_pipe) ?;
21282225 }
21292226 }
2227+
21302228 let rocks_table = TableRocksTable :: new ( db_ref. clone ( ) ) ;
21312229 let rocks_index = IndexRocksTable :: new ( db_ref. clone ( ) ) ;
21322230 let rocks_schema = SchemaRocksTable :: new ( db_ref. clone ( ) ) ;
@@ -2316,23 +2414,45 @@ impl MetaStore for RocksMetaStore {
23162414 #[ tracing:: instrument( level = "trace" , skip( self ) ) ]
23172415 async fn table_ready ( & self , id : u64 , is_ready : bool ) -> Result < IdRow < Table > , CubeError > {
23182416 self . write_operation ( "table_ready" , move |db_ref, batch_pipe| {
2319- batch_pipe. set_post_commit_callback ( |metastore| {
2320- * metastore. cached_tables . lock ( ) . unwrap ( ) = None ;
2321- } ) ;
23222417 let rocks_table = TableRocksTable :: new ( db_ref. clone ( ) ) ;
2323- Ok ( rocks_table. update_with_fn ( id, |r| r. update_is_ready ( is_ready) , batch_pipe) ?)
2418+ let entry =
2419+ rocks_table. update_with_fn ( id, |r| r. update_is_ready ( is_ready) , batch_pipe) ?;
2420+
2421+ if is_ready {
2422+ let schema = SchemaRocksTable :: new ( db_ref)
2423+ . get_row_or_not_found ( entry. get_row ( ) . get_schema_id ( ) ) ?;
2424+ let table_path = TablePath :: new ( Arc :: new ( schema) , entry. clone ( ) ) ;
2425+
2426+ batch_pipe. set_post_commit_callback ( move |metastore| {
2427+ metastore. cached_tables . upsert_table_by_id ( id, table_path) ;
2428+ } ) ;
2429+ } else {
2430+ batch_pipe. set_post_commit_callback ( move |metastore| {
2431+ metastore. cached_tables . remove_by_table_id_or_reset ( id) ;
2432+ } ) ;
2433+ }
2434+
2435+ Ok ( entry)
23242436 } )
23252437 . await
23262438 }
23272439
23282440 #[ tracing:: instrument( level = "trace" , skip( self ) ) ]
23292441 async fn seal_table ( & self , id : u64 ) -> Result < IdRow < Table > , CubeError > {
23302442 self . write_operation ( "seal_table" , move |db_ref, batch_pipe| {
2331- batch_pipe. set_post_commit_callback ( |metastore| {
2332- * metastore. cached_tables . lock ( ) . unwrap ( ) = None ;
2333- } ) ;
23342443 let rocks_table = TableRocksTable :: new ( db_ref. clone ( ) ) ;
2335- Ok ( rocks_table. update_with_fn ( id, |r| r. update_sealed ( true ) , batch_pipe) ?)
2444+ let entry = rocks_table. update_with_fn ( id, |r| r. update_sealed ( true ) , batch_pipe) ?;
2445+
2446+ let table_to_move = entry. get_row ( ) . clone ( ) ;
2447+ batch_pipe. set_post_commit_callback ( move |metastore| {
2448+ metastore
2449+ . cached_tables
2450+ . update_table_by_id_or_reset ( id, |tp| {
2451+ tp. table = IdRow :: new ( tp. table . get_id ( ) , table_to_move) ;
2452+ } ) ;
2453+ } ) ;
2454+
2455+ Ok ( entry)
23362456 } )
23372457 . await
23382458 }
@@ -2359,16 +2479,24 @@ impl MetaStore for RocksMetaStore {
23592479 self . write_operation (
23602480 "update_location_download_size" ,
23612481 move |db_ref, batch_pipe| {
2362- batch_pipe. set_post_commit_callback ( |metastore| {
2363- * metastore. cached_tables . lock ( ) . unwrap ( ) = None ;
2364- } ) ;
2365-
23662482 let rocks_table = TableRocksTable :: new ( db_ref. clone ( ) ) ;
2367- rocks_table. update_with_res_fn (
2483+ let entry = rocks_table. update_with_res_fn (
23682484 id,
23692485 |r| r. update_location_download_size ( & location, download_size) ,
23702486 batch_pipe,
2371- )
2487+ ) ?;
2488+
2489+ let table_to_move = entry. get_row ( ) . clone ( ) ;
2490+
2491+ batch_pipe. set_post_commit_callback ( move |metastore| {
2492+ metastore
2493+ . cached_tables
2494+ . update_table_by_id_or_reset ( id, |tp| {
2495+ tp. table = IdRow :: new ( tp. table . get_id ( ) , table_to_move) ;
2496+ } ) ;
2497+ } ) ;
2498+
2499+ Ok ( entry)
23722500 } ,
23732501 )
23742502 . await
@@ -2423,18 +2551,17 @@ impl MetaStore for RocksMetaStore {
24232551 } else {
24242552 let cache = self . cached_tables . clone ( ) ;
24252553
2426- if let Some ( t) = cube_ext:: spawn_blocking ( move || cache. lock ( ) . unwrap ( ) . clone ( ) ) . await ?
2427- {
2554+ if let Some ( t) = cube_ext:: spawn_blocking ( move || cache. get ( ) ) . await ? {
24282555 return Ok ( t) ;
24292556 }
24302557
24312558 let cache = self . cached_tables . clone ( ) ;
24322559 // Can't do read_operation_out_of_queue as we need to update cache on the same thread where it's dropped
24332560 self . read_operation ( "get_tables_with_path" , move |db_ref| {
2434- let cached_tables = { cache. lock ( ) . unwrap ( ) . clone ( ) } ;
2435- if let Some ( t) = cached_tables {
2561+ if let Some ( t) = cache. get ( ) {
24362562 return Ok ( t) ;
24372563 }
2564+
24382565 let table_rocks_table = TableRocksTable :: new ( db_ref. clone ( ) ) ;
24392566 let mut tables = Vec :: new ( ) ;
24402567 for t in table_rocks_table. scan_all_rows ( ) ? {
@@ -2450,8 +2577,7 @@ impl MetaStore for RocksMetaStore {
24502577 |table, schema| TablePath :: new ( schema, table) ,
24512578 ) ?) ;
24522579
2453- let to_cache = tables. clone ( ) ;
2454- * cache. lock ( ) . unwrap ( ) = Some ( to_cache) ;
2580+ cache. set ( tables. clone ( ) ) ;
24552581
24562582 Ok ( tables)
24572583 } )
@@ -2493,9 +2619,12 @@ impl MetaStore for RocksMetaStore {
24932619 #[ tracing:: instrument( level = "trace" , skip( self ) ) ]
24942620 async fn drop_table ( & self , table_id : u64 ) -> Result < IdRow < Table > , CubeError > {
24952621 self . write_operation ( "drop_table" , move |db_ref, batch_pipe| {
2496- batch_pipe. set_post_commit_callback ( |metastore| {
2497- * metastore. cached_tables . lock ( ) . unwrap ( ) = None ;
2622+ batch_pipe. set_post_commit_callback ( move |metastore| {
2623+ metastore
2624+ . cached_tables
2625+ . remove_by_table_id_or_reset ( table_id) ;
24982626 } ) ;
2627+
24992628 RocksMetaStore :: drop_table_impl ( table_id, db_ref, batch_pipe)
25002629 } )
25012630 . await
0 commit comments