Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,16 @@ private String getValidWriteIds(String dbName, String tblName) throws Throwable
/**
 * Verifies that the parameters of the cached table match those stored in the raw store.
 *
 * <p>The {@code COLUMN_STATS_ACCURATE} entry is excluded from the comparison, so a difference
 * in that single key between the raw store and the shared cache does not fail the check.
 *
 * @param dbName  database the table belongs to
 * @param tblName table whose parameters are compared
 * @throws Throwable if either lookup fails or the remaining parameters differ
 */
private void validateTablePara(String dbName, String tblName) throws Throwable {
  Table tblRead = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName);
  Table tblRead1 = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName);

  // Work on copies so the Table objects returned by the store/cache are not mutated.
  Map<String, String> expected = new HashMap<>(tblRead.getParameters());
  Map<String, String> actual = new HashMap<>(tblRead1.getParameters());

  // COLUMN_STATS_ACCURATE is deliberately ignored; comparing the full maps first would
  // fail on exactly the difference this check is meant to tolerate.
  expected.remove("COLUMN_STATS_ACCURATE");
  actual.remove("COLUMN_STATS_ACCURATE");

  Assert.assertEquals(expected, actual);
}

private void validatePartPara(String dbName, String tblName, String partName) throws Throwable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,11 @@ public static void removeColumnStatsState(Map<String, String> params, List<Strin
return;
}
ColumnStatsAccurate stats = parseStatsAcc(params.get(COLUMN_STATS_ACCURATE));
colNames.forEach(colName ->
stats.columnStats.remove(colName.toLowerCase()));
colNames.forEach(colName -> {
if (colName != null) {
stats.columnStats.remove(colName.toLowerCase());
}
});

