-
Notifications
You must be signed in to change notification settings - Fork 4.8k
HIVE-28655: Implement HMS Related Drop Stats Changes (Part2) COL_STATS_ACCURATE related changes #6198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
HIVE-28655: Implement HMS Related Drop Stats Changes (Part2) COL_STATS_ACCURATE related changes #6198
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,7 @@ | |
| import static org.apache.hadoop.hive.metastore.ColumnType.VARCHAR_TYPE_NAME; | ||
|
|
||
| import java.sql.Connection; | ||
| import java.sql.ResultSet; | ||
| import java.sql.SQLException; | ||
| import java.sql.Statement; | ||
| import java.util.ArrayList; | ||
|
|
@@ -53,6 +54,8 @@ | |
| import javax.jdo.Transaction; | ||
| import javax.jdo.datastore.JDOConnection; | ||
|
|
||
| import com.fasterxml.jackson.core.type.TypeReference; | ||
| import com.fasterxml.jackson.databind.ObjectMapper; | ||
| import com.google.common.collect.ImmutableList; | ||
| import com.google.common.collect.ImmutableSet; | ||
| import org.apache.commons.lang3.StringUtils; | ||
|
|
@@ -3304,6 +3307,183 @@ public List<Void> run(List<String> input) throws Exception { | |
| return true; | ||
| } | ||
|
|
||
| /** | ||
| a helper function which will firstly get the current COLUMN_STATS_ACCURATE parameter on table level | ||
| secondly convert the JSON String into map, and update the information in it, and convert it back to JSON | ||
| thirdly update the COLUMN_STATS_ACCURATE parameter with the new value on table level using directSql | ||
| */ | ||
| public long updateColumnStatsAccurateForTable(Table table, List<String> droppedCols) throws MetaException { | ||
| String currentValue = table.getParameters().get(StatsSetupConst.COLUMN_STATS_ACCURATE); | ||
| if (currentValue == null) { | ||
| return 0; | ||
| } | ||
|
|
||
| try { | ||
| ObjectMapper mapper = new ObjectMapper(); | ||
|
|
||
| // Deserialize the JSON into a map | ||
| Map<String, Object> statsMap = mapper.readValue(currentValue, new TypeReference<Map<String, Object>>() {}); | ||
|
|
||
| // Get the COLUMN_STATS object if it exists | ||
| Object columnStatsObj = statsMap.get(StatsSetupConst.COLUMN_STATS); | ||
|
|
||
| if (columnStatsObj instanceof Map) { | ||
| Map<String, String> columnStats = (Map<String, String>) columnStatsObj; | ||
|
|
||
| boolean removeAll = droppedCols == null || droppedCols.isEmpty() || droppedCols.size() == columnStats.size(); | ||
|
|
||
| if (removeAll) { | ||
| // Remove entire column stats | ||
| statsMap.remove(StatsSetupConst.COLUMN_STATS); | ||
| } else { | ||
| // Remove only the dropped columns | ||
| for (String col : droppedCols) { | ||
| if (col != null) { | ||
| columnStats.remove(col.toLowerCase()); | ||
| } | ||
DanielZhu58 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
| } | ||
|
|
||
| // Serialize the map into a new JSON string | ||
| String updatedValue = mapper.writeValueAsString(statsMap); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To update it is the intentional behavior. I made the new JSON String, and I updated the COLUMN_STATS_ACCURATE instead of removing it. It can show that user deleted column stats manually. |
||
|
|
||
| // Update the COLUMN_STATS_ACCURATE parameter | ||
| return updateTableParam(table, StatsSetupConst.COLUMN_STATS_ACCURATE, currentValue, updatedValue); | ||
| } catch (Exception e) { | ||
| throw new MetaException("Failed to parse/update COLUMN_STATS_ACCURATE: " + e.getMessage()); | ||
| } | ||
| } | ||
|
|
||
|
|
||
|
|
||
| public boolean updateColumnStatsAccurateForPartitions(String catName, String dbName, Table table, | ||
| List<String> partNames, List<String> colNames) throws MetaException { | ||
| if (partNames == null || partNames.isEmpty()) { | ||
| return true; | ||
| } | ||
|
|
||
| ObjectMapper mapper = new ObjectMapper(); | ||
|
|
||
| // If colNames is empty, then all the column stats of all columns should be deleted fetch all table column names | ||
| List<String> effectiveColNames; | ||
| if (colNames == null || colNames.isEmpty()) { | ||
| if (table.getSd().getCols() == null) { | ||
| effectiveColNames = new ArrayList<>(); | ||
| } else { | ||
| effectiveColNames = table.getSd().getCols().stream() | ||
| .map(f -> f.getName().toLowerCase()) | ||
| .collect(Collectors.toList()); | ||
| } | ||
| } else { | ||
| effectiveColNames = colNames.stream().map(String::toLowerCase).collect(Collectors.toList()); | ||
| } | ||
|
|
||
| try { | ||
| Batchable.runBatched(batchSize, partNames, new Batchable<String, Void>() { | ||
| @Override | ||
| public List<Void> run(List<String> input) throws Exception { | ||
| // 1. Construct SQL filter for partition names | ||
| String sqlFilter = PARTITIONS + ".\"PART_NAME\" in (" + makeParams(input.size()) + ")"; | ||
|
|
||
| // 2. Fetch PART_IDs of the partitions which are need to be changed | ||
| List<Long> partitionIds = getPartitionIdsViaSqlFilter( | ||
| catName, dbName, table.getTableName(), sqlFilter, input, Collections.emptyList(), -1); | ||
|
|
||
| if (partitionIds.isEmpty()) return null; | ||
|
|
||
| // 3. Get current COLUMN_STATS_ACCURATE values | ||
| Map<Long, String> partStatsAccurateMap = getColumnStatsAccurateByPartitionIds(partitionIds); | ||
|
|
||
| // 4. Iterate each partition to update COLUMN_STATS_ACCURATE | ||
| for (Long partId : partitionIds) { | ||
| String currentValue = partStatsAccurateMap.get(partId); | ||
| if (currentValue == null) continue; | ||
|
|
||
| try { | ||
| Map<String, Object> statsMap = mapper.readValue( | ||
| currentValue, new TypeReference<Map<String, Object>>() {}); | ||
| Object columnStatsObj = statsMap.get("COLUMN_STATS"); | ||
|
|
||
| boolean changed = false; | ||
| if (columnStatsObj instanceof Map) { | ||
| Map<String, String> columnStats = (Map<String, String>) columnStatsObj; | ||
| for (String col : effectiveColNames) { | ||
| if (columnStats.remove(col) != null) { | ||
| changed = true; | ||
| } | ||
| } | ||
|
|
||
| if (columnStats.isEmpty()) { | ||
| statsMap.remove("COLUMN_STATS"); | ||
| changed = true; | ||
| } | ||
| } | ||
|
|
||
| if (!statsMap.containsKey("COLUMN_STATS")) { | ||
| if (statsMap.remove("BASIC_STATS") != null) { | ||
| changed = true; | ||
| } | ||
| } | ||
|
|
||
| if (changed) { | ||
| String updatedValue = mapper.writeValueAsString(statsMap); | ||
| updatePartitionParam(partId, | ||
| StatsSetupConst.COLUMN_STATS_ACCURATE, currentValue, updatedValue); | ||
| } | ||
|
|
||
| } catch (Exception e) { | ||
| throw new MetaException("Failed to update COLUMN_STATS_ACCURATE for PART_ID " + partId + ": " + e.getMessage()); | ||
| } | ||
| } | ||
|
|
||
| return null; | ||
| } | ||
| }); | ||
|
|
||
| return true; // All succeeded | ||
| } catch (Exception e) { | ||
| LOG.warn("Failed to update COLUMN_STATS_ACCURATE for some partitions", e); | ||
| return false; // Failed batch | ||
| } | ||
| } | ||
|
|
||
|
|
||
| private Map<Long, String> getColumnStatsAccurateByPartitionIds(List<Long> partIds) throws MetaException { | ||
| if (partIds == null || partIds.isEmpty()) { | ||
| return Collections.emptyMap(); | ||
| } | ||
|
|
||
| StringBuilder queryText = new StringBuilder(); | ||
| queryText.append("SELECT \"PART_ID\", \"PARAM_VALUE\" FROM ") | ||
| .append(PARTITION_PARAMS) | ||
| .append(" WHERE \"PARAM_KEY\" = ? AND \"PART_ID\" IN (") | ||
| .append(makeParams(partIds.size())) | ||
| .append(")"); | ||
|
|
||
| // Create params: first COLUMN_STATS_ACCURATE, then all partIds | ||
| Object[] params = new Object[1 + partIds.size()]; | ||
| params[0] = StatsSetupConst.COLUMN_STATS_ACCURATE; | ||
| for (int i = 0; i < partIds.size(); i++) { | ||
| params[i + 1] = partIds.get(i); | ||
| } | ||
|
|
||
| try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText.toString()))) { | ||
| @SuppressWarnings("unchecked") | ||
| List<Object> sqlResult = executeWithArray(query.getInnerQuery(), params, queryText.toString()); | ||
|
|
||
| Map<Long, String> result = new HashMap<>(); | ||
| for (Object row : sqlResult) { | ||
| Object[] fields = (Object[]) row; | ||
| Long partId = MetastoreDirectSqlUtils.extractSqlLong(fields[0]); | ||
| String value = fields[1] == null ? null : fields[1].toString(); | ||
| result.put(partId, value); | ||
| } | ||
|
|
||
| return result; | ||
| } | ||
| } | ||
|
|
||
| public Map<String, Map<String, String>> updatePartitionColumnStatisticsBatch( | ||
| Map<String, ColumnStatistics> partColStatsMap, | ||
| Table tbl, | ||
|
|
@@ -3424,4 +3604,14 @@ long updateTableParam(Table table, String key, String expectedValue, String newV | |
| Query query = pm.newQuery("javax.jdo.query.SQL", statement); | ||
| return (long) query.executeWithArray(newValue, table.getId(), key, expectedValue); | ||
| } | ||
|
|
||
| long updatePartitionParam(Long partitionID, String key, String expectedValue, String newValue) { | ||
| String statement = TxnUtils.createUpdatePreparedStmt( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how about update the partitions in batch? |
||
| "\"PARTITION_PARAMS\"", | ||
| ImmutableList.of("\"PARAM_VALUE\""), | ||
| ImmutableList.of("\"PART_ID\"", "\"PARAM_KEY\"", dbType.toVarChar("\"PARAM_VALUE\""))); | ||
|
|
||
| Query query = pm.newQuery("javax.jdo.query.SQL", statement); | ||
| return (long) query.executeWithArray(newValue, partitionID, key, expectedValue); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9963,7 +9963,11 @@ protected String describeResult() { | |
| } | ||
| @Override | ||
| protected Boolean getSqlResult(GetHelper<Boolean> ctx) throws MetaException { | ||
| return directSql.deletePartitionColumnStats(catName, dbName, tableName, partNames, colNames, engine); | ||
| if (directSql.deletePartitionColumnStats(catName, dbName, tableName, partNames, colNames, engine)){ | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how if the |
||
| directSql.updateColumnStatsAccurateForPartitions(catName, dbName, getTable(), partNames, colNames); | ||
| return true; | ||
| } | ||
| return false; | ||
| } | ||
| @Override | ||
| protected Boolean getJdoResult(GetHelper<Boolean> ctx) | ||
|
|
@@ -10034,6 +10038,9 @@ public List<Void> run(List<String> input) throws Exception { | |
| } finally { | ||
| b.closeAllQueries(); | ||
| } | ||
| // Update COLUMN_STATS_ACCURATE after stats are dropped | ||
| Table tbl = getTable(catName, dbName, tableName); | ||
| directSql.updateColumnStatsAccurateForPartitions(catName, dbName, tbl, partNames, colNames); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use |
||
| ret = commitTransaction(); | ||
| } finally { | ||
| rollbackAndCleanup(ret, null); | ||
|
|
@@ -10056,7 +10063,18 @@ protected String describeResult() { | |
| } | ||
| @Override | ||
| protected Boolean getSqlResult(GetHelper<Boolean> ctx) throws MetaException { | ||
| return directSql.deleteTableColumnStatistics(getTable().getId(), colNames, engine); | ||
| MetaStoreDirectSql d = directSql; // snapshot | ||
| if (d == null) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why could
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Acknowledged. This is over protection. This whole function would be invoked when directSql is not null. |
||
| // Initialize the directSql again in case it became null in-between | ||
| String schema = PersistenceManagerProvider.getProperty("javax.jdo.mapping.Schema"); | ||
| schema = org.apache.commons.lang3.StringUtils.defaultIfBlank(schema, null); | ||
| d = new MetaStoreDirectSql(pm, conf, schema); | ||
| } | ||
| if (d.deleteTableColumnStatistics(getTable().getId(), colNames, engine)) { | ||
| d.updateColumnStatsAccurateForTable(getTable(), colNames); | ||
| return true; | ||
| } | ||
| return false; | ||
| } | ||
| @Override | ||
| protected Boolean getJdoResult(GetHelper<Boolean> ctx) | ||
|
|
@@ -10089,14 +10107,14 @@ private boolean deleteTableColumnStatisticsViaJdo(String catName, String dbName, | |
| query.setFilter(filter); | ||
| query.declareParameters(parameters); | ||
| List<Object> params = new ArrayList<>(); | ||
| params.add(normalizeIdentifier(tableName)); | ||
| params.add(normalizeIdentifier(dbName)); | ||
| params.add(normalizeIdentifier(catName)); | ||
| params.add(tableName == null ? null : normalizeIdentifier(tableName)); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Acknowledged.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Acknowledged. |
||
| params.add(dbName == null ? null : normalizeIdentifier(dbName)); | ||
| params.add(catName == null ? null : normalizeIdentifier(catName)); | ||
| if (colNames != null && !colNames.isEmpty()) { | ||
| List<String> normalizedColNames = new ArrayList<>(); | ||
| for (String colName : colNames){ | ||
| // trim the extra spaces, and change to lowercase | ||
| normalizedColNames.add(normalizeIdentifier(colName)); | ||
| normalizedColNames.add(colName == null ? null : normalizeIdentifier(colName)); | ||
| } | ||
| params.add(normalizedColNames); | ||
| } | ||
|
|
@@ -10111,6 +10129,19 @@ private boolean deleteTableColumnStatisticsViaJdo(String catName, String dbName, | |
| throw new NoSuchObjectException("Column stats doesn't exist for db=" + dbName + " table=" | ||
| + tableName + " col=" + String.join(", ", colNames)); | ||
| } | ||
| if (mStatsObjColl != null) { | ||
| pm.deletePersistentAll(mStatsObjColl); | ||
DanielZhu58 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // Update COLUMN_STATS_ACCURATE to reflect the deletion | ||
| Table tbl = getTable(catName, dbName, tableName); | ||
| // Initialize the directSql again in case it became null in-between | ||
| String schema = PersistenceManagerProvider.getProperty("javax.jdo.mapping.Schema"); | ||
| schema = org.apache.commons.lang3.StringUtils.defaultIfBlank(schema, null); | ||
| directSql = new MetaStoreDirectSql(pm, conf, schema); // Use the original colNames (can be null or empty) | ||
| directSql.updateColumnStatsAccurateForTable(tbl, colNames); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use |
||
| } else { | ||
| throw new NoSuchObjectException("Column stats doesn't exist for db=" + dbName + | ||
| " table=" + tableName + " col=" + (colNames != null ? String.join(", ", colNames) : "ALL")); | ||
| } | ||
| ret = commitTransaction(); | ||
| } finally { | ||
| rollbackAndCleanup(ret, query); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1847,6 +1847,11 @@ public void testColumnStatistics() throws Throwable { | |
| List<ColumnStatisticsObj> stats = client.getTableColumnStatistics( | ||
| dbName, tblName, Lists.newArrayList(colName[1]), ENGINE); | ||
| assertTrue("stats are not empty: " + stats, stats.isEmpty()); | ||
| // test if all columns are deleted from parameter COLUMN_STATS_ACCURATE | ||
| Map<String, String> tableParams = client.getTable(dbName, tblName).getParameters(); | ||
| String table_column_stats_accurate = tableParams.get("COLUMN_STATS_ACCURATE"); | ||
| assertTrue("parameter COLUMN_STATS_ACCURATE is not accurate in " + tblName, table_column_stats_accurate == null || | ||
| (!table_column_stats_accurate.contains(colName[0]) && !table_column_stats_accurate.contains(colName[1]))); | ||
|
|
||
| colStats.setStatsDesc(statsDesc); | ||
| colStats.setStatsObj(statsObjs); | ||
|
|
@@ -1864,6 +1869,11 @@ public void testColumnStatistics() throws Throwable { | |
| // multiple columns | ||
| request.setCol_names(Arrays.asList(colName)); | ||
| assertTrue(client.deleteColumnStatistics(request)); | ||
| // test if the columns in colName array are deleted from parameter COLUMN_STATS_ACCURATE | ||
| tableParams = client.getTable(dbName, tblName).getParameters(); | ||
| table_column_stats_accurate = tableParams.get("COLUMN_STATS_ACCURATE"); | ||
| assertTrue("parameter COLUMN_STATS_ACCURATE is not accurate in " + tblName, table_column_stats_accurate == null || | ||
| (!table_column_stats_accurate.contains(colName[0]) && !table_column_stats_accurate.contains(colName[1]))); | ||
| colStats3 = client.getTableColumnStatistics( | ||
| dbName, tblName, Lists.newArrayList(colName), ENGINE); | ||
| assertTrue("stats are not empty: " + colStats3, colStats3.isEmpty()); | ||
|
|
@@ -1959,6 +1969,12 @@ public void testColumnStatistics() throws Throwable { | |
| Lists.newArrayList(partitions.get(0), partitions.get(1), partitions.get(2)), Lists.newArrayList(colName), ENGINE); | ||
| assertEquals(1, stats2.size()); | ||
| assertEquals(2, stats2.get(partitions.get(2)).size()); | ||
| // test if all columns are deleted from parameter COLUMN_STATS_ACCURATE | ||
| Partition partition_0 = client.getPartition(dbName, tblName, partitions.get(0)); | ||
| Map<String, String> partitionParams = partition_0.getParameters(); | ||
| String partition_column_stats_accurate = partitionParams.get("COLUMN_STATS_ACCURATE"); | ||
| assertTrue("parameter COLUMN_STATS_ACCURATE is not accurate in " + partitions.get(0),partition_column_stats_accurate == null || | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for drop all columns, the |
||
| (!table_column_stats_accurate.contains(colName[0]) && !table_column_stats_accurate.contains(colName[1]))); | ||
|
|
||
| // no partition or column name is set | ||
| request.unsetPart_names(); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.