Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,8 @@ Status CloudCumulativeCompaction::modify_rowsets() {
DBUG_BLOCK);
auto status = _engine.meta_mgr().cloud_update_delete_bitmap_without_lock(
*cloud_tablet(), pre_rowsets_delete_bitmap.get(), pre_rowset_to_versions,
_output_rowset->start_version(), _output_rowset->end_version());
cloud_tablet()->table_id(), _output_rowset->start_version(),
_output_rowset->end_version());
if (!status.ok()) {
LOG(WARNING) << "failed to agg pre rowsets delete bitmap to ms. tablet_id="
<< _tablet->tablet_id() << ", pre rowset num=" << pre_rowsets.size()
Expand Down
5 changes: 3 additions & 2 deletions be/src/cloud/cloud_delete_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,14 @@ Status CloudDeleteTask::execute(CloudStorageEngine& engine, const TPushReq& requ
RETURN_IF_ERROR(rowset_writer->build(rowset));
rowset->rowset_meta()->set_delete_predicate(std::move(del_pred));

auto st = engine.meta_mgr().prepare_rowset(*rowset_writer->rowset_meta(), "");
auto st =
engine.meta_mgr().prepare_rowset(*rowset_writer->rowset_meta(), "", tablet->table_id());
if (!st.ok()) {
LOG(WARNING) << "failed to prepare rowset, status=" << st.to_string();
return st;
}

st = engine.meta_mgr().commit_rowset(*rowset->rowset_meta(), "");
st = engine.meta_mgr().commit_rowset(*rowset->rowset_meta(), "", tablet->table_id());
if (!st.ok()) {
LOG(WARNING) << "failed to commit rowset, status=" << st.to_string();
return st;
Expand Down
6 changes: 4 additions & 2 deletions be/src/cloud/cloud_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ Status CloudDeltaWriter::commit_rowset() {
}

// Handle normal rowset with data
return _engine.meta_mgr().commit_rowset(*rowset_meta(), "");
return _engine.meta_mgr().commit_rowset(*rowset_meta(), "",
_rowset_builder->tablet()->table_id());
}

Status CloudDeltaWriter::_commit_empty_rowset() {
Expand All @@ -138,7 +139,8 @@ Status CloudDeltaWriter::_commit_empty_rowset() {
return Status::OK();
}
// write a empty rowset kv to keep version continuous
return _engine.meta_mgr().commit_rowset(*rowset_meta(), "");
return _engine.meta_mgr().commit_rowset(*rowset_meta(), "",
_rowset_builder->tablet()->table_id());
}

Status CloudDeltaWriter::set_txn_related_info() {
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ Status CloudFullCompaction::_cloud_full_compaction_update_delete_bitmap(int64_t
RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(
*cloud_tablet(), -1, initiator, delete_bitmap.get(), delete_bitmap.get(),
_output_rowset->rowset_id().to_string(), storage_resource,
config::delete_bitmap_store_write_version));
config::delete_bitmap_store_write_version, cloud_tablet()->table_id()));
LOG_INFO("update delete bitmap in CloudFullCompaction, tablet_id={}, range=[{}-{}]",
_tablet->tablet_id(), _input_rowsets.front()->start_version(),
_input_rowsets.back()->end_version())
Expand Down
306 changes: 266 additions & 40 deletions be/src/cloud/cloud_meta_mgr.cpp

Large diffs are not rendered by default.

26 changes: 20 additions & 6 deletions be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class StartTabletJobResponse;
class TabletJobInfoPB;
class TabletStatsPB;
class TabletIndexPB;
class HostLevelMSRpcRateLimiters;
class MSBackpressureHandler;

using StorageVaultInfos = std::vector<
std::tuple<std::string, std::variant<S3Conf, HdfsVaultInfo>, StorageVaultPB_PathFormat>>;
Expand Down Expand Up @@ -77,17 +79,18 @@ class CloudMetaMgr {
CloudTablet* tablet, std::unique_lock<bthread::Mutex>& lock /* _sync_meta_lock */,
const SyncOptions& options = {}, SyncRowsetStats* sync_stats = nullptr);

Status prepare_rowset(const RowsetMeta& rs_meta, const std::string& job_id,
Status prepare_rowset(const RowsetMeta& rs_meta, const std::string& job_id, int64_t table_id,
std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr);

Status commit_rowset(RowsetMeta& rs_meta, const std::string& job_id,
Status commit_rowset(RowsetMeta& rs_meta, const std::string& job_id, int64_t table_id,
std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr);
void cache_committed_rowset(RowsetMetaSharedPtr rs_meta, int64_t expiration_time);

Status update_tmp_rowset(const RowsetMeta& rs_meta);
Status update_tmp_rowset(const RowsetMeta& rs_meta, int64_t table_id);

Status update_packed_file_info(const std::string& packed_file_path,
const cloud::PackedFileInfoPB& packed_file_info);
const cloud::PackedFileInfoPB& packed_file_info,
int64_t table_id);

Status commit_txn(const StreamLoadContext& ctx, bool is_2pc);

Expand Down Expand Up @@ -141,12 +144,12 @@ class CloudMetaMgr {
DeleteBitmap* delete_bitmap, DeleteBitmap* delete_bitmap_v2,
std::string rowset_id,
std::optional<StorageResource> storage_resource,
int64_t store_version, int64_t txn_id = -1,
int64_t store_version, int64_t table_id, int64_t txn_id = -1,
bool is_explicit_txn = false, int64_t next_visible_version = -1);

Status cloud_update_delete_bitmap_without_lock(
const CloudTablet& tablet, DeleteBitmap* delete_bitmap,
std::map<std::string, int64_t>& rowset_to_versions,
std::map<std::string, int64_t>& rowset_to_versions, int64_t table_id,
int64_t pre_rowset_agg_start_version = 0, int64_t pre_rowset_agg_end_version = 0);

Status get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id,
Expand Down Expand Up @@ -175,6 +178,14 @@ class CloudMetaMgr {
Status get_cluster_status(std::unordered_map<std::string, std::pair<int32_t, int64_t>>* result,
std::string* my_cluster_id = nullptr);

void set_host_level_ms_rpc_rate_limiters(HostLevelMSRpcRateLimiters* limiters) {
host_level_ms_rpc_rate_limiters_ = limiters;
}

void set_ms_backpressure_handler(MSBackpressureHandler* handler) {
ms_backpressure_handler_ = handler;
}

private:
bool sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, std::ranges::range auto&& rs_metas,
DeleteBitmap* delete_bitmap);
Expand Down Expand Up @@ -202,6 +213,9 @@ class CloudMetaMgr {
void check_table_size_correctness(RowsetMeta& rs_meta);
int64_t get_segment_file_size(RowsetMeta& rs_meta);
int64_t get_inverted_index_file_size(RowsetMeta& rs_meta);

HostLevelMSRpcRateLimiters* host_level_ms_rpc_rate_limiters_ {nullptr};
MSBackpressureHandler* ms_backpressure_handler_ {nullptr};
};

} // namespace cloud
Expand Down
Loading
Loading