Skip to content

Commit ecac033

Browse files
committed
fix comments
1 parent bb94fff commit ecac033

File tree

3 files changed

+27
-57
lines changed

3 files changed

+27
-57
lines changed

ql/src/java/org/apache/hadoop/hive/ql/optimizer/LimitPushdownOptimizer.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,11 @@ public Object process(Node nd, Stack<Node> stack,
198198
}
199199
currentOp = currentOp.getChildOperators().get(0);
200200
}
201+
// Skip RS copy for plans that are already TopNKey-optimized to avoid re-propagating
202+
// TopNKey configuration into the same ReduceSink.
203+
if (((LimitPushdownContext)procCtx).disallowRSCopy && foundGroupByOperator) {
204+
return false;
205+
}
201206
List<ExprNodeDesc> cKeys = cRS.getConf().getKeyCols();
202207
List<ExprNodeDesc> pKeys = pRS.getConf().getKeyCols();
203208
if (pRS.getChildren().get(0) instanceof GroupByOperator &&
@@ -250,8 +255,12 @@ private static class LimitPushdownContext implements NodeProcessorCtx {
250255

251256
private final float threshold;
252257

258+
private final boolean disallowRSCopy;
259+
253260
public LimitPushdownContext(HiveConf conf) throws SemanticException {
254261
threshold = conf.getFloatVar(HiveConf.ConfVars.HIVE_LIMIT_PUSHDOWN_MEMORY_USAGE);
262+
disallowRSCopy = conf.getBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_TOPNKEY) &&
263+
!conf.getBoolVar(HiveConf.ConfVars.HIVE_MAPSIDE_AGGREGATE);
255264
if (threshold <= 0 || threshold >= 1) {
256265
throw new SemanticException("Invalid memory usage value " + threshold +
257266
" for " + HiveConf.ConfVars.HIVE_LIMIT_PUSHDOWN_MEMORY_USAGE);

ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java

Lines changed: 0 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,10 @@
1717
*/
1818
package org.apache.hadoop.hive.ql.optimizer.topnkey;
1919

20-
import java.util.ArrayDeque;
2120
import java.util.Collections;
22-
import java.util.Deque;
23-
import java.util.HashSet;
2421
import java.util.List;
25-
import java.util.Set;
2622
import java.util.Stack;
2723

28-
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
29-
import org.apache.hadoop.hive.ql.exec.JoinOperator;
3024
import org.apache.hadoop.hive.ql.exec.Operator;
3125
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
3226
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
@@ -77,12 +71,6 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
7771
return null;
7872
}
7973

80-
// Skip the current optimization when a simple global ORDER BY...LIMIT is present
81-
// This plan structure is handled more efficiently by the specialized 'TopN In Reducer' optimization.
82-
if (!reduceSinkDesc.isPTFReduceSink() && reduceSinkDesc.getTopN() > -1 && isOrderByLimitPath(reduceSinkOperator)) {
83-
return null;
84-
}
85-
8674
if (reduceSinkDesc.getTopN() > maxTopNAllowed) {
8775
return null;
8876
}
@@ -123,50 +111,6 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
123111
return null;
124112
}
125113

126-
/**
127-
* Returns true if the ReduceSink is only under an ORDER BY + LIMIT plan
128-
* and has no GroupBy, Join, or TopNKey operators in its upstream ancestry.
129-
* This is used to disable TopNKey for pure ORDER BY LIMIT queries where
130-
* LIMIT pushdown must take precedence.
131-
*/
132-
public static boolean isOrderByLimitPath(ReduceSinkOperator rs) {
133-
if (rs == null) {
134-
return false;
135-
}
136-
137-
Deque<Operator<?>> stack = new ArrayDeque<>();
138-
Set<Operator<?>> visited = new HashSet<>();
139-
stack.push(rs);
140-
141-
while (!stack.isEmpty()) {
142-
Operator<?> current = stack.pop();
143-
144-
if (!visited.add(current)) {
145-
continue;
146-
}
147-
148-
List<Operator<?>> parents = current.getParentOperators();
149-
if (parents == null || parents.isEmpty()) {
150-
continue;
151-
}
152-
153-
for (Operator<?> parent : parents) {
154-
155-
// If any GroupBy, Join, or TopNKey operator exists in the upstream path,
156-
// this is NOT a pure ORDER BY LIMIT plan.
157-
if (parent instanceof GroupByOperator || parent instanceof JoinOperator || parent instanceof TopNKeyOperator) {
158-
return false;
159-
}
160-
161-
stack.push(parent);
162-
}
163-
}
164-
165-
// Reached the root without seeing a GroupBy, Join, or TopNKey operator => pure ORDER BY + LIMIT
166-
return true;
167-
}
168-
169-
170114
public static Operator<? extends OperatorDesc> copyDown(Operator<? extends OperatorDesc> child, OperatorDesc operatorDesc) {
171115
final List<Operator<? extends OperatorDesc>> parents = child.getParentOperators();
172116

ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1300,9 +1300,17 @@ private static void runTopNKeyOptimization(OptimizeTezProcContext procCtx)
13001300
return;
13011301
}
13021302

1303+
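// Restrict TopNKey matching to ReduceSinks preceded in the matched path by a GROUP BY or JOIN operator; when any visited ReduceSink is a PTF ReduceSink, match every ReduceSink instead. NOTE(review): the trailing "|" before ")" leaves an empty alternative in the group, so the group can also match the empty string - confirm this is intended.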
1304+
String reduceSinkOp = ReduceSinkOperator.getOperatorName() + "%";
1305+
String topNKeyRegexPattern = hasPTFReduceSink(procCtx) ? reduceSinkOp :
1306+
".*(" + GroupByOperator.getOperatorName() + "|" +
1307+
JoinOperator.getOperatorName() + "|" +
1308+
MapJoinOperator.getOperatorName() + "|" +
1309+
CommonMergeJoinOperator.getOperatorName() + "|" +
1310+
")%.*" + reduceSinkOp;
13031311
Map<SemanticRule, SemanticNodeProcessor> opRules = new LinkedHashMap<SemanticRule, SemanticNodeProcessor>();
13041312
opRules.put(
1305-
new RuleRegExp("Top n key optimization", ReduceSinkOperator.getOperatorName() + "%"),
1313+
new RuleRegExp("Top n key optimization", topNKeyRegexPattern),
13061314
new TopNKeyProcessor(
13071315
HiveConf.getIntVar(procCtx.conf, HiveConf.ConfVars.HIVE_MAX_TOPN_ALLOWED),
13081316
HiveConf.getFloatVar(procCtx.conf, ConfVars.HIVE_TOPN_EFFICIENCY_THRESHOLD),
@@ -1322,6 +1330,15 @@ private static void runTopNKeyOptimization(OptimizeTezProcContext procCtx)
13221330
ogw.startWalking(topNodes, null);
13231331
}
13241332

1333+
private static boolean hasPTFReduceSink(OptimizeTezProcContext ctx) {
1334+
for (ReduceSinkOperator rs : ctx.visitedReduceSinks) {
1335+
if (rs.getConf().isPTFReduceSink()) {
1336+
return true;
1337+
}
1338+
}
1339+
return false;
1340+
}
1341+
13251342
private boolean findParallelSemiJoinBranch(Operator<?> mapjoin, TableScanOperator bigTableTS,
13261343
ParseContext parseContext,
13271344
Map<ReduceSinkOperator, TableScanOperator> semijoins,

0 commit comments

Comments
 (0)