Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 160 additions & 31 deletions rust/cubestore/cubestore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1356,10 +1356,106 @@ impl RocksStoreDetails for RocksMetaStoreDetails {
}
}

pub struct CachedTables {
tables: Mutex<Option<Arc<Vec<TablePath>>>>,
}
Comment on lines +1359 to +1361
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Clippy will flag new_without_default here. Consider deriving or implementing Default:

Suggested change
pub struct CachedTables {
tables: Mutex<Option<Arc<Vec<TablePath>>>>,
}
pub struct CachedTables {
tables: Mutex<Option<Arc<Vec<TablePath>>>>,
}
impl Default for CachedTables {
fn default() -> Self {
Self::new()
}
}


impl CachedTables {
pub fn new() -> Self {
Self {
tables: Mutex::new(None),
}
}

pub fn reset(&self) {
*self.tables.lock().unwrap() = None;
}

pub fn get(&self) -> Option<Arc<Vec<TablePath>>> {
self.tables.lock().unwrap().clone()
}

pub fn set(&self, tables: Arc<Vec<TablePath>>) {
*self.tables.lock().unwrap() = Some(tables);
}

/// Surgically update a single entry in the cache by table ID.
/// The closure receives a mutable reference to the entry for in-place mutation.
/// If the entry is not found or the cache is not populated, resets the cache.
#[inline(always)]
Comment thread
ovr marked this conversation as resolved.
pub fn update_table_by_id_or_reset<F>(&self, table_id: u64, f: F)
where
F: FnOnce(&mut TablePath),
{
let mut guard = self.tables.lock().unwrap();
let Some(cached) = guard.as_mut() else {
return;
};

// Check existence on the immutable Arc first to avoid a wasted
// deep-clone from make_mut when the entry is absent.
let Some(idx) = cached.iter().position(|tp| tp.table.get_id() == table_id) else {
log::warn!(
"Table with id: {} not found in cache, completely resetting cache",
table_id
);

*guard = None;
return;
};

let tables = Arc::make_mut(cached);
f(&mut tables[idx]);

// Remove entry if it's no longer ready (cache only stores ready tables)
if !tables[idx].table.get_row().is_ready() {
tables.swap_remove(idx);
}
}

pub fn upsert_table_by_id(&self, table_id: u64, entry: TablePath) {
let mut guard = self.tables.lock().unwrap();
let Some(cached) = guard.as_mut() else {
return;
};

let tables = Arc::make_mut(cached);
if let Some(idx) = tables.iter().position(|tp| tp.table.get_id() == table_id) {
tables.remove(idx);
}

// Paranoid check
if entry.table.get_row().is_ready() {
tables.push(entry);
} else {
debug_assert!(
false,
"upsert_table_by_id called with non-ready table (id: {})",
table_id
);
}
Comment thread
ovr marked this conversation as resolved.
}

pub fn remove_by_table_id_or_reset(&self, table_id: u64) {
let mut guard = self.tables.lock().unwrap();
let Some(cached) = guard.as_mut() else {
return;
};

let tables = Arc::make_mut(cached);
let Some(idx) = tables.iter().position(|tp| tp.table.get_id() == table_id) else {
*guard = None;
return;
};
Comment on lines +1445 to +1449
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (performance): Same Arc::make_mut-before-lookup issue as update_table_by_id_or_reset. When dropping a non-ready table (not in cache), this clones the entire vec needlessly:

Suggested change
let tables = Arc::make_mut(cached);
let Some(idx) = tables.iter().position(|tp| tp.table.get_id() == table_id) else {
*guard = None;
return;
};
let Some(idx) = cached.iter().position(|tp| tp.table.get_id() == table_id) else {
*guard = None;
return;
};
let tables = Arc::make_mut(cached);


tables.remove(idx);
}
}