try {
params.put(COLUMN_STATS_ACCURATE, ColumnStatsAccurate.objectWriter.writeValueAsString(stats));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static org.apache.hadoop.hive.metastore.ColumnType.VARCHAR_TYPE_NAME;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
Expand All @@ -53,6 +54,8 @@
import javax.jdo.Transaction;
import javax.jdo.datastore.JDOConnection;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -3304,6 +3307,183 @@ public List<Void> run(List<String> input) throws Exception {
return true;
}

/**
A helper that first reads the current table-level COLUMN_STATS_ACCURATE parameter,
then parses its JSON string into a map, removes the entries of the dropped columns, and serializes the map back to JSON,
and finally writes the new value to the table-level COLUMN_STATS_ACCURATE parameter using direct SQL.
*/
public long updateColumnStatsAccurateForTable(Table table, List<String> droppedCols) throws MetaException {
String currentValue = table.getParameters().get(StatsSetupConst.COLUMN_STATS_ACCURATE);
if (currentValue == null) {
return 0;
}

try {
ObjectMapper mapper = new ObjectMapper();

// Deserialize the JSON into a map
Map<String, Object> statsMap = mapper.readValue(currentValue, new TypeReference<Map<String, Object>>() {});

// Get the COLUMN_STATS object if it exists
Object columnStatsObj = statsMap.get(StatsSetupConst.COLUMN_STATS);

if (columnStatsObj instanceof Map) {
Map<String, String> columnStats = (Map<String, String>) columnStatsObj;

boolean removeAll = droppedCols == null || droppedCols.isEmpty() || droppedCols.size() == columnStats.size();

if (removeAll) {
// Remove entire column stats
statsMap.remove(StatsSetupConst.COLUMN_STATS);
} else {
// Remove only the dropped columns
for (String col : droppedCols) {
if (col != null) {
columnStats.remove(col.toLowerCase());
}
}
}
}

// Serialize the map into a new JSON string
String updatedValue = mapper.writeValueAsString(statsMap);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this use StatsSetupConst.removeColumnStatsState instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the statsMap ends up empty, should we drop the parameter instead of updating it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updating it is the intended behavior. I build the new JSON string and update COLUMN_STATS_ACCURATE with it instead of removing the parameter, which shows that the user deleted the column stats manually.
Readers will then also be aware that COLUMN_STATS_ACCURATE is not correct.


// Update the COLUMN_STATS_ACCURATE parameter
return updateTableParam(table, StatsSetupConst.COLUMN_STATS_ACCURATE, currentValue, updatedValue);
} catch (Exception e) {
throw new MetaException("Failed to parse/update COLUMN_STATS_ACCURATE: " + e.getMessage());
}
}



/**
 * Removes the given columns from the {@code COLUMN_STATS_ACCURATE} parameter of the named
 * partitions, processing the partition names in batches of {@code batchSize}.
 *
 * <p>When {@code colNames} is null or empty, every column of the table's storage descriptor is
 * treated as dropped. If a partition ends up with no {@code COLUMN_STATS} entry, its
 * {@code BASIC_STATS} entry is removed as well. Partitions whose parameter is absent or
 * unaffected are left untouched.
 *
 * @param catName   catalog name
 * @param dbName    database name
 * @param table     table owning the partitions
 * @param partNames names of the partitions to update; null or empty is a no-op
 * @param colNames  columns whose stats were dropped; null or empty means all table columns
 * @return {@code true} if every batch succeeded (or there was nothing to do), {@code false}
 *         if any batch failed; the failure is logged rather than propagated
 * @throws MetaException declared for callers; failures inside the batches are reported
 *         through the {@code false} return value
 */
public boolean updateColumnStatsAccurateForPartitions(String catName, String dbName, Table table,
    List<String> partNames, List<String> colNames) throws MetaException {
  if (partNames == null || partNames.isEmpty()) {
    return true;
  }

  final ObjectMapper mapper = new ObjectMapper();
  final List<String> effectiveColNames = resolveColumnsToClear(table, colNames);

  try {
    Batchable.runBatched(batchSize, partNames, new Batchable<String, Void>() {
      @Override
      public List<Void> run(List<String> input) throws Exception {
        // 1. Resolve the PART_IDs of this batch of partition names.
        String sqlFilter = PARTITIONS + ".\"PART_NAME\" in (" + makeParams(input.size()) + ")";
        List<Long> partitionIds = getPartitionIdsViaSqlFilter(
            catName, dbName, table.getTableName(), sqlFilter, input, Collections.emptyList(), -1);
        if (partitionIds.isEmpty()) {
          return null;
        }

        // 2. Load the current COLUMN_STATS_ACCURATE value of each partition.
        Map<Long, String> partStatsAccurateMap = getColumnStatsAccurateByPartitionIds(partitionIds);

        // 3. Rewrite the value for every partition that actually changes.
        for (Long partId : partitionIds) {
          String currentValue = partStatsAccurateMap.get(partId);
          if (currentValue == null) {
            continue;
          }
          try {
            String updatedValue = removeColumnsFromStatsAccurate(mapper, currentValue, effectiveColNames);
            if (updatedValue != null) {
              updatePartitionParam(partId, StatsSetupConst.COLUMN_STATS_ACCURATE, currentValue, updatedValue);
            }
          } catch (Exception e) {
            MetaException failure = new MetaException(
                "Failed to update COLUMN_STATS_ACCURATE for PART_ID " + partId + ": " + e.getMessage());
            // Keep the original exception so the stack trace is not lost in the log.
            failure.initCause(e);
            throw failure;
          }
        }

        // The batch result is not used by the caller.
        return null;
      }
    });

    return true; // All succeeded
  } catch (Exception e) {
    LOG.warn("Failed to update COLUMN_STATS_ACCURATE for some partitions", e);
    return false; // Failed batch
  }
}

/**
 * Computes the lower-cased column names whose stats entries must be cleared: the explicit
 * {@code colNames} (null entries skipped), or all table columns when none are given.
 */
private static List<String> resolveColumnsToClear(Table table, List<String> colNames) {
  if (colNames != null && !colNames.isEmpty()) {
    return colNames.stream()
        .filter(name -> name != null)
        .map(String::toLowerCase)
        .collect(Collectors.toList());
  }
  if (table.getSd().getCols() == null) {
    return new ArrayList<>();
  }
  return table.getSd().getCols().stream()
      .map(field -> field.getName().toLowerCase())
      .collect(Collectors.toList());
}

/**
 * Removes the given columns from a COLUMN_STATS_ACCURATE JSON value.
 *
 * @return the rewritten JSON, or {@code null} when the value does not change
 */
private static String removeColumnsFromStatsAccurate(ObjectMapper mapper, String currentValue,
    List<String> colsToClear) throws Exception {
  Map<String, Object> statsMap = mapper.readValue(currentValue, new TypeReference<Map<String, Object>>() {});
  boolean changed = false;

  Object columnStatsObj = statsMap.get(StatsSetupConst.COLUMN_STATS);
  if (columnStatsObj instanceof Map) {
    // Only keys are removed, so a wildcard view avoids an unchecked cast.
    Map<?, ?> columnStats = (Map<?, ?>) columnStatsObj;
    for (String col : colsToClear) {
      if (columnStats.remove(col) != null) {
        changed = true;
      }
    }
    if (columnStats.isEmpty()) {
      statsMap.remove(StatsSetupConst.COLUMN_STATS);
      changed = true;
    }
  }

  // Without any column stats left, the basic-stats flag is dropped as well.
  if (!statsMap.containsKey(StatsSetupConst.COLUMN_STATS)
      && statsMap.remove(StatsSetupConst.BASIC_STATS) != null) {
    changed = true;
  }

  return changed ? mapper.writeValueAsString(statsMap) : null;
}


/**
 * Reads the {@code COLUMN_STATS_ACCURATE} parameter value of each given partition via direct SQL.
 *
 * @param partIds partition ids to look up; null or empty yields an empty map
 * @return map from partition id to the raw parameter value; partitions without the parameter
 *         have no entry
 * @throws MetaException if the query fails
 */
private Map<Long, String> getColumnStatsAccurateByPartitionIds(List<Long> partIds) throws MetaException {
  if (partIds == null || partIds.isEmpty()) {
    return Collections.emptyMap();
  }

  String queryText = "SELECT \"PART_ID\", \"PARAM_VALUE\" FROM " + PARTITION_PARAMS
      + " WHERE \"PARAM_KEY\" = ? AND \"PART_ID\" IN (" + makeParams(partIds.size()) + ")";

  // Bind order: the parameter key first, then every partition id.
  Object[] params = new Object[1 + partIds.size()];
  params[0] = StatsSetupConst.COLUMN_STATS_ACCURATE;
  for (int i = 0; i < partIds.size(); i++) {
    params[i + 1] = partIds.get(i);
  }

  try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", queryText))) {
    @SuppressWarnings("unchecked")
    List<Object> sqlResult = executeWithArray(query.getInnerQuery(), params, queryText);

    Map<Long, String> result = new HashMap<>();
    for (Object row : sqlResult) {
      Object[] fields = (Object[]) row;
      Long partId = MetastoreDirectSqlUtils.extractSqlLong(fields[0]);
      // PARAM_VALUE may come back as a java.sql.Clob on some backends, where toString() does not
      // yield the content. NOTE(review): relies on MetastoreDirectSqlUtils.extractSqlClob - confirm.
      String value = MetastoreDirectSqlUtils.extractSqlClob(fields[1]);
      result.put(partId, value);
    }

    return result;
  }
}

