Skip to content

Commit 8916667

Browse files
author
yingying
committed
new aggregate function:merge_map_with_keytime
1 parent e92b27a commit 8916667

File tree

5 files changed

+301
-0
lines changed

5 files changed

+301
-0
lines changed

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4358,6 +4358,16 @@ public InlineElement getDescription() {
43584358
}
43594359
}
43604360

4361+
public static final Integer TS_FIELD_INDEX = Integer.valueOf("ts_field_index");
4362+
4363+
public int fieldMergeMapWithKeyTimeAggTsFieldIndex() {
4364+
try {
4365+
return Integer.parseInt(String.valueOf(TS_FIELD_INDEX));
4366+
} catch (NumberFormatException e) {
4367+
return 1;
4368+
}
4369+
}
4370+
43614371
/**
43624372
* Action to take when an UPDATE (e.g. via MERGE INTO) modifies columns that are covered by a
43634373
* global index.
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.mergetree.compact.aggregate;
20+
21+
import org.apache.paimon.data.GenericMap;
22+
import org.apache.paimon.data.InternalArray;
23+
import org.apache.paimon.data.InternalMap;
24+
import org.apache.paimon.data.InternalRow;
25+
import org.apache.paimon.types.DataTypes;
26+
import org.apache.paimon.types.MapType;
27+
import org.apache.paimon.types.RowType;
28+
29+
import java.util.HashMap;
30+
import java.util.Map;
31+
32+
/** Aggregator for merging maps with key and timestamp. */
33+
public class FieldMergeMapWithKeyTimeAgg extends FieldAggregator {
34+
35+
private static final long serialVersionUID = 1L;
36+
37+
private final InternalArray.ElementGetter keyGetter;
38+
private final InternalArray.ElementGetter valueGetter;
39+
private final int timestampFieldIndex;
40+
41+
public FieldMergeMapWithKeyTimeAgg(String name, MapType dataType, int timestampFieldIndex) {
42+
super(name, dataType);
43+
this.keyGetter = InternalArray.createElementGetter(dataType.getKeyType());
44+
this.valueGetter = InternalArray.createElementGetter(dataType.getValueType());
45+
46+
if (!(dataType.getValueType() instanceof RowType)) {
47+
throw new IllegalArgumentException("Value type must be ROW<value, timestamp>");
48+
}
49+
RowType rowType = (RowType) dataType.getValueType();
50+
if (rowType.getFieldCount() < 2) {
51+
throw new IllegalArgumentException("ROW type must have at least 2 fields");
52+
}
53+
54+
if (!rowType.getTypeAt(1).equals(DataTypes.STRING())) {
55+
throw new IllegalArgumentException("Timestamp field must be STRING");
56+
}
57+
58+
this.timestampFieldIndex = timestampFieldIndex;
59+
}
60+
61+
@Override
62+
public Object agg(Object accumulator, Object inputField) {
63+
if (accumulator == null) {
64+
return inputField;
65+
}
66+
if (inputField == null) {
67+
return accumulator;
68+
}
69+
70+
InternalMap accMap = (InternalMap) accumulator;
71+
InternalMap inputMap = (InternalMap) inputField;
72+
73+
Map<Object, Object> resultMap = new HashMap<>();
74+
putToMap(resultMap, accMap);
75+
76+
mergeInputMap(resultMap, inputMap);
77+
78+
return new GenericMap(resultMap);
79+
}
80+
81+
private void putToMap(Map<Object, Object> map, InternalMap data) {
82+
InternalArray keyArray = data.keyArray();
83+
InternalArray valueArray = data.valueArray();
84+
for (int i = 0; i < keyArray.size(); i++) {
85+
Object key = keyGetter.getElementOrNull(keyArray, i);
86+
Object value = valueGetter.getElementOrNull(valueArray, i);
87+
map.put(key, value);
88+
}
89+
}
90+
91+
private void mergeInputMap(Map<Object, Object> resultMap, InternalMap inputMap) {
92+
InternalArray keyArray = inputMap.keyArray();
93+
InternalArray valueArray = inputMap.valueArray();
94+
95+
for (int i = 0; i < keyArray.size(); i++) {
96+
Object key = keyGetter.getElementOrNull(keyArray, i);
97+
InternalRow newRow = (InternalRow) valueGetter.getElementOrNull(valueArray, i);
98+
99+
if (newRow == null) {
100+
resultMap.remove(key);
101+
continue;
102+
}
103+
104+
if (newRow.isNullAt(timestampFieldIndex)) {
105+
continue;
106+
}
107+
108+
Object existingValue = resultMap.get(key);
109+
if (existingValue == null) {
110+
resultMap.put(key, newRow);
111+
} else {
112+
InternalRow existingRow = (InternalRow) existingValue;
113+
if (existingRow.isNullAt(timestampFieldIndex)) {
114+
resultMap.put(key, newRow);
115+
} else {
116+
String newTs = newRow.getString(timestampFieldIndex).toString();
117+
String existingTs = existingRow.getString(timestampFieldIndex).toString();
118+
if (newTs.compareTo(existingTs) > 0) {
119+
resultMap.put(key, newRow);
120+
}
121+
}
122+
}
123+
}
124+
}
125+
126+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.mergetree.compact.aggregate.factory;
20+
21+
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.mergetree.compact.aggregate.FieldMergeMapWithKeyTimeAgg;
23+
import org.apache.paimon.types.DataType;
24+
import org.apache.paimon.types.DataTypes;
25+
import org.apache.paimon.types.MapType;
26+
import org.apache.paimon.types.RowType;
27+
28+
import static org.apache.paimon.utils.Preconditions.checkArgument;
29+
30+
/** Factory for {@link FieldMergeMapWithKeyTimeAgg}. */
31+
public class FieldMergeMapWithKeyTimeAggFactory implements FieldAggregatorFactory {
32+
33+
public static final String NAME = "merge_map_with_keytime";
34+
35+
@Override
36+
public FieldMergeMapWithKeyTimeAgg create(
37+
DataType fieldType, CoreOptions options, String field) {
38+
checkArgument(
39+
fieldType instanceof MapType,
40+
"Data type for field '%s' must be 'MAP' but was '%s'",
41+
field,
42+
fieldType);
43+
44+
MapType mapType = (MapType) fieldType;
45+
DataType valueType = mapType.getValueType();
46+
47+
checkArgument(
48+
valueType instanceof RowType,
49+
"Value type of MAP for field '%s' must be ROW but was '%s'",
50+
field,
51+
valueType);
52+
53+
RowType rowType = (RowType) valueType;
54+
checkArgument(
55+
rowType.getFieldCount() >= 2,
56+
"ROW type for field '%s' must have at least 2 fields, but found %s",
57+
field,
58+
rowType.getFieldCount());
59+
60+
checkArgument(
61+
DataTypes.STRING().equals(rowType.getTypeAt(1)),
62+
"The second field (timestamp) of ROW in field '%s' must be STRING, but was '%s'",
63+
field,
64+
rowType.getTypeAt(1));
65+
66+
return new FieldMergeMapWithKeyTimeAgg(
67+
NAME, mapType, options.fieldMergeMapWithKeyTimeAggTsFieldIndex());
68+
}
69+
70+
@Override
71+
public String identifier() {
72+
return NAME;
73+
}
74+
}

paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastValueAggFactory
2828
org.apache.paimon.mergetree.compact.aggregate.factory.FieldListaggAggFactory
2929
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMaxAggFactory
3030
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMergeMapAggFactory
31+
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMergeMapWithKeyTimeAggFactory
3132
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMinAggFactory
3233
org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedUpdateAggFactory
3334
org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedPartialUpdateAggFactory

paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171

7272
import java.io.IOException;
7373
import java.math.BigDecimal;
74+
import java.util.AbstractMap;
7475
import java.util.Arrays;
7576
import java.util.Collections;
7677
import java.util.HashMap;
@@ -1215,4 +1216,93 @@ private Map<Object, Object> toJavaMap(Object data) {
12151216
}
12161217
return result;
12171218
}
1219+
1220+
@Test
1221+
public void testFieldMergeMapWithKeyTimeAgg() {
1222+
MapType mapType =
1223+
DataTypes.MAP(
1224+
DataTypes.STRING(),
1225+
DataTypes.ROW(
1226+
DataTypes.FIELD(0, "actual_value", DataTypes.STRING()),
1227+
DataTypes.FIELD(1, "dbsync_ts", DataTypes.STRING())));
1228+
FieldMergeMapWithKeyTimeAgg agg = new FieldMergeMapWithKeyTimeAgg("test", mapType, 1);
1229+
1230+
GenericMap map1 =
1231+
createTestMap(
1232+
createEntry("key1", "A", "17682882903686900100"),
1233+
createEntry("key2", "B", "17682882903686900100"));
1234+
GenericMap map2 =
1235+
createTestMap(
1236+
createEntry("key1", "A1", "17682882903686900200"),
1237+
createEntry("key3", "C", "17682882903686900200"));
1238+
GenericMap map3 = createTestMap(createEntry("key2", "B2", "17682882903686900050"));
1239+
1240+
Object acc = agg.agg(null, map1);
1241+
assertTestMap(acc, createExpectedEntry("key1", "A"), createExpectedEntry("key2", "B"));
1242+
1243+
acc = agg.agg(acc, map2);
1244+
assertTestMap(
1245+
acc,
1246+
createExpectedEntry("key1", "A1"),
1247+
createExpectedEntry("key2", "B"),
1248+
createExpectedEntry("key3", "C"));
1249+
1250+
acc = agg.agg(acc, map3);
1251+
assertTestMap(
1252+
acc,
1253+
createExpectedEntry("key1", "A1"),
1254+
createExpectedEntry("key2", "B"),
1255+
createExpectedEntry("key3", "C"));
1256+
}
1257+
1258+
private Map.Entry<BinaryString, InternalRow> createEntry(String key, String value, String ts) {
1259+
return new AbstractMap.SimpleEntry<>(
1260+
BinaryString.fromString(key),
1261+
GenericRow.of(
1262+
value == null ? null : BinaryString.fromString(value),
1263+
ts == null ? null : BinaryString.fromString(ts)));
1264+
}
1265+
1266+
private Map.Entry<String, String> createExpectedEntry(String key, String value) {
1267+
return new AbstractMap.SimpleEntry<>(key, value);
1268+
}
1269+
1270+
@SafeVarargs
1271+
private final GenericMap createTestMap(Map.Entry<BinaryString, InternalRow>... entries) {
1272+
Map<BinaryString, InternalRow> map = new HashMap<>();
1273+
for (Map.Entry<BinaryString, InternalRow> entry : entries) {
1274+
map.put(entry.getKey(), entry.getValue());
1275+
}
1276+
return new GenericMap(map);
1277+
}
1278+
1279+
@SafeVarargs
1280+
private final void assertTestMap(Object mapObj, Map.Entry<String, String>... expected) {
1281+
InternalMap map = (InternalMap) mapObj;
1282+
Map<String, String> actual = new HashMap<>();
1283+
1284+
InternalArray keyArray = map.keyArray();
1285+
InternalArray valueArray = map.valueArray();
1286+
1287+
for (int i = 0; i < map.size(); i++) {
1288+
BinaryString keyBinary = keyArray.getString(i);
1289+
String key = keyBinary.toString();
1290+
1291+
InternalRow row = valueArray.getRow(i, 2);
1292+
1293+
String value = null;
1294+
if (!row.isNullAt(0)) {
1295+
BinaryString valueBinary = row.getString(0);
1296+
value = valueBinary != null ? valueBinary.toString() : null;
1297+
}
1298+
actual.put(key, value);
1299+
}
1300+
1301+
Map<String, String> expectedMap = new HashMap<>();
1302+
for (Map.Entry<String, String> e : expected) {
1303+
expectedMap.put(e.getKey(), e.getValue());
1304+
}
1305+
1306+
assertThat(actual).containsExactlyInAnyOrderEntriesOf(expectedMap);
1307+
}
12181308
}

0 commit comments

Comments
 (0)