Skip to content

Commit 7eff186

Browse files
committed
HIVE-29354: Projection and Filter Pushdown for Shredded VARIANT Columns
1 parent 3d58586 commit 7eff186

30 files changed

+4265
-566
lines changed

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java

Lines changed: 73 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -42,8 +42,10 @@
4242
import org.apache.iceberg.expressions.Expressions;
4343
import org.apache.iceberg.expressions.Literal;
4444
import org.apache.iceberg.expressions.UnboundTerm;
45+
import org.apache.iceberg.mr.hive.variant.VariantPathUtil;
4546
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
4647
import org.apache.iceberg.transforms.Transforms;
48+
import org.apache.iceberg.types.Type;
4749
import org.apache.iceberg.types.Types;
4850
import org.apache.iceberg.util.DateTimeUtil;
4951
import org.apache.iceberg.util.NaNUtil;
@@ -112,8 +114,8 @@ private static Expression translate(ExpressionTree tree, List<PredicateLeaf> lea
112114
private static Expression translateLeaf(PredicateLeaf leaf) {
113115
TransformSpec transformSpec = TransformSpec.fromStringWithColumnName(leaf.getColumnName());
114116
String columnName = transformSpec.getColumnName();
115-
UnboundTerm<Object> column =
116-
ObjectUtils.defaultIfNull(toTerm(columnName, transformSpec), Expressions.ref(columnName));
117+
118+
UnboundTerm<Object> column = resolveTerm(columnName, leaf, transformSpec);
117119

118120
switch (leaf.getOperator()) {
119121
case EQUALS:
@@ -144,6 +146,15 @@ private static Expression translateLeaf(PredicateLeaf leaf) {
144146
}
145147
}
146148

149+
private static UnboundTerm<Object> resolveTerm(String columnName, PredicateLeaf leaf, TransformSpec transformSpec) {
150+
UnboundTerm<Object> column = tryVariantExtractTerm(columnName, leaf);
151+
if (column != null) {
152+
return column;
153+
}
154+
return ObjectUtils.defaultIfNull(
155+
toTerm(columnName, transformSpec), Expressions.ref(columnName));
156+
}
157+
147158
public static UnboundTerm<Object> toTerm(String columnName, TransformSpec transformSpec) {
148159
if (transformSpec == null) {
149160
return null;
@@ -168,6 +179,66 @@ public static UnboundTerm<Object> toTerm(String columnName, TransformSpec transf
168179
}
169180
}
170181

182+
/**
183+
* Converts a shredded variant pseudo-column (e.g. {@code data.typed_value.age}) into an Iceberg variant extract term
184+
* (e.g. {@code extract(data, "$.age", "long")}).
185+
*
186+
* <p>This enables Iceberg to prune manifests/files using variant metrics produced when variant shredding is enabled.
187+
*/
188+
private static UnboundTerm<Object> tryVariantExtractTerm(String columnName, PredicateLeaf leaf) {
189+
int typedIdx = columnName.indexOf(VariantPathUtil.TYPED_VALUE_SEGMENT);
190+
if (typedIdx < 0) {
191+
return null;
192+
}
193+
194+
String variantColumn = columnName.substring(0, typedIdx);
195+
String extractedPath =
196+
columnName.substring(typedIdx + VariantPathUtil.TYPED_VALUE_SEGMENT.length());
197+
if (variantColumn.isEmpty() || extractedPath.isEmpty()) {
198+
return null;
199+
}
200+
201+
Type.PrimitiveType icebergType = extractPrimitiveType(leaf);
202+
if (icebergType == null) {
203+
return null;
204+
}
205+
206+
// Build an RFC9535 shorthand JSONPath-like path: "$.field" or "$.a.b"
207+
String jsonPath = "$." + extractedPath;
208+
try {
209+
return Expressions.extract(variantColumn, jsonPath, icebergType.toString());
210+
} catch (RuntimeException e) {
211+
// Invalid path/type; fall back to normal reference handling.
212+
return null;
213+
}
214+
}
215+
216+
private static Type.PrimitiveType extractPrimitiveType(PredicateLeaf leaf) {
217+
// Returned types must serialize (via toString) into Iceberg primitive type strings accepted by
218+
// Types.fromPrimitiveString.
219+
switch (leaf.getType()) {
220+
case LONG:
221+
return Types.LongType.get();
222+
case FLOAT:
223+
// Hive SARG uses FLOAT for both float/double; using double is the safest default.
224+
return Types.DoubleType.get();
225+
case STRING:
226+
return Types.StringType.get();
227+
case BOOLEAN:
228+
return Types.BooleanType.get();
229+
case DATE:
230+
return Types.DateType.get();
231+
case TIMESTAMP:
232+
// Iceberg timestamps are represented as micros in a long, but the Iceberg type is timestamp.
233+
return Types.TimestampType.withoutZone();
234+
case DECIMAL:
235+
// Precision/scale are not available in the SARG leaf type.
236+
return null;
237+
default:
238+
return null;
239+
}
240+
}
241+
171242
// PredicateLeafImpl has a work-around for Kryo serialization with java.util.Date objects where it converts values to
172243
// Timestamp using Date#getTime. This conversion discards microseconds, so this is a necessary to avoid it.
173244
private static final DynFields.UnboundField<?> LITERAL_FIELD = DynFields.builder()

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java

Lines changed: 98 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -54,6 +54,7 @@
5454
import org.apache.iceberg.expressions.ResidualEvaluator;
5555
import org.apache.iceberg.hive.HiveVersion;
5656
import org.apache.iceberg.mr.InputFormatConfig;
57+
import org.apache.iceberg.mr.hive.variant.VariantFilterRewriter;
5758
import org.apache.iceberg.mr.mapred.AbstractMapredIcebergRecordReader;
5859
import org.apache.iceberg.mr.mapred.Container;
5960
import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
@@ -91,65 +92,129 @@ public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
9192
}
9293

9394
/**
94-
* Converts the Hive filter found in the job conf to an Iceberg filter expression.
95-
* @param conf - job conf
96-
* @return - Iceberg data filter expression
95+
* Encapsulates planning-time and reader-time Iceberg filter expressions derived from Hive predicates.
9796
*/
98-
static Expression icebergDataFilterFromHiveConf(Configuration conf) {
99-
Expression icebergFilter = SerializationUtil.deserializeFromBase64(conf.get(InputFormatConfig.FILTER_EXPRESSION));
100-
if (icebergFilter != null) {
101-
// in case we already have it prepared..
102-
return icebergFilter;
97+
private static final class FilterExpressions {
98+
99+
private static Expression planningFilter(Configuration conf) {
100+
// Planning-safe filter (extract removed) may already be serialized for reuse.
101+
Expression planningFilter = SerializationUtil
102+
.deserializeFromBase64(conf.get(InputFormatConfig.FILTER_EXPRESSION));
103+
if (planningFilter != null) {
104+
// in case we already have it prepared..
105+
return planningFilter;
106+
}
107+
// Reader filter should retain extract(...) for row-group pruning. Rebuild from Hive predicate to avoid losing
108+
// variant rewrites when planningFilter was stripped.
109+
Expression readerFilter = icebergDataFilterFromHiveConf(conf);
110+
if (readerFilter != null) {
111+
return VariantFilterRewriter.stripVariantExtractPredicates(readerFilter);
112+
}
113+
return null;
103114
}
104-
String hiveFilter = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
105-
if (hiveFilter != null) {
106-
ExprNodeGenericFuncDesc exprNodeDesc = SerializationUtilities
107-
.deserializeObject(hiveFilter, ExprNodeGenericFuncDesc.class);
108-
return getFilterExpr(conf, exprNodeDesc);
115+
116+
private static Expression icebergDataFilterFromHiveConf(Configuration conf) {
117+
// Build an Iceberg filter from Hive's serialized predicate so we can preserve extract(...) terms for
118+
// reader-level pruning (e.g. Parquet shredded VARIANT row-group pruning).
119+
//
120+
// This intentionally does NOT consult FILTER_EXPRESSION, because FILTER_EXPRESSION must remain safe for
121+
// Iceberg planning-time utilities (some of which cannot stringify extract(...) terms).
122+
String hiveFilter = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
123+
if (hiveFilter != null) {
124+
ExprNodeGenericFuncDesc exprNodeDesc =
125+
SerializationUtilities.deserializeObject(hiveFilter, ExprNodeGenericFuncDesc.class);
126+
return getFilterExpr(conf, exprNodeDesc);
127+
}
128+
return null;
129+
}
130+
131+
private static Expression planningResidual(FileScanTask task, Configuration conf) {
132+
return residual(task, conf, planningFilter(conf));
133+
}
134+
135+
private static Expression readerResidual(FileScanTask task, Configuration conf) {
136+
return residual(task, conf, icebergDataFilterFromHiveConf(conf));
137+
}
138+
139+
private static Expression residual(FileScanTask task, Configuration conf, Expression filter) {
140+
if (filter == null) {
141+
return Expressions.alwaysTrue();
142+
}
143+
boolean caseSensitive = conf.getBoolean(
144+
InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT);
145+
146+
return ResidualEvaluator.of(task.spec(), filter, caseSensitive)
147+
.residualFor(task.file().partition());
109148
}
110-
return null;
111149
}
112150

113151
/**
114-
* getFilterExpr extracts search argument from ExprNodeGenericFuncDesc and returns Iceberg Filter Expression
152+
* Builds an Iceberg filter expression from a Hive predicate expression node.
115153
* @param conf - job conf
116154
* @param exprNodeDesc - Describes a GenericFunc node
117155
* @return Iceberg Filter Expression
118156
*/
119157
static Expression getFilterExpr(Configuration conf, ExprNodeGenericFuncDesc exprNodeDesc) {
120-
if (exprNodeDesc != null) {
121-
SearchArgument sarg = ConvertAstToSearchArg.create(conf, exprNodeDesc);
122-
try {
123-
return HiveIcebergFilterFactory.generateFilterExpression(sarg);
124-
} catch (UnsupportedOperationException e) {
125-
LOG.warn("Unable to create Iceberg filter, proceeding without it (will be applied by Hive later): ", e);
158+
if (exprNodeDesc == null) {
159+
return null;
160+
}
161+
162+
ExprNodeGenericFuncDesc exprForSarg = exprNodeDesc;
163+
if (Boolean.parseBoolean(conf.get(InputFormatConfig.VARIANT_SHREDDING_ENABLED))) {
164+
ExprNodeGenericFuncDesc rewritten = VariantFilterRewriter.rewriteForShredding(exprNodeDesc);
165+
if (rewritten != null) {
166+
exprForSarg = rewritten;
126167
}
127168
}
128-
return null;
169+
170+
SearchArgument sarg = ConvertAstToSearchArg.create(conf, exprForSarg);
171+
if (sarg == null) {
172+
return null;
173+
}
174+
175+
try {
176+
return HiveIcebergFilterFactory.generateFilterExpression(sarg);
177+
} catch (UnsupportedOperationException e) {
178+
LOG.warn(
179+
"Unable to create Iceberg filter, proceeding without it (will be applied by Hive later): ",
180+
e);
181+
return null;
182+
}
129183
}
130184

131185
/**
132-
* Converts Hive filter found in the passed job conf to an Iceberg filter expression. Then evaluates this
133-
* against the task's partition value producing a residual filter expression.
186+
* Returns a residual expression that is safe to apply as a record-level filter.
187+
*
188+
* <p>This residual is derived from the task-level Iceberg planning filter (already extract-free) after
189+
* evaluating it against the task's partition value.
134190
* @param task - file scan task to evaluate the expression against
135191
* @param conf - job conf
136192
* @return - Iceberg residual filter expression
137193
*/
138194
public static Expression residualForTask(FileScanTask task, Configuration conf) {
139-
Expression dataFilter = icebergDataFilterFromHiveConf(conf);
140-
if (dataFilter == null) {
141-
return Expressions.alwaysTrue();
142-
}
143-
return ResidualEvaluator.of(
144-
task.spec(), dataFilter,
145-
conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT)
146-
).residualFor(task.file().partition());
195+
return FilterExpressions.planningResidual(task, conf);
196+
}
197+
198+
/**
199+
* Returns a residual expression intended only for reader-level pruning (best-effort).
200+
*
201+
* <p>This residual is derived from the task-level Iceberg filter after evaluating it against the task's
202+
* partition value. It may include {@code extract(...)} predicates and is suitable for formats/readers that
203+
* can leverage such terms for pruning (e.g. Parquet row-group pruning using shredded VARIANT columns).
204+
*
205+
* <p><strong>Do not</strong> use this for record-level residual filtering, as {@code extract} cannot be
206+
* evaluated at record level in Iceberg readers.
207+
*/
208+
public static Expression residualForReaderPruning(FileScanTask task, Configuration conf) {
209+
return FilterExpressions.readerResidual(task, conf);
147210
}
148211

149212
@Override
150213
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
151-
Expression filter = icebergDataFilterFromHiveConf(job);
214+
Expression filter = FilterExpressions.planningFilter(job);
152215
if (filter != null) {
216+
// Iceberg planning-time utilities may attempt to stringify the filter. Ensure the planning filter never
217+
// contains extract(...) or shredded typed_value references.
153218
job.set(InputFormatConfig.FILTER_EXPRESSION, SerializationUtil.serializeToBase64(filter));
154219
}
155220

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -187,7 +187,6 @@
187187
import org.apache.iceberg.mr.hive.actions.HiveIcebergDeleteOrphanFiles;
188188
import org.apache.iceberg.mr.hive.plan.IcebergBucketFunction;
189189
import org.apache.iceberg.mr.hive.udf.GenericUDFIcebergZorder;
190-
import org.apache.iceberg.parquet.VariantUtil;
191190
import org.apache.iceberg.puffin.Blob;
192191
import org.apache.iceberg.puffin.BlobMetadata;
193192
import org.apache.iceberg.puffin.Puffin;
@@ -1752,8 +1751,7 @@ private void fallbackToNonVectorizedModeBasedOnProperties(Properties tableProps)
17521751
if (FileFormat.AVRO == IcebergTableUtil.defaultFileFormat(tableProps::getProperty) ||
17531752
isValidMetadataTable(tableProps.getProperty(IcebergAcidUtil.META_TABLE_PROPERTY)) ||
17541753
hasOrcTimeInSchema(tableProps, tableSchema) ||
1755-
!hasParquetNestedTypeWithinListOrMap(tableProps, tableSchema) ||
1756-
VariantUtil.shouldUseVariantShredding(tableProps::getProperty, tableSchema)) {
1754+
!hasParquetNestedTypeWithinListOrMap(tableProps, tableSchema)) {
17571755
// disable vectorization
17581756
SessionStateUtil.getQueryState(conf).ifPresent(queryState ->
17591757
queryState.getConf().setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false));

0 commit comments

Comments (0)