public Map<String, Map<String, String>> updatePartitionColumnStatisticsBatch(
Map<String, ColumnStatistics> partColStatsMap,
Table tbl,
Expand Down Expand Up @@ -3424,4 +3604,14 @@ long updateTableParam(Table table, String key, String expectedValue, String newV
Query query = pm.newQuery("javax.jdo.query.SQL", statement);
return (long) query.executeWithArray(newValue, table.getId(), key, expectedValue);
}

long updatePartitionParam(Long partitionID, String key, String expectedValue, String newValue) {
String statement = TxnUtils.createUpdatePreparedStmt(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about updating the partitions in a batch?

"\"PARTITION_PARAMS\"",
ImmutableList.of("\"PARAM_VALUE\""),
ImmutableList.of("\"PART_ID\"", "\"PARAM_KEY\"", dbType.toVarChar("\"PARAM_VALUE\"")));

Query query = pm.newQuery("javax.jdo.query.SQL", statement);
return (long) query.executeWithArray(newValue, partitionID, key, expectedValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9963,7 +9963,11 @@ protected String describeResult() {
}
@Override
protected Boolean getSqlResult(GetHelper<Boolean> ctx) throws MetaException {
return directSql.deletePartitionColumnStats(catName, dbName, tableName, partNames, colNames, engine);
if (directSql.deletePartitionColumnStats(catName, dbName, tableName, partNames, colNames, engine)){
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if directSql.deletePartitionColumnStats returned the list of partition IDs, so that updateColumnStatsAccurateForPartitions could reuse it?

directSql.updateColumnStatsAccurateForPartitions(catName, dbName, getTable(), partNames, colNames);
return true;
}
return false;
}
@Override
protected Boolean getJdoResult(GetHelper<Boolean> ctx)
Expand Down Expand Up @@ -10034,6 +10038,9 @@ public List<Void> run(List<String> input) throws Exception {
} finally {
b.closeAllQueries();
}
// Update COLUMN_STATS_ACCURATE after stats are dropped
Table tbl = getTable(catName, dbName, tableName);
directSql.updateColumnStatsAccurateForPartitions(catName, dbName, tbl, partNames, colNames);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use ObjectStore.alterPartitionsViaJdo in the JDO path.

ret = commitTransaction();
} finally {
rollbackAndCleanup(ret, null);
Expand All @@ -10056,7 +10063,18 @@ protected String describeResult() {
}
@Override
protected Boolean getSqlResult(GetHelper<Boolean> ctx) throws MetaException {
return directSql.deleteTableColumnStatistics(getTable().getId(), colNames, engine);
MetaStoreDirectSql d = directSql; // snapshot
if (d == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why could d be null here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged. This is overprotective: this whole function is only invoked when directSql is not null.

// Initialize the directSql again in case it became null in-between
String schema = PersistenceManagerProvider.getProperty("javax.jdo.mapping.Schema");
schema = org.apache.commons.lang3.StringUtils.defaultIfBlank(schema, null);
d = new MetaStoreDirectSql(pm, conf, schema);
}
if (d.deleteTableColumnStatistics(getTable().getId(), colNames, engine)) {
d.updateColumnStatsAccurateForTable(getTable(), colNames);
return true;
}
return false;
}
@Override
protected Boolean getJdoResult(GetHelper<Boolean> ctx)
Expand Down Expand Up @@ -10089,14 +10107,14 @@ private boolean deleteTableColumnStatisticsViaJdo(String catName, String dbName,
query.setFilter(filter);
query.declareParameters(parameters);
List<Object> params = new ArrayList<>();
params.add(normalizeIdentifier(tableName));
params.add(normalizeIdentifier(dbName));
params.add(normalizeIdentifier(catName));
params.add(tableName == null ? null : normalizeIdentifier(tableName));
Copy link
Member

@dengzhhu653 dengzhhu653 Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why allow tableName == null here? Since we intend to drop the table column statistics, shouldn't we throw an exception in such a case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged.
This change was based on a test failure.

params.add(dbName == null ? null : normalizeIdentifier(dbName));
params.add(catName == null ? null : normalizeIdentifier(catName));
if (colNames != null && !colNames.isEmpty()) {
List<String> normalizedColNames = new ArrayList<>();
for (String colName : colNames){
// trim the extra spaces, and change to lowercase
normalizedColNames.add(normalizeIdentifier(colName));
normalizedColNames.add(colName == null ? null : normalizeIdentifier(colName));
}
params.add(normalizedColNames);
}
Expand All @@ -10111,6 +10129,19 @@ private boolean deleteTableColumnStatisticsViaJdo(String catName, String dbName,
throw new NoSuchObjectException("Column stats doesn't exist for db=" + dbName + " table="
+ tableName + " col=" + String.join(", ", colNames));
}
if (mStatsObjColl != null) {
pm.deletePersistentAll(mStatsObjColl);
// Update COLUMN_STATS_ACCURATE to reflect the deletion
Table tbl = getTable(catName, dbName, tableName);
// Initialize the directSql again in case it became null in-between
String schema = PersistenceManagerProvider.getProperty("javax.jdo.mapping.Schema");
schema = org.apache.commons.lang3.StringUtils.defaultIfBlank(schema, null);
directSql = new MetaStoreDirectSql(pm, conf, schema); // Use the original colNames (can be null or empty)
directSql.updateColumnStatsAccurateForTable(tbl, colNames);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use ObjectStore.alterTable in the JDO path?

} else {
throw new NoSuchObjectException("Column stats doesn't exist for db=" + dbName +
" table=" + tableName + " col=" + (colNames != null ? String.join(", ", colNames) : "ALL"));
}
ret = commitTransaction();
} finally {
rollbackAndCleanup(ret, query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1847,6 +1847,11 @@ public void testColumnStatistics() throws Throwable {
List<ColumnStatisticsObj> stats = client.getTableColumnStatistics(
dbName, tblName, Lists.newArrayList(colName[1]), ENGINE);
assertTrue("stats are not empty: " + stats, stats.isEmpty());
// test if all columns are deleted from parameter COLUMN_STATS_ACCURATE
Map<String, String> tableParams = client.getTable(dbName, tblName).getParameters();
String table_column_stats_accurate = tableParams.get("COLUMN_STATS_ACCURATE");
assertTrue("parameter COLUMN_STATS_ACCURATE is not accurate in " + tblName, table_column_stats_accurate == null ||
(!table_column_stats_accurate.contains(colName[0]) && !table_column_stats_accurate.contains(colName[1])));

colStats.setStatsDesc(statsDesc);
colStats.setStatsObj(statsObjs);
Expand All @@ -1864,6 +1869,11 @@ public void testColumnStatistics() throws Throwable {
// multiple columns
request.setCol_names(Arrays.asList(colName));
assertTrue(client.deleteColumnStatistics(request));
// test if the columns in colName array are deleted from parameter COLUMN_STATS_ACCURATE
tableParams = client.getTable(dbName, tblName).getParameters();
table_column_stats_accurate = tableParams.get("COLUMN_STATS_ACCURATE");
assertTrue("parameter COLUMN_STATS_ACCURATE is not accurate in " + tblName, table_column_stats_accurate == null ||
(!table_column_stats_accurate.contains(colName[0]) && !table_column_stats_accurate.contains(colName[1])));
colStats3 = client.getTableColumnStatistics(
dbName, tblName, Lists.newArrayList(colName), ENGINE);
assertTrue("stats are not empty: " + colStats3, colStats3.isEmpty());
Expand Down Expand Up @@ -1959,6 +1969,12 @@ public void testColumnStatistics() throws Throwable {
Lists.newArrayList(partitions.get(0), partitions.get(1), partitions.get(2)), Lists.newArrayList(colName), ENGINE);
assertEquals(1, stats2.size());
assertEquals(2, stats2.get(partitions.get(2)).size());
// test if all columns are deleted from parameter COLUMN_STATS_ACCURATE
Partition partition_0 = client.getPartition(dbName, tblName, partitions.get(0));
Map<String, String> partitionParams = partition_0.getParameters();
String partition_column_stats_accurate = partitionParams.get("COLUMN_STATS_ACCURATE");
assertTrue("parameter COLUMN_STATS_ACCURATE is not accurate in " + partitions.get(0),partition_column_stats_accurate == null ||
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When all columns are dropped, partition_column_stats_accurate == null should be true; do we still need to evaluate !table_column_stats_accurate.contains(colName[0]) && !table_column_stats_accurate.contains(colName[1])?

(!table_column_stats_accurate.contains(colName[0]) && !table_column_stats_accurate.contains(colName[1])));

// no partition or column name is set
request.unsetPart_names();
Expand Down