#[derive(Clone)]
pub struct RocksMetaStore {
store: Arc<RocksStore>,
cached_tables: Arc<Mutex<Option<Arc<Vec<TablePath>>>>>,
cached_tables: Arc<CachedTables>,
disk_space_cache: Arc<RwLock<Option<(HashMap<String, u64>, SystemTime)>>>,
upload_loop: Arc<WorkerLoop>,
}
Expand All @@ -1382,14 +1478,14 @@ impl RocksMetaStore {
/// Wraps an already-opened `RocksStore`, starting with an unpopulated
/// tables cache, an empty disk-space cache, and an idle upload loop.
fn new_from_store(store: Arc<RocksStore>) -> Arc<Self> {
    Arc::new(Self {
        store,
        cached_tables: Arc::new(CachedTables::new()),
        disk_space_cache: Arc::new(RwLock::new(None)),
        upload_loop: Arc::new(WorkerLoop::new("Metastore upload")),
    })
}

/// Invalidates the tables cache; the next `get_tables_with_path` call
/// rebuilds it from the metastore.
pub fn reset_cached_tables(&self) {
    self.cached_tables.reset();
}

pub async fn load_from_dump(
Expand Down Expand Up @@ -1917,7 +2013,7 @@ impl MetaStore for RocksMetaStore {
) -> Result<IdRow<Schema>, CubeError> {
self.write_operation("create_schema", move |db_ref, batch_pipe| {
batch_pipe.set_post_commit_callback(|metastore| {
*metastore.cached_tables.lock().unwrap() = None;
metastore.cached_tables.reset();
});
let table = SchemaRocksTable::new(db_ref.clone());
if if_not_exists {
Expand Down Expand Up @@ -1980,7 +2076,7 @@ impl MetaStore for RocksMetaStore {
) -> Result<IdRow<Schema>, CubeError> {
self.write_operation("rename_schema", move |db_ref, batch_pipe| {
batch_pipe.set_post_commit_callback(|metastore| {
*metastore.cached_tables.lock().unwrap() = None;
metastore.cached_tables.reset();
});
let table = SchemaRocksTable::new(db_ref.clone());
let existing_keys =
Expand All @@ -2006,7 +2102,7 @@ impl MetaStore for RocksMetaStore {
) -> Result<IdRow<Schema>, CubeError> {
self.write_operation("rename_schema_by_id", move |db_ref, batch_pipe| {
batch_pipe.set_post_commit_callback(|metastore| {
*metastore.cached_tables.lock().unwrap() = None;
metastore.cached_tables.reset();
});
let table = SchemaRocksTable::new(db_ref.clone());

Expand All @@ -2024,7 +2120,7 @@ impl MetaStore for RocksMetaStore {
async fn delete_schema(&self, schema_name: String) -> Result<(), CubeError> {
self.write_operation("delete_schema", move |db_ref, batch_pipe| {
batch_pipe.set_post_commit_callback(|metastore| {
*metastore.cached_tables.lock().unwrap() = None;
metastore.cached_tables.reset();
});
let table = SchemaRocksTable::new(db_ref.clone());
let existing_keys =
Expand Down Expand Up @@ -2053,7 +2149,7 @@ impl MetaStore for RocksMetaStore {
async fn delete_schema_by_id(&self, schema_id: u64) -> Result<(), CubeError> {
self.write_operation("delete_schema_by_id", move |db_ref, batch_pipe| {
batch_pipe.set_post_commit_callback(|metastore| {
*metastore.cached_tables.lock().unwrap() = None;
metastore.cached_tables.reset();
});
let tables = TableRocksTable::new(db_ref.clone()).all_rows()?;
if tables
Expand Down Expand Up @@ -2120,13 +2216,15 @@ impl MetaStore for RocksMetaStore {
) -> Result<IdRow<Table>, CubeError> {
self.write_operation("create_table", move |db_ref, batch_pipe| {
batch_pipe.set_post_commit_callback(|metastore| {
*metastore.cached_tables.lock().unwrap() = None;
metastore.cached_tables.reset();
});

if drop_if_exists {
if let Ok(exists_table) = get_table_impl(db_ref.clone(), schema_name.clone(), table_name.clone()) {
RocksMetaStore::drop_table_impl(exists_table.get_id(), db_ref.clone(), batch_pipe)?;
}
}

let rocks_table = TableRocksTable::new(db_ref.clone());
let rocks_index = IndexRocksTable::new(db_ref.clone());
let rocks_schema = SchemaRocksTable::new(db_ref.clone());
Expand Down Expand Up @@ -2316,23 +2414,45 @@ impl MetaStore for RocksMetaStore {
#[tracing::instrument(level = "trace", skip(self))]
async fn table_ready(&self, id: u64, is_ready: bool) -> Result<IdRow<Table>, CubeError> {
    self.write_operation("table_ready", move |db_ref, batch_pipe| {
        let rocks_table = TableRocksTable::new(db_ref.clone());
        let entry =
            rocks_table.update_with_fn(id, |r| r.update_is_ready(is_ready), batch_pipe)?;

        if is_ready {
            // Becoming ready: build the TablePath now (inside the write op,
            // where we can read the schema) and upsert it into the cache only
            // after the batch commits.
            let schema = SchemaRocksTable::new(db_ref)
                .get_row_or_not_found(entry.get_row().get_schema_id())?;
            let table_path = TablePath::new(Arc::new(schema), entry.clone());

            batch_pipe.set_post_commit_callback(move |metastore| {
                metastore.cached_tables.upsert_table_by_id(id, table_path);
            });
        } else {
            // No longer ready: the cache only stores ready tables, so drop
            // the entry (or reset the cache if it was never there).
            batch_pipe.set_post_commit_callback(move |metastore| {
                metastore.cached_tables.remove_by_table_id_or_reset(id);
            });
        }

        Ok(entry)
    })
    .await
}

#[tracing::instrument(level = "trace", skip(self))]
async fn seal_table(&self, id: u64) -> Result<IdRow<Table>, CubeError> {
    self.write_operation("seal_table", move |db_ref, batch_pipe| {
        let rocks_table = TableRocksTable::new(db_ref.clone());
        let entry = rocks_table.update_with_fn(id, |r| r.update_sealed(true), batch_pipe)?;

        // Surgically patch the cached entry after commit instead of resetting
        // the whole cache; sealing does not change readiness, so the entry
        // stays in the cache.
        let table_to_move = entry.get_row().clone();
        batch_pipe.set_post_commit_callback(move |metastore| {
            metastore
                .cached_tables
                .update_table_by_id_or_reset(id, |tp| {
                    tp.table = IdRow::new(tp.table.get_id(), table_to_move);
                });
        });

        Ok(entry)
    })
    .await
}
Expand All @@ -2359,16 +2479,24 @@ impl MetaStore for RocksMetaStore {
self.write_operation(
"update_location_download_size",
move |db_ref, batch_pipe| {
batch_pipe.set_post_commit_callback(|metastore| {
*metastore.cached_tables.lock().unwrap() = None;
});

let rocks_table = TableRocksTable::new(db_ref.clone());
rocks_table.update_with_res_fn(
let entry = rocks_table.update_with_res_fn(
id,
|r| r.update_location_download_size(&location, download_size),
batch_pipe,
)
)?;

let table_to_move = entry.get_row().clone();

batch_pipe.set_post_commit_callback(move |metastore| {
metastore
.cached_tables
.update_table_by_id_or_reset(id, |tp| {
tp.table = IdRow::new(tp.table.get_id(), table_to_move);
});
});

Ok(entry)
},
)
.await
Expand Down Expand Up @@ -2423,18 +2551,17 @@ impl MetaStore for RocksMetaStore {
} else {
let cache = self.cached_tables.clone();

if let Some(t) = cube_ext::spawn_blocking(move || cache.lock().unwrap().clone()).await?
{
if let Some(t) = cube_ext::spawn_blocking(move || cache.get()).await? {
return Ok(t);
}

let cache = self.cached_tables.clone();
// Can't do read_operation_out_of_queue as we need to update cache on the same thread where it's dropped
self.read_operation("get_tables_with_path", move |db_ref| {
let cached_tables = { cache.lock().unwrap().clone() };
if let Some(t) = cached_tables {
if let Some(t) = cache.get() {
return Ok(t);
}

let table_rocks_table = TableRocksTable::new(db_ref.clone());
let mut tables = Vec::new();
for t in table_rocks_table.scan_all_rows()? {
Expand All @@ -2450,8 +2577,7 @@ impl MetaStore for RocksMetaStore {
|table, schema| TablePath::new(schema, table),
)?);

let to_cache = tables.clone();
*cache.lock().unwrap() = Some(to_cache);
cache.set(tables.clone());

Ok(tables)
})
Expand Down Expand Up @@ -2493,9 +2619,12 @@ impl MetaStore for RocksMetaStore {
#[tracing::instrument(level = "trace", skip(self))]
async fn drop_table(&self, table_id: u64) -> Result<IdRow<Table>, CubeError> {
self.write_operation("drop_table", move |db_ref, batch_pipe| {
batch_pipe.set_post_commit_callback(|metastore| {
*metastore.cached_tables.lock().unwrap() = None;
batch_pipe.set_post_commit_callback(move |metastore| {
metastore
.cached_tables
.remove_by_table_id_or_reset(table_id);
});

RocksMetaStore::drop_table_impl(table_id, db_ref, batch_pipe)
})
.await
Expand Down
Loading