Skip to content

Commit 4a5d300

Browse files
committed
[feature](scan) Add session variable to control value predicate pushdown for MOR tables
Add a new session variable `enable_mor_value_predicate_pushdown_tables` to allow users to selectively enable value column predicate pushdown for MOR (Merge-On-Read) tables. This can improve query performance by utilizing inverted indexes on value columns for filtering. The session variable accepts: - Comma-separated table names: `db1.tbl1,db2.tbl2` - Wildcard for all MOR tables: `*` - Empty string to disable (default) Changes: - Add session variable in SessionVariable.java with helper method - Add isMorTable() helper in OlapTable.java - Add Thrift field enable_mor_value_predicate_pushdown in TOlapScanNode - Set flag in OlapScanNode.toThrift() based on session variable - Add virtual method _should_push_down_mor_value_predicate() in scan_operator - Implement override in olap_scan_operator to read the flag - Modify predicate pushdown condition in scan_operator.cpp
1 parent 59fabff commit 4a5d300

File tree

10 files changed

+343
-1
lines changed

10 files changed

+343
-1
lines changed

be/src/pipeline/exec/olap_scan_operator.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,12 @@ bool OlapScanLocalState::_storage_no_merge() {
445445
p._olap_scan_node.enable_unique_key_merge_on_write));
446446
}
447447

448+
bool OlapScanLocalState::_should_push_down_mor_value_predicate() {
449+
auto& p = _parent->cast<OlapScanOperatorX>();
450+
return p._olap_scan_node.__isset.enable_mor_value_predicate_pushdown &&
451+
p._olap_scan_node.enable_mor_value_predicate_pushdown;
452+
}
453+
448454
Status OlapScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* scanners) {
449455
if (_scan_ranges.empty()) {
450456
_eos = true;

be/src/pipeline/exec/olap_scan_operator.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
105105

106106
bool _storage_no_merge() override;
107107

108+
bool _should_push_down_mor_value_predicate() override;
109+
108110
bool _push_down_topn(const vectorized::RuntimePredicate& predicate) override {
109111
if (!predicate.target_is_slot(_parent->node_id())) {
110112
return false;

be/src/pipeline/exec/scan_operator.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,8 @@ Status ScanLocalState<Derived>::_normalize_predicate(vectorized::VExprContext* c
439439
return Status::OK();
440440
}
441441

442-
if (pdt == PushDownType::ACCEPTABLE && (_is_key_column(slot->col_name()))) {
442+
if (pdt == PushDownType::ACCEPTABLE &&
443+
(_is_key_column(slot->col_name()) || _should_push_down_mor_value_predicate())) {
443444
output_expr = nullptr;
444445
return Status::OK();
445446
} else {

be/src/pipeline/exec/scan_operator.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ class ScanLocalState : public ScanLocalStateBase {
205205
virtual bool _storage_no_merge() { return false; }
206206
virtual bool _push_down_topn(const vectorized::RuntimePredicate& predicate) { return false; }
207207
virtual bool _is_key_column(const std::string& col_name) { return false; }
208+
virtual bool _should_push_down_mor_value_predicate() { return false; }
208209
virtual PushDownType _should_push_down_bloom_filter() const {
209210
return PushDownType::UNACCEPTABLE;
210211
}

fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3022,6 +3022,14 @@ public boolean isUniqKeyMergeOnWrite() {
30223022
return getKeysType() == KeysType.UNIQUE_KEYS && getEnableUniqueKeyMergeOnWrite();
30233023
}
30243024

3025+
/**
3026+
* Check if this is a MOR (Merge-On-Read) table.
3027+
* MOR = UNIQUE_KEYS without merge-on-write enabled.
3028+
*/
3029+
public boolean isMorTable() {
3030+
return getKeysType() == KeysType.UNIQUE_KEYS && !getEnableUniqueKeyMergeOnWrite();
3031+
}
3032+
30253033
public boolean isUniqKeyMergeOnWriteWithClusterKeys() {
30263034
return isUniqKeyMergeOnWrite() && getBaseSchema().stream().anyMatch(Column::isClusterKey);
30273035
}

fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1181,6 +1181,15 @@ protected void toThrift(TPlanNode msg) {
11811181
msg.olap_scan_node.setTableName(tableName);
11821182
msg.olap_scan_node.setEnableUniqueKeyMergeOnWrite(olapTable.getEnableUniqueKeyMergeOnWrite());
11831183

1184+
// Set MOR value predicate pushdown flag based on session variable
1185+
if (olapTable.isMorTable() && ConnectContext.get() != null) {
1186+
String dbName = olapTable.getQualifiedDbName();
1187+
String tblName = olapTable.getName();
1188+
boolean enabled = ConnectContext.get().getSessionVariable()
1189+
.isMorValuePredicatePushdownEnabled(dbName, tblName);
1190+
msg.olap_scan_node.setEnableMorValuePredicatePushdown(enabled);
1191+
}
1192+
11841193
msg.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
11851194

11861195
msg.olap_scan_node.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);

fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -722,6 +722,9 @@ public class SessionVariable implements Serializable, Writable {
722722

723723
public static final String ENABLE_PUSHDOWN_STRING_MINMAX = "enable_pushdown_string_minmax";
724724

725+
public static final String ENABLE_MOR_VALUE_PREDICATE_PUSHDOWN_TABLES
726+
= "enable_mor_value_predicate_pushdown_tables";
727+
725728
// When set use fix replica = true, the fixed replica maybe bad, try to use the health one if
726729
// this session variable is set to true.
727730
public static final String FALLBACK_OTHER_REPLICA_WHEN_FIXED_CORRUPT = "fallback_other_replica_when_fixed_corrupt";
@@ -2181,6 +2184,13 @@ public boolean isEnableHboNonStrictMatchingMode() {
21812184
"是否启用 string 类型 min max 下推。", "Set whether to enable push down string type minmax."})
21822185
public boolean enablePushDownStringMinMax = false;
21832186

2187+
// Comma-separated list of MOR tables to enable value predicate pushdown.
2188+
@VariableMgr.VarAttr(name = ENABLE_MOR_VALUE_PREDICATE_PUSHDOWN_TABLES, needForward = true, description = {
2189+
"指定启用MOR表value列谓词下推的表列表,格式:db1.tbl1,db2.tbl2 或 * 表示所有MOR表。",
2190+
"Comma-separated list of MOR tables to enable value predicate pushdown. "
2191+
+ "Format: db1.tbl1,db2.tbl2 or * for all MOR tables."})
2192+
public String enableMorValuePredicatePushdownTables = "";
2193+
21842194
// Whether drop table when create table as select insert data appear error.
21852195
@VariableMgr.VarAttr(name = DROP_TABLE_IF_CTAS_FAILED, needForward = true)
21862196
public boolean dropTableIfCtasFailed = true;
@@ -4659,6 +4669,35 @@ public boolean isEnablePushDownStringMinMax() {
46594669
return enablePushDownStringMinMax;
46604670
}
46614671

4672+
public String getEnableMorValuePredicatePushdownTables() {
4673+
return enableMorValuePredicatePushdownTables;
4674+
}
4675+
4676+
/**
4677+
* Check if a table is enabled for MOR value predicate pushdown.
4678+
* @param dbName database name
4679+
* @param tableName table name
4680+
* @return true if the table is in the enabled list or if '*' is set
4681+
*/
4682+
public boolean isMorValuePredicatePushdownEnabled(String dbName, String tableName) {
4683+
if (enableMorValuePredicatePushdownTables == null
4684+
|| enableMorValuePredicatePushdownTables.isEmpty()) {
4685+
return false;
4686+
}
4687+
String trimmed = enableMorValuePredicatePushdownTables.trim();
4688+
if ("*".equals(trimmed)) {
4689+
return true;
4690+
}
4691+
String fullName = dbName + "." + tableName;
4692+
for (String table : trimmed.split(",")) {
4693+
if (table.trim().equalsIgnoreCase(fullName)
4694+
|| table.trim().equalsIgnoreCase(tableName)) {
4695+
return true;
4696+
}
4697+
}
4698+
return false;
4699+
}
4700+
46624701
/** canUseNereidsDistributePlanner */
46634702
public static boolean canUseNereidsDistributePlanner() {
46644703
ConnectContext connectContext = ConnectContext.get();

gensrc/thrift/PlanNodes.thrift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -887,6 +887,8 @@ struct TOlapScanNode {
887887
20: optional i64 score_sort_limit
888888
21: optional TSortInfo ann_sort_info
889889
22: optional i64 ann_sort_limit
890+
// Enable value predicate pushdown for MOR tables
891+
23: optional bool enable_mor_value_predicate_pushdown
890892
}
891893

892894
struct TEqJoinCondition {
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !select_disabled --
3+
3 300 test
4+
5+
-- !select_enabled_tablename --
6+
3 300 test
7+
8+
-- !select_enabled_fullname --
9+
3 300 test
10+
11+
-- !select_enabled_wildcard --
12+
3 300 test
13+
14+
-- !select_inverted_index --
15+
3 300 test
16+
17+
-- !select_deleted_row --
18+
19+
-- !select_not_in_list --
20+
3 300 test
21+
22+
-- !select_latest_version --
23+
1 200 second
24+
2 300 third
25+
26+
-- !select_old_version --
27+
28+
-- !select_new_version --
29+
1 200 second
30+
31+
-- !select_multiple_tables --
32+
2 200
33+
34+
-- !select_mow_table --
35+
2 200
36+
37+
-- !select_dup_table --
38+
2 200
39+

0 commit comments

Comments
 (0)