Skip to content

Commit 6781f14

Browse files
committed
[feat](condition cache) Support condition cache for external table
1 parent e8bc244 commit 6781f14

File tree

17 files changed

+1264
-24
lines changed

17 files changed

+1264
-24
lines changed

be/src/olap/rowset/segment_v2/condition_cache.cpp

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,32 @@ void ConditionCache::insert(const CacheKey& key, std::shared_ptr<std::vector<boo
4949
result->capacity(), result->capacity(), CachePriority::NORMAL));
5050
}
5151

52+
bool ConditionCache::lookup(const ExternalCacheKey& key, ConditionCacheHandle* handle) {
53+
auto encoded = key.encode();
54+
if (encoded.empty()) {
55+
return false;
56+
}
57+
auto* lru_handle = LRUCachePolicy::lookup(encoded);
58+
if (lru_handle == nullptr) {
59+
return false;
60+
}
61+
*handle = ConditionCacheHandle(this, lru_handle);
62+
return true;
63+
}
64+
65+
void ConditionCache::insert(const ExternalCacheKey& key,
66+
std::shared_ptr<std::vector<bool>> result) {
67+
auto encoded = key.encode();
68+
if (encoded.empty()) {
69+
return;
70+
}
71+
std::unique_ptr<ConditionCache::CacheValue> cache_value_ptr =
72+
std::make_unique<ConditionCache::CacheValue>();
73+
cache_value_ptr->filter_result = result;
74+
75+
ConditionCacheHandle(this, LRUCachePolicy::insert(encoded, (void*)cache_value_ptr.release(),
76+
result->capacity(), result->capacity(),
77+
CachePriority::NORMAL));
78+
}
79+
5280
} // namespace doris::segment_v2

be/src/olap/rowset/segment_v2/condition_cache.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,30 @@ class ConditionCache : public LRUCachePolicy {
6969
std::shared_ptr<std::vector<bool>> filter_result;
7070
};
7171

72+
// Cache key for external tables (Hive ORC/Parquet).
// Combines the file path, its modification time and size, and a caller-supplied digest.
// NOTE(review): the digest presumably summarizes the pushed-down predicates — confirm with callers.
struct ExternalCacheKey {
    ExternalCacheKey(const std::string& path_, int64_t modification_time_, int64_t file_size_,
                     uint64_t digest_)
            : path(path_),
              modification_time(modification_time_),
              file_size(file_size_),
              digest(digest_) {}

    std::string path;
    int64_t modification_time;
    int64_t file_size;
    uint64_t digest;

    // Serializes the key as `path` followed by the raw in-memory bytes of modification_time,
    // file_size and digest, in that order. The numeric suffix has a fixed width, so two
    // different keys never encode to the same string. The result is never empty.
    [[nodiscard]] std::string encode() const {
        std::string key;
        // Reserve once so the appends below do not reallocate.
        key.reserve(path.size() + sizeof(modification_time) + sizeof(file_size) + sizeof(digest));
        key.append(path);
        key.append(reinterpret_cast<const char*>(&modification_time), sizeof(modification_time));
        key.append(reinterpret_cast<const char*>(&file_size), sizeof(file_size));
        key.append(reinterpret_cast<const char*>(&digest), sizeof(digest));
        return key;
    }
};
95+
7296
// Create global instance of this class
7397
static ConditionCache* create_global_cache(size_t capacity, uint32_t num_shards = 16) {
7498
auto* res = new ConditionCache(capacity, num_shards);
@@ -90,6 +114,10 @@ class ConditionCache : public LRUCachePolicy {
90114
bool lookup(const CacheKey& key, ConditionCacheHandle* handle);
91115

92116
void insert(const CacheKey& key, std::shared_ptr<std::vector<bool>> filter_result);
117+
118+
// External-table overload of lookup(). Returns true and assigns `*handle` when an entry
// exists for the encoded key; returns false when the key encodes to an empty string or
// no entry is found.
bool lookup(const ExternalCacheKey& key, ConditionCacheHandle* handle);
119+
120+
// External-table overload of insert(). Stores `filter_result` under the encoded key;
// does nothing when the key encodes to an empty string.
void insert(const ExternalCacheKey& key, std::shared_ptr<std::vector<bool>> filter_result);
93121
};
94122

95123
class ConditionCacheHandle {

be/src/olap/rowset/segment_v2/row_ranges.h

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,53 @@ class RowRanges {
192192
*result = std::move(tmp_range);
193193
}
194194

195+
// Calculates the exception (set difference) of the two specified RowRanges objects: left \ right.
196+
// The result contains all row indexes that are in the left ranges but NOT in the right ranges.
197+
// For example:
198+
// [100, 300) \ [150, 200) = [100, 150), [200, 300)
199+
// [100, 300) \ [0, 150) = [150, 300)
200+
// [100, 300) \ [250, 400) = [100, 250)
201+
// [100, 200) \ [200, 300) = [100, 200)
202+
// [100, 300) \ [0, 400) = <EMPTY>
203+
// [100, 200), [300, 400) \ [150, 350) = [100, 150), [350, 400)
204+
static void ranges_exception(const RowRanges& left, const RowRanges& right, RowRanges* result) {
205+
RowRanges tmp_range;
206+
int right_index = 0;
207+
for (auto it1 = left._ranges.begin(); it1 != left._ranges.end(); ++it1) {
208+
int64_t current_from = it1->from();
209+
int64_t current_to = it1->to();
210+
for (int i = right_index; i < right._ranges.size(); ++i) {
211+
const RowRange& range2 = right._ranges[i];
212+
if (current_from >= current_to) {
213+
// Current range fully consumed
214+
break;
215+
}
216+
if (current_to <= range2.from()) {
217+
// Current remaining range is entirely before range2, no more subtraction needed
218+
break;
219+
}
220+
if (current_from >= range2.to()) {
221+
// range2 is entirely before the current remaining range, advance right_index
222+
right_index = i + 1;
223+
continue;
224+
}
225+
// There is overlap between [current_from, current_to) and range2
226+
if (current_from < range2.from()) {
227+
// Left portion before the overlap: [current_from, range2.from())
228+
tmp_range.add(RowRange(current_from, range2.from()));
229+
}
230+
// Advance current_from past the overlap
231+
current_from = range2.to();
232+
}
233+
// Add whatever remains of the current left range
234+
if (current_from < current_to) {
235+
tmp_range.add(RowRange(current_from, current_to));
236+
}
237+
}
238+
*result = std::move(tmp_range);
239+
}
240+
241+
195242
static roaring::Roaring ranges_to_roaring(const RowRanges& ranges) {
196243
roaring::Roaring result;
197244
for (auto it = ranges._ranges.begin(); it != ranges._ranges.end(); ++it) {
@@ -275,6 +322,23 @@ class RowRanges {
275322
_count += range_to_add.count();
276323
}
277324

325+
// Returns the row index (within the original row space) of the pos-th element
326+
// across all ranges. For example, if ranges are [0,3000) and [8000,11000),
327+
// pos=0 returns 0, pos=2999 returns 2999, pos=3000 returns 8000.
328+
int64_t get_row_index_by_pos(int64_t pos) const {
329+
size_t remaining = pos;
330+
for (const auto& range : _ranges) {
331+
size_t range_len = range.count();
332+
if (remaining < range_len) {
333+
return range.from() + remaining;
334+
}
335+
remaining -= range_len;
336+
}
337+
// pos is out of bounds; return -1 to indicate invalid
338+
DCHECK(false) << "pos " << pos << " is out of bounds for RowRanges with count " << _count;
339+
return -1;
340+
}
341+
278342
uint64_t get_digest(uint64_t seed) const {
279343
for (auto range : _ranges) {
280344
seed = range.get_digest(seed);

be/src/vec/exec/format/generic_reader.h

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,16 @@ namespace doris::vectorized {
3535

3636
class Block;
3737
class VSlotRef;
38+
39+
// Condition cache state handed from FileScanner to a file reader.
// On a MISS the reader fills filter_result granule by granule while evaluating predicates.
// On a HIT the reader skips every granule whose filter_result entry is false.
struct ConditionCacheContext {
    // Number of rows covered by one filter_result entry.
    static constexpr int GRANULE_SIZE = 2048;

    // True when filter_result came from the cache, false when it is being built.
    bool is_hit {false};

    // One flag per granule: true = the granule has surviving rows.
    std::shared_ptr<std::vector<bool>> filter_result;
};
47+
3848
// This a reader interface for all file readers.
3949
// A GenericReader is responsible for reading a file and return
4050
// a set of blocks with specified schema,
@@ -103,7 +113,16 @@ class GenericReader : public ProfileCollector {
103113
bool _fill_all_columns = false;
104114
TPushAggOp::type _push_down_agg_type {};
105115

106-
// For TopN queries, rows will be read according to row ids produced by TopN result.
116+
public:
117+
// Pass condition cache context to the reader for HIT/MISS tracking.
118+
virtual void set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) {}
119+
120+
// Returns true if this reader has delete operations (e.g. Iceberg position/equality deletes,
121+
// Hive ACID deletes). Used to disable condition cache when deletes are present, since cached
122+
// granule results may become stale if delete files change between queries.
123+
virtual bool has_delete_operations() const { return false; }
124+
125+
protected:
107126
bool _read_by_rows = false;
108127
std::list<int64_t> _row_ids;
109128

be/src/vec/exec/format/orc/vorc_reader.cpp

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2264,6 +2264,26 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
22642264
// reset decimal_scale_params_index;
22652265
_decimal_scale_params_index = 0;
22662266
try {
2267+
// Condition cache HIT: skip consecutive false granules before reading
2268+
if (_condition_cache_ctx && _condition_cache_ctx->is_hit) {
2269+
int64_t current_row = _row_reader->getRowNumber();
2270+
auto& cache = *_condition_cache_ctx->filter_result;
2271+
int64_t granule = current_row / ConditionCacheContext::GRANULE_SIZE;
2272+
int64_t max_granule = static_cast<int64_t>(cache.size());
2273+
while (granule < max_granule && !cache[granule]) {
2274+
granule++;
2275+
}
2276+
if (granule >= max_granule) {
2277+
*eof = true;
2278+
*read_rows = 0;
2279+
return Status::OK();
2280+
}
2281+
int64_t target_row = granule * ConditionCacheContext::GRANULE_SIZE;
2282+
if (target_row > current_row) {
2283+
_row_reader->seekToRow(target_row);
2284+
}
2285+
}
2286+
_last_batch_row_offset = _row_reader->getRowNumber();
22672287
rr = _row_reader->nextBatch(*_batch, block);
22682288
if (rr == 0 || _batch->numElements == 0) {
22692289
*eof = true;
@@ -2363,6 +2383,26 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
23632383
// reset decimal_scale_params_index;
23642384
_decimal_scale_params_index = 0;
23652385
try {
2386+
// Condition cache HIT: skip consecutive false granules before reading
2387+
if (_condition_cache_ctx && _condition_cache_ctx->is_hit) {
2388+
int64_t current_row = _row_reader->getRowNumber();
2389+
auto& cache = *_condition_cache_ctx->filter_result;
2390+
int64_t granule = current_row / ConditionCacheContext::GRANULE_SIZE;
2391+
int64_t max_granule = static_cast<int64_t>(cache.size());
2392+
while (granule < max_granule && !cache[granule]) {
2393+
granule++;
2394+
}
2395+
if (granule >= max_granule) {
2396+
*eof = true;
2397+
*read_rows = 0;
2398+
return Status::OK();
2399+
}
2400+
int64_t target_row = granule * ConditionCacheContext::GRANULE_SIZE;
2401+
if (target_row > current_row) {
2402+
_row_reader->seekToRow(target_row);
2403+
}
2404+
}
2405+
_last_batch_row_offset = _row_reader->getRowNumber();
23662406
rr = _row_reader->nextBatch(*_batch, block);
23672407
if (rr == 0 || _batch->numElements == 0) {
23682408
*eof = true;
@@ -2480,6 +2520,24 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
24802520
bool can_filter_all = false;
24812521
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
24822522
filter_conjuncts, &filters, block, &result_filter, &can_filter_all));
2523+
2524+
// Condition cache MISS: mark granules with surviving rows (non-lazy path)
2525+
if (_condition_cache_ctx && !_condition_cache_ctx->is_hit) {
2526+
auto& cache = *_condition_cache_ctx->filter_result;
2527+
auto* filter_data = result_filter.data();
2528+
size_t num_rows = block->rows();
2529+
for (size_t i = 0; i < num_rows; i++) {
2530+
if (filter_data[i]) {
2531+
size_t granule = (_last_batch_row_offset + i) /
2532+
ConditionCacheContext::GRANULE_SIZE;
2533+
if (granule >= cache.size()) {
2534+
cache.resize(granule + 1, false);
2535+
}
2536+
cache[granule] = true;
2537+
}
2538+
}
2539+
}
2540+
24832541
if (can_filter_all) {
24842542
for (auto& col : columns_to_filter) {
24852543
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
@@ -2697,6 +2755,21 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
26972755
sel[new_size] = i;
26982756
new_size += result_filter_data[i] ? 1 : 0;
26992757
}
2758+
2759+
// Condition cache MISS: mark granules with surviving rows
2760+
if (_condition_cache_ctx && !_condition_cache_ctx->is_hit && new_size > 0) {
2761+
auto& cache = *_condition_cache_ctx->filter_result;
2762+
for (uint16_t i = 0; i < size; i++) {
2763+
if (result_filter_data[i]) {
2764+
size_t granule = (_last_batch_row_offset + i) / ConditionCacheContext::GRANULE_SIZE;
2765+
if (granule >= cache.size()) {
2766+
cache.resize(granule + 1, false);
2767+
}
2768+
cache[granule] = true;
2769+
}
2770+
}
2771+
}
2772+
27002773
_statistics.lazy_read_filtered_rows += static_cast<int64_t>(size - new_size);
27012774
data.numElements = new_size;
27022775
return Status::OK();

be/src/vec/exec/format/orc/vorc_reader.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,16 @@ class OrcReader : public GenericReader {
232232

233233
bool count_read_rows() override { return true; }
234234

235+
// Installs the condition cache context used by this reader, replacing any previous one.
void set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) override {
    // `ctx` is passed by value, so move it in rather than copying the shared_ptr again.
    _condition_cache_ctx = std::move(ctx);
}
238+
239+
bool has_delete_operations() const override {
240+
return (_position_delete_ordered_rowids != nullptr &&
241+
!_position_delete_ordered_rowids->empty()) ||
242+
(_delete_rows != nullptr && !_delete_rows->empty());
243+
}
244+
235245
protected:
236246
void _collect_profile_before_close() override;
237247

@@ -700,6 +710,9 @@ class OrcReader : public GenericReader {
700710
std::unique_ptr<orc::ColumnVectorBatch> _batch;
701711
std::unique_ptr<orc::Reader> _reader = nullptr;
702712
std::unique_ptr<orc::RowReader> _row_reader;
713+
714+
// Value of _row_reader->getRowNumber() captured just before the most recent nextBatch()
// call. Used as the base row when mapping rows of that batch to condition cache granules.
// -1 until the first capture.
int64_t _last_batch_row_offset = -1;
715+
// Condition cache context installed via set_condition_cache_context().
// Null when none was set; the read paths test it for null before using it.
std::shared_ptr<ConditionCacheContext> _condition_cache_ctx;
703716
std::unique_ptr<ORCFilterImpl> _orc_filter;
704717
orc::RowReaderOptions _row_reader_options;
705718

0 commit comments

Comments
 (0)