@@ -1356,10 +1356,81 @@ impl RocksStoreDetails for RocksMetaStoreDetails {
13561356 }
13571357}
13581358
pub struct CachedTables {
    // Lazily populated: `None` until `set` stores a snapshot (a shared Arc so
    // readers can clone it cheaply); writers that cannot patch the snapshot
    // surgically reset it back to `None`.
    tables: Mutex<Option<Arc<Vec<TablePath>>>>,
}
1362+
1363+ impl CachedTables {
1364+ pub fn new ( ) -> Self {
1365+ Self {
1366+ tables : Mutex :: new ( None ) ,
1367+ }
1368+ }
1369+
1370+ pub fn reset ( & self ) {
1371+ * self . tables . lock ( ) . unwrap ( ) = None ;
1372+ }
1373+
1374+ pub fn get ( & self ) -> Option < Arc < Vec < TablePath > > > {
1375+ self . tables . lock ( ) . unwrap ( ) . clone ( )
1376+ }
1377+
1378+ pub fn set ( & self , tables : Arc < Vec < TablePath > > ) {
1379+ * self . tables . lock ( ) . unwrap ( ) = Some ( tables) ;
1380+ }
1381+
1382+ /// Surgically update a single entry in the cache by table ID.
1383+ /// The closure receives a mutable reference to the entry for in-place mutation.
1384+ /// If the entry is not found or the cache is not populated, resets the cache.
1385+ #[ inline( always) ]
1386+ pub fn update_table_by_id_or_reset < F > ( & self , table_id : u64 , f : F )
1387+ where
1388+ F : FnOnce ( & mut TablePath ) ,
1389+ {
1390+ let mut guard = self . tables . lock ( ) . unwrap ( ) ;
1391+ let Some ( cached) = guard. as_mut ( ) else {
1392+ return ;
1393+ } ;
1394+
1395+ let tables = Arc :: make_mut ( cached) ;
1396+ let Some ( entry) = tables. iter_mut ( ) . find ( |tp| tp. table . get_id ( ) == table_id) else {
1397+ log:: warn!(
1398+ "Table with id: {} not found in cache, completely resetting cache" ,
1399+ table_id
1400+ ) ;
1401+
1402+ * guard = None ;
1403+ return ;
1404+ } ;
1405+
1406+ f ( entry) ;
1407+
1408+ // Remove entry if it's no longer ready (cache only stores ready tables)
1409+ if !entry. table . get_row ( ) . is_ready ( ) {
1410+ tables. retain ( |tp| tp. table . get_id ( ) != table_id) ;
1411+ }
1412+ }
1413+
1414+ pub fn remove_by_table_id_or_reset ( & self , table_id : u64 ) {
1415+ let mut guard = self . tables . lock ( ) . unwrap ( ) ;
1416+ let Some ( cached) = guard. as_mut ( ) else {
1417+ return ;
1418+ } ;
1419+
1420+ let tables = Arc :: make_mut ( cached) ;
1421+ let Some ( idx) = tables. iter ( ) . position ( |tp| tp. table . get_id ( ) == table_id) else {
1422+ * guard = None ;
1423+ return ;
1424+ } ;
1425+
1426+ tables. remove ( idx) ;
1427+ }
1428+ }
1429+
#[derive(Clone)]
pub struct RocksMetaStore {
    // Underlying RocksDB-backed store all metastore operations go through.
    store: Arc<RocksStore>,
    // Shared cache of tables with their schema paths; reset/patched via
    // post-commit callbacks on write operations.
    cached_tables: Arc<CachedTables>,
    // Cached disk-usage figures plus the time they were sampled.
    // NOTE(review): keys look like node/path names — confirm against the code
    // that populates this map.
    disk_space_cache: Arc<RwLock<Option<(HashMap<String, u64>, SystemTime)>>>,
    // Drives the "Metastore upload" background worker loop.
    upload_loop: Arc<WorkerLoop>,
}
@@ -1382,14 +1453,14 @@ impl RocksMetaStore {
13821453 fn new_from_store ( store : Arc < RocksStore > ) -> Arc < Self > {
13831454 Arc :: new ( Self {
13841455 store,
1385- cached_tables : Arc :: new ( Mutex :: new ( None ) ) ,
1456+ cached_tables : Arc :: new ( CachedTables :: new ( ) ) ,
13861457 disk_space_cache : Arc :: new ( RwLock :: new ( None ) ) ,
13871458 upload_loop : Arc :: new ( WorkerLoop :: new ( "Metastore upload" ) ) ,
13881459 } )
13891460 }
13901461
    /// Drops the in-memory tables cache; the next `get_tables_with_path`
    /// call will repopulate it from RocksDB.
    pub fn reset_cached_tables(&self) {
        self.cached_tables.reset();
    }
13941465
13951466 pub async fn load_from_dump (
@@ -1917,7 +1988,7 @@ impl MetaStore for RocksMetaStore {
19171988 ) -> Result < IdRow < Schema > , CubeError > {
19181989 self . write_operation ( "create_schema" , move |db_ref, batch_pipe| {
19191990 batch_pipe. set_post_commit_callback ( |metastore| {
1920- * metastore. cached_tables . lock ( ) . unwrap ( ) = None ;
1991+ metastore. cached_tables . reset ( ) ;
19211992 } ) ;
19221993 let table = SchemaRocksTable :: new ( db_ref. clone ( ) ) ;
19231994 if if_not_exists {
@@ -1980,7 +2051,7 @@ impl MetaStore for RocksMetaStore {
19802051 ) -> Result < IdRow < Schema > , CubeError > {
19812052 self . write_operation ( "rename_schema" , move |db_ref, batch_pipe| {
19822053 batch_pipe. set_post_commit_callback ( |metastore| {
1983- * metastore. cached_tables . lock ( ) . unwrap ( ) = None ;
2054+ metastore. cached_tables . reset ( ) ;
19842055 } ) ;
19852056 let table = SchemaRocksTable :: new ( db_ref. clone ( ) ) ;
19862057 let existing_keys =
@@ -2006,7 +2077,7 @@ impl MetaStore for RocksMetaStore {
20062077 ) -> Result < IdRow < Schema > , CubeError > {
20072078 self . write_operation ( "rename_schema_by_id" , move |db_ref, batch_pipe| {
20082079 batch_pipe. set_post_commit_callback ( |metastore| {
2009- * metastore. cached_tables . lock ( ) . unwrap ( ) = None ;
2080+ metastore. cached_tables . reset ( ) ;
20102081 } ) ;
20112082 let table = SchemaRocksTable :: new ( db_ref. clone ( ) ) ;
20122083
@@ -2024,7 +2095,7 @@ impl MetaStore for RocksMetaStore {
20242095 async fn delete_schema ( & self , schema_name : String ) -> Result < ( ) , CubeError > {
20252096 self . write_operation ( "delete_schema" , move |db_ref, batch_pipe| {
20262097 batch_pipe. set_post_commit_callback ( |metastore| {
2027- * metastore. cached_tables . lock ( ) . unwrap ( ) = None ;
2098+ metastore. cached_tables . reset ( ) ;
20282099 } ) ;
20292100 let table = SchemaRocksTable :: new ( db_ref. clone ( ) ) ;
20302101 let existing_keys =
@@ -2053,7 +2124,7 @@ impl MetaStore for RocksMetaStore {
20532124 async fn delete_schema_by_id ( & self , schema_id : u64 ) -> Result < ( ) , CubeError > {
20542125 self . write_operation ( "delete_schema_by_id" , move |db_ref, batch_pipe| {
20552126 batch_pipe. set_post_commit_callback ( |metastore| {
2056- * metastore. cached_tables . lock ( ) . unwrap ( ) = None ;
2127+ metastore. cached_tables . reset ( ) ;
20572128 } ) ;
20582129 let tables = TableRocksTable :: new ( db_ref. clone ( ) ) . all_rows ( ) ?;
20592130 if tables
@@ -2120,8 +2191,9 @@ impl MetaStore for RocksMetaStore {
21202191 ) -> Result < IdRow < Table > , CubeError > {
21212192 self . write_operation ( "create_table" , move |db_ref, batch_pipe| {
21222193 batch_pipe. set_post_commit_callback ( |metastore| {
2123- * metastore. cached_tables . lock ( ) . unwrap ( ) = None ;
2194+ metastore. cached_tables . reset ( ) ;
21242195 } ) ;
2196+
21252197 if drop_if_exists {
21262198 if let Ok ( exists_table) = get_table_impl ( db_ref. clone ( ) , schema_name. clone ( ) , table_name. clone ( ) ) {
21272199 RocksMetaStore :: drop_table_impl ( exists_table. get_id ( ) , db_ref. clone ( ) , batch_pipe) ?;
@@ -2316,23 +2388,40 @@ impl MetaStore for RocksMetaStore {
    #[tracing::instrument(level = "trace", skip(self))]
    async fn table_ready(&self, id: u64, is_ready: bool) -> Result<IdRow<Table>, CubeError> {
        self.write_operation("table_ready", move |db_ref, batch_pipe| {
            let rocks_table = TableRocksTable::new(db_ref.clone());
            // Persist the new readiness flag first so the post-commit callback
            // below captures the already-updated row.
            let entry =
                rocks_table.update_with_fn(id, |r| r.update_is_ready(is_ready), batch_pipe)?;

            let table_to_move = entry.get_row().clone();
            // Runs only after the write batch commits: patch this single cache
            // entry in place instead of invalidating the whole tables cache.
            // NOTE(review): a table flipping *to* ready is typically absent from
            // the cache (it stores ready tables only), so this path degrades to
            // a full cache reset with a warning inside
            // `update_table_by_id_or_reset` — confirm that warning is expected.
            batch_pipe.set_post_commit_callback(move |metastore| {
                metastore
                    .cached_tables
                    .update_table_by_id_or_reset(id, |tp| {
                        tp.table = IdRow::new(tp.table.get_id(), table_to_move);
                    });
            });

            Ok(entry)
        })
        .await
    }
23272408
    #[tracing::instrument(level = "trace", skip(self))]
    async fn seal_table(&self, id: u64) -> Result<IdRow<Table>, CubeError> {
        self.write_operation("seal_table", move |db_ref, batch_pipe| {
            let rocks_table = TableRocksTable::new(db_ref.clone());
            // Mark the table sealed in RocksDB first so the post-commit
            // callback below captures the already-updated row.
            let entry = rocks_table.update_with_fn(id, |r| r.update_sealed(true), batch_pipe)?;

            let table_to_move = entry.get_row().clone();
            // Runs only after the write batch commits: patch this single cache
            // entry in place instead of invalidating the whole tables cache
            // (falls back to a full reset if the entry is not cached).
            batch_pipe.set_post_commit_callback(move |metastore| {
                metastore
                    .cached_tables
                    .update_table_by_id_or_reset(id, |tp| {
                        tp.table = IdRow::new(tp.table.get_id(), table_to_move);
                    });
            });

            Ok(entry)
        })
        .await
    }
@@ -2359,16 +2448,24 @@ impl MetaStore for RocksMetaStore {
23592448 self . write_operation (
23602449 "update_location_download_size" ,
23612450 move |db_ref, batch_pipe| {
2362- batch_pipe. set_post_commit_callback ( |metastore| {
2363- * metastore. cached_tables . lock ( ) . unwrap ( ) = None ;
2364- } ) ;
2365-
23662451 let rocks_table = TableRocksTable :: new ( db_ref. clone ( ) ) ;
2367- rocks_table. update_with_res_fn (
2452+ let entry = rocks_table. update_with_res_fn (
23682453 id,
23692454 |r| r. update_location_download_size ( & location, download_size) ,
23702455 batch_pipe,
2371- )
2456+ ) ?;
2457+
2458+ let table_to_move = entry. get_row ( ) . clone ( ) ;
2459+
2460+ batch_pipe. set_post_commit_callback ( move |metastore| {
2461+ metastore
2462+ . cached_tables
2463+ . update_table_by_id_or_reset ( id, |tp| {
2464+ tp. table = IdRow :: new ( tp. table . get_id ( ) , table_to_move) ;
2465+ } ) ;
2466+ } ) ;
2467+
2468+ Ok ( entry)
23722469 } ,
23732470 )
23742471 . await
@@ -2423,18 +2520,17 @@ impl MetaStore for RocksMetaStore {
24232520 } else {
24242521 let cache = self . cached_tables . clone ( ) ;
24252522
2426- if let Some ( t) = cube_ext:: spawn_blocking ( move || cache. lock ( ) . unwrap ( ) . clone ( ) ) . await ?
2427- {
2523+ if let Some ( t) = cube_ext:: spawn_blocking ( move || cache. get ( ) ) . await ? {
24282524 return Ok ( t) ;
24292525 }
24302526
24312527 let cache = self . cached_tables . clone ( ) ;
24322528 // Can't do read_operation_out_of_queue as we need to update cache on the same thread where it's dropped
24332529 self . read_operation ( "get_tables_with_path" , move |db_ref| {
2434- let cached_tables = { cache. lock ( ) . unwrap ( ) . clone ( ) } ;
2435- if let Some ( t) = cached_tables {
2530+ if let Some ( t) = cache. get ( ) {
24362531 return Ok ( t) ;
24372532 }
2533+
24382534 let table_rocks_table = TableRocksTable :: new ( db_ref. clone ( ) ) ;
24392535 let mut tables = Vec :: new ( ) ;
24402536 for t in table_rocks_table. scan_all_rows ( ) ? {
@@ -2450,8 +2546,7 @@ impl MetaStore for RocksMetaStore {
24502546 |table, schema| TablePath :: new ( schema, table) ,
24512547 ) ?) ;
24522548
2453- let to_cache = tables. clone ( ) ;
2454- * cache. lock ( ) . unwrap ( ) = Some ( to_cache) ;
2549+ cache. set ( tables. clone ( ) ) ;
24552550
24562551 Ok ( tables)
24572552 } )
@@ -2493,9 +2588,12 @@ impl MetaStore for RocksMetaStore {
24932588 #[ tracing:: instrument( level = "trace" , skip( self ) ) ]
24942589 async fn drop_table ( & self , table_id : u64 ) -> Result < IdRow < Table > , CubeError > {
24952590 self . write_operation ( "drop_table" , move |db_ref, batch_pipe| {
2496- batch_pipe. set_post_commit_callback ( |metastore| {
2497- * metastore. cached_tables . lock ( ) . unwrap ( ) = None ;
2591+ batch_pipe. set_post_commit_callback ( move |metastore| {
2592+ metastore
2593+ . cached_tables
2594+ . remove_by_table_id_or_reset ( table_id) ;
24982595 } ) ;
2596+
24992597 RocksMetaStore :: drop_table_impl ( table_id, db_ref, batch_pipe)
25002598 } )
25012599 . await
0 commit comments