Copilot commented on code in PR #6287: URL: https://github.com/apache/hive/pull/6287#discussion_r2776572009
########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/SetAggrStatsHandler.java: ########## @@ -0,0 +1,451 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.handler; + +import com.google.common.collect.Lists; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.common.TableName; +import org.apache.hadoop.hive.metastore.IHMSHandler; +import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier; +import org.apache.hadoop.hive.metastore.RawStore; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.InvalidInputException; +import org.apache.hadoop.hive.metastore.api.InvalidObjectException; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import 
org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEvent; +import org.apache.hadoop.hive.metastore.events.UpdateTableColumnStatEvent; +import org.apache.hadoop.hive.metastore.messaging.EventMessage; +import org.apache.hadoop.hive.metastore.model.MTable; +import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.hive.metastore.HMSHandler.getPartValsFromName; +import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog; +import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier; + +@SuppressWarnings("unused") +@RequestHandler(requestBody = SetPartitionsStatsRequest.class) +public class SetAggrStatsHandler + extends AbstractRequestHandler<SetPartitionsStatsRequest, SetAggrStatsHandler.SetAggrStatsResult> { + private static final Logger LOG = LoggerFactory.getLogger(SetAggrStatsHandler.class); + private RawStore ms; + private String catName; + private String dbName; + private String tableName; + private Table t; + private boolean needMerge; + private Configuration conf; + + SetAggrStatsHandler(IHMSHandler handler, SetPartitionsStatsRequest request) { + super(handler, false, request); + } + + @Override + protected void beforeExecute() throws TException, IOException { + this.needMerge = request.isSetNeedMerge() && request.isNeedMerge(); + this.conf = handler.getConf(); + this.ms = handler.getMS(); + List<ColumnStatistics> csNews = request.getColStats(); + if (csNews != null && !csNews.isEmpty()) { + ColumnStatistics firstColStats = csNews.getFirst(); + ColumnStatisticsDesc statsDesc = firstColStats.getStatsDesc(); + this.catName 
= normalizeIdentifier(statsDesc.isSetCatName() ? statsDesc.getCatName() : getDefaultCatalog(conf)); + this.dbName = normalizeIdentifier(statsDesc.getDbName()); + this.tableName = normalizeIdentifier(statsDesc.getTableName()); + this.t = ms.getTable(catName, dbName, tableName); + if (statsDesc.isIsTblLevel() && request.getColStatsSize() != 1) { + // there should be only one ColumnStatistics + throw new MetaException( + "Expecting only 1 ColumnStatistics for table's column stats, but find " + request.getColStatsSize()); + } + } + } + + @Override + protected SetAggrStatsResult execute() throws TException, IOException { + boolean ret = true; + List<ColumnStatistics> csNews = request.getColStats(); + if (csNews == null || csNews.isEmpty()) { + return new SetAggrStatsResult(true); + } + // figure out if it is table level or partition level + ColumnStatistics firstColStats = csNews.getFirst(); + ColumnStatisticsDesc statsDesc = firstColStats.getStatsDesc(); + List<String> colNames = new ArrayList<>(); + for (ColumnStatisticsObj obj : firstColStats.getStatsObj()) { + colNames.add(obj.getColName()); + } + if (statsDesc.isIsTblLevel()) { + if (needMerge) { + return new SetAggrStatsResult(updateTableColumnStatsWithMerge(colNames)); + } else { + // This is the overwrite case, we do not care about the accuracy. + return new SetAggrStatsResult(updateTableColumnStatsInternal(firstColStats, + request.getValidWriteIdList(), request.getWriteId())); + } + } else { + // partition level column stats merging + // note that we may have two or more duplicate partition names. 
+ // see autoColumnStats_2.q under TestMiniLlapLocalCliDriver + Map<String, ColumnStatistics> newStatsMap = new HashMap<>(); + for (ColumnStatistics csNew : csNews) { + String partName = csNew.getStatsDesc().getPartName(); + if (newStatsMap.containsKey(partName)) { + MetaStoreServerUtils.mergeColStats(csNew, newStatsMap.get(partName)); + } + newStatsMap.put(partName, csNew); + } + + if (needMerge) { + ret = updatePartColumnStatsWithMerge(colNames, newStatsMap); + } else { // No merge. + // We don't short-circuit on errors here anymore. That can leave acid stats invalid. + if (MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.TRY_DIRECT_SQL)) { + ret = updatePartitionColStatsInBatch(newStatsMap, + request.getValidWriteIdList(), request.getWriteId()); + } else { + MTable mTable = ms.ensureGetMTable(catName, dbName, tableName); + for (Map.Entry<String, ColumnStatistics> entry : newStatsMap.entrySet()) { + // We don't short-circuit on errors here anymore. That can leave acid stats invalid. + ret = updatePartitonColStatsInternal(mTable, entry.getValue(), + request.getValidWriteIdList(), request.getWriteId()) && ret; + } + } + } + } + return new SetAggrStatsResult(ret); + } + + private boolean updateTableColumnStatsWithMerge(List<String> colNames) throws MetaException, + NoSuchObjectException, InvalidObjectException, InvalidInputException { + ColumnStatistics firstColStats = request.getColStats().getFirst(); + ms.openTransaction(); + boolean isCommitted = false, result = false; + try { + ColumnStatistics csOld = ms.getTableColumnStatistics(catName, dbName, tableName, colNames, + request.getEngine(), request.getValidWriteIdList()); + // we first use the valid stats list to prune the stats + boolean isInvalidTxnStats = csOld != null + && csOld.isSetIsStatsCompliant() && !csOld.isIsStatsCompliant(); + if (isInvalidTxnStats) { + // No columns can be merged; a shortcut for getMergableCols. 
+ firstColStats.setStatsObj(Lists.newArrayList()); + } else { + MetaStoreServerUtils.getMergableCols(firstColStats, t.getParameters()); + // we merge those that can be merged + if (csOld != null && csOld.getStatsObjSize() != 0 && !firstColStats.getStatsObj().isEmpty()) { + MetaStoreServerUtils.mergeColStats(firstColStats, csOld); + } + } + + if (!firstColStats.getStatsObj().isEmpty()) { + result = updateTableColumnStatsInternal(firstColStats, + request.getValidWriteIdList(), request.getWriteId()); + } else if (isInvalidTxnStats) { + // For now because the stats state is such as it is, we will invalidate everything. + // Overall the semantics here are not clear - we could invalidate only some columns, but does + // that make any physical sense? Could query affect some columns but not others? + t.setWriteId(request.getWriteId()); + StatsSetupConst.clearColumnStatsState(t.getParameters()); + StatsSetupConst.setBasicStatsState(t.getParameters(), StatsSetupConst.FALSE); + ms.alterTable(catName, dbName, tableName, t, request.getValidWriteIdList()); + } else { + // TODO: why doesn't the original call for non acid tables invalidate the stats? + LOG.debug("All the column stats are not accurate to merge."); + result = true; + } + + ms.commitTransaction(); + isCommitted = true; Review Comment: This transaction commit result is ignored. RawStore.commitTransaction() returns boolean; if it returns false, isCommitted is still set to true and rollback is skipped, which can leave the transaction open/uncommitted. Capture the boolean return value from commitTransaction() and use it to decide rollback/return status. ```suggestion isCommitted = ms.commitTransaction(); ``` ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/SetAggrStatsHandler.java: ########## @@ -0,0 +1,451 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.handler; + +import com.google.common.collect.Lists; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.common.TableName; +import org.apache.hadoop.hive.metastore.IHMSHandler; +import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier; +import org.apache.hadoop.hive.metastore.RawStore; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.InvalidInputException; +import org.apache.hadoop.hive.metastore.api.InvalidObjectException; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import 
org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEvent; +import org.apache.hadoop.hive.metastore.events.UpdateTableColumnStatEvent; +import org.apache.hadoop.hive.metastore.messaging.EventMessage; +import org.apache.hadoop.hive.metastore.model.MTable; +import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.hive.metastore.HMSHandler.getPartValsFromName; +import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog; +import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier; + +@SuppressWarnings("unused") +@RequestHandler(requestBody = SetPartitionsStatsRequest.class) +public class SetAggrStatsHandler + extends AbstractRequestHandler<SetPartitionsStatsRequest, SetAggrStatsHandler.SetAggrStatsResult> { + private static final Logger LOG = LoggerFactory.getLogger(SetAggrStatsHandler.class); + private RawStore ms; + private String catName; + private String dbName; + private String tableName; + private Table t; + private boolean needMerge; + private Configuration conf; + + SetAggrStatsHandler(IHMSHandler handler, SetPartitionsStatsRequest request) { + super(handler, false, request); + } + + @Override + protected void beforeExecute() throws TException, IOException { + this.needMerge = request.isSetNeedMerge() && request.isNeedMerge(); + this.conf = handler.getConf(); + this.ms = handler.getMS(); + List<ColumnStatistics> csNews = request.getColStats(); + if (csNews != null && !csNews.isEmpty()) { + ColumnStatistics firstColStats = csNews.getFirst(); + ColumnStatisticsDesc statsDesc = firstColStats.getStatsDesc(); + this.catName = normalizeIdentifier(statsDesc.isSetCatName() ? 
statsDesc.getCatName() : getDefaultCatalog(conf)); + this.dbName = normalizeIdentifier(statsDesc.getDbName()); + this.tableName = normalizeIdentifier(statsDesc.getTableName()); + this.t = ms.getTable(catName, dbName, tableName); + if (statsDesc.isIsTblLevel() && request.getColStatsSize() != 1) { + // there should be only one ColumnStatistics + throw new MetaException( + "Expecting only 1 ColumnStatistics for table's column stats, but find " + request.getColStatsSize()); + } + } + } + + @Override + protected SetAggrStatsResult execute() throws TException, IOException { + boolean ret = true; + List<ColumnStatistics> csNews = request.getColStats(); + if (csNews == null || csNews.isEmpty()) { + return new SetAggrStatsResult(true); + } + // figure out if it is table level or partition level + ColumnStatistics firstColStats = csNews.getFirst(); + ColumnStatisticsDesc statsDesc = firstColStats.getStatsDesc(); + List<String> colNames = new ArrayList<>(); + for (ColumnStatisticsObj obj : firstColStats.getStatsObj()) { + colNames.add(obj.getColName()); + } + if (statsDesc.isIsTblLevel()) { + if (needMerge) { + return new SetAggrStatsResult(updateTableColumnStatsWithMerge(colNames)); + } else { + // This is the overwrite case, we do not care about the accuracy. + return new SetAggrStatsResult(updateTableColumnStatsInternal(firstColStats, + request.getValidWriteIdList(), request.getWriteId())); + } + } else { + // partition level column stats merging + // note that we may have two or more duplicate partition names. 
+ // see autoColumnStats_2.q under TestMiniLlapLocalCliDriver + Map<String, ColumnStatistics> newStatsMap = new HashMap<>(); + for (ColumnStatistics csNew : csNews) { + String partName = csNew.getStatsDesc().getPartName(); + if (newStatsMap.containsKey(partName)) { + MetaStoreServerUtils.mergeColStats(csNew, newStatsMap.get(partName)); + } + newStatsMap.put(partName, csNew); + } + + if (needMerge) { + ret = updatePartColumnStatsWithMerge(colNames, newStatsMap); + } else { // No merge. + // We don't short-circuit on errors here anymore. That can leave acid stats invalid. + if (MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.TRY_DIRECT_SQL)) { + ret = updatePartitionColStatsInBatch(newStatsMap, + request.getValidWriteIdList(), request.getWriteId()); + } else { + MTable mTable = ms.ensureGetMTable(catName, dbName, tableName); + for (Map.Entry<String, ColumnStatistics> entry : newStatsMap.entrySet()) { + // We don't short-circuit on errors here anymore. That can leave acid stats invalid. + ret = updatePartitonColStatsInternal(mTable, entry.getValue(), + request.getValidWriteIdList(), request.getWriteId()) && ret; + } + } + } + } + return new SetAggrStatsResult(ret); + } + + private boolean updateTableColumnStatsWithMerge(List<String> colNames) throws MetaException, + NoSuchObjectException, InvalidObjectException, InvalidInputException { + ColumnStatistics firstColStats = request.getColStats().getFirst(); + ms.openTransaction(); + boolean isCommitted = false, result = false; + try { + ColumnStatistics csOld = ms.getTableColumnStatistics(catName, dbName, tableName, colNames, + request.getEngine(), request.getValidWriteIdList()); + // we first use the valid stats list to prune the stats + boolean isInvalidTxnStats = csOld != null + && csOld.isSetIsStatsCompliant() && !csOld.isIsStatsCompliant(); + if (isInvalidTxnStats) { + // No columns can be merged; a shortcut for getMergableCols. 
+ firstColStats.setStatsObj(Lists.newArrayList()); + } else { + MetaStoreServerUtils.getMergableCols(firstColStats, t.getParameters()); + // we merge those that can be merged + if (csOld != null && csOld.getStatsObjSize() != 0 && !firstColStats.getStatsObj().isEmpty()) { + MetaStoreServerUtils.mergeColStats(firstColStats, csOld); + } + } + + if (!firstColStats.getStatsObj().isEmpty()) { + result = updateTableColumnStatsInternal(firstColStats, + request.getValidWriteIdList(), request.getWriteId()); + } else if (isInvalidTxnStats) { + // For now because the stats state is such as it is, we will invalidate everything. + // Overall the semantics here are not clear - we could invalidate only some columns, but does + // that make any physical sense? Could query affect some columns but not others? + t.setWriteId(request.getWriteId()); + StatsSetupConst.clearColumnStatsState(t.getParameters()); + StatsSetupConst.setBasicStatsState(t.getParameters(), StatsSetupConst.FALSE); + ms.alterTable(catName, dbName, tableName, t, request.getValidWriteIdList()); + } else { + // TODO: why doesn't the original call for non acid tables invalidate the stats? 
+ LOG.debug("All the column stats are not accurate to merge."); + result = true; + } + + ms.commitTransaction(); + isCommitted = true; + } finally { + if (!isCommitted) { + ms.rollbackTransaction(); + } + } + return result; + } + + private boolean updateTableColumnStatsInternal(ColumnStatistics colStats, + String validWriteIds, long writeId) + throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { + normalizeColStatsInput(colStats); + + Map<String, String> parameters = null; + ms.openTransaction(); + boolean committed = false; + try { + parameters = ms.updateTableColumnStatistics(colStats, validWriteIds, writeId); + if (parameters != null) { + Table tableObj = ms.getTable(colStats.getStatsDesc().getCatName(), + colStats.getStatsDesc().getDbName(), + colStats.getStatsDesc().getTableName(), validWriteIds); + if (!handler.getTransactionalListeners().isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(handler.getTransactionalListeners(), + EventMessage.EventType.UPDATE_TABLE_COLUMN_STAT, + new UpdateTableColumnStatEvent(colStats, tableObj, parameters, + writeId, handler)); + } + if (!handler.getListeners().isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(handler.getListeners(), + EventMessage.EventType.UPDATE_TABLE_COLUMN_STAT, + new UpdateTableColumnStatEvent(colStats, tableObj, parameters, + writeId, handler)); + } + } + committed = ms.commitTransaction(); + } finally { + if (!committed) { + ms.rollbackTransaction(); + } + } + + return parameters != null; + } + + private boolean updatePartColumnStatsWithMerge( + List<String> colNames, Map<String, ColumnStatistics> newStatsMap) + throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { + ms.openTransaction(); + boolean isCommitted = false, result = true; + try { + // a single call to get all column stats for all partitions + List<String> partitionNames = new ArrayList<>(); + partitionNames.addAll(newStatsMap.keySet()); + 
List<ColumnStatistics> csOlds = ms.getPartitionColumnStatistics(catName, dbName, tableName, + partitionNames, colNames, request.getEngine(), request.getValidWriteIdList()); + if (newStatsMap.values().size() != csOlds.size()) { + // some of the partitions miss stats. + LOG.debug("Some of the partitions miss stats."); + } + Map<String, ColumnStatistics> oldStatsMap = new HashMap<>(); + for (ColumnStatistics csOld : csOlds) { + oldStatsMap.put(csOld.getStatsDesc().getPartName(), csOld); + } + + // another single call to get all the partition objects + List<Partition> partitions = ms.getPartitionsByNames(catName, dbName, tableName, partitionNames); + Map<String, Partition> mapToPart = new HashMap<>(); + for (int index = 0; index < partitionNames.size(); index++) { + mapToPart.put(partitionNames.get(index), partitions.get(index)); + } + + MTable mTable = ms.ensureGetMTable(catName, dbName, tableName); + Map<String, ColumnStatistics> statsMap = new HashMap<>(); + boolean useDirectSql = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.TRY_DIRECT_SQL); + for (Map.Entry<String, ColumnStatistics> entry : newStatsMap.entrySet()) { + ColumnStatistics csNew = entry.getValue(); + ColumnStatistics csOld = oldStatsMap.get(entry.getKey()); + boolean isInvalidTxnStats = csOld != null + && csOld.isSetIsStatsCompliant() && !csOld.isIsStatsCompliant(); + Partition part = mapToPart.get(entry.getKey()); + if (isInvalidTxnStats) { + // No columns can be merged; a shortcut for getMergableCols. + csNew.setStatsObj(Lists.newArrayList()); + } else { + // we first use getParameters() to prune the stats + MetaStoreServerUtils.getMergableCols(csNew, part.getParameters()); + // we merge those that can be merged + if (csOld != null && csOld.getStatsObjSize() != 0 && !csNew.getStatsObj().isEmpty()) { + MetaStoreServerUtils.mergeColStats(csNew, csOld); + } + } + + if (!csNew.getStatsObj().isEmpty()) { + // We don't short-circuit on errors here anymore. That can leave acid stats invalid. 
+ if (useDirectSql) { + statsMap.put(csNew.getStatsDesc().getPartName(), csNew); + } else { + result = updatePartitonColStatsInternal(mTable, csNew, + request.getValidWriteIdList(), request.getWriteId()) && result; + } + } else if (isInvalidTxnStats) { + // For now because the stats state is such as it is, we will invalidate everything. + // Overall the semantics here are not clear - we could invalidate only some columns, but does + // that make any physical sense? Could query affect some columns but not others? + part.setWriteId(request.getWriteId()); + StatsSetupConst.clearColumnStatsState(part.getParameters()); + StatsSetupConst.setBasicStatsState(part.getParameters(), StatsSetupConst.FALSE); + ms.alterPartition(catName, dbName, tableName, part.getValues(), part, + request.getValidWriteIdList()); + result = false; + } else { + // TODO: why doesn't the original call for non acid tables invalidate the stats? + LOG.debug("All the column stats " + csNew.getStatsDesc().getPartName() + + " are not accurate to merge."); + } + } + ms.commitTransaction(); + isCommitted = true; + // updatePartitionColStatsInBatch starts/commit transaction internally. As there is no write or select for update + // operations is done in this transaction, it is safe to commit it before calling updatePartitionColStatsInBatch. + if (!statsMap.isEmpty()) { Review Comment: This transaction commit result is ignored. RawStore.commitTransaction() returns boolean; if it returns false, isCommitted is still set to true and rollback is skipped, which can leave the transaction open/uncommitted. Capture the boolean return value from commitTransaction() and use it to decide rollback/return status. ```suggestion boolean commitSucceeded = ms.commitTransaction(); isCommitted = commitSucceeded; // updatePartitionColStatsInBatch starts/commit transaction internally. 
As there is no write or select for update // operations is done in this transaction, it is safe to commit it before calling updatePartitionColStatsInBatch. if (!statsMap.isEmpty() && commitSucceeded) { ``` ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/AddPartitionsHandler.java: ########## @@ -186,7 +187,12 @@ protected AddPartitionsResult execute() throws TException, IOException { new long[0], new BitSet(), writeId); validWriteIds = validWriteIdList.toString(); } - ((HMSHandler)handler).updatePartitonColStatsInternal(table, null, partColStats, validWriteIds, writeId); + SetPartitionsStatsRequest setPartitionsStatsRequest = + new SetPartitionsStatsRequest(Arrays.asList(partColStats)); + setPartitionsStatsRequest.setWriteId(writeId); + setPartitionsStatsRequest.setValidWriteIdList(validWriteIds); + setPartitionsStatsRequest.setNeedMerge(false); + handler.update_partition_column_statistics_req(setPartitionsStatsRequest); } Review Comment: Partition column stats are updated by calling back into HMSHandler (update_partition_column_statistics_req) while the enclosing AddPartitionsHandler transaction is still open. Because AbstractRequestHandler runs handlers in a separate executor thread, this stats update will use a different thread-local RawStore/transaction and may not be able to see the uncommitted partitions just added, leading to failures or non-atomic updates. Keep the stats update in the same thread/RawStore transaction (e.g., update via the current RawStore directly, or refactor the shared logic into a synchronous helper) or move the stats update after the partitions transaction commits if atomicity is not required. 
########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java: ########## @@ -918,11 +910,12 @@ public void create_catalog(CreateCatalogRequest rqst) ms.createCatalog(catalog); // Create a default database inside the catalog - Database db = new Database(DEFAULT_DATABASE_NAME, - "Default database for catalog " + catalog.getName(), catalog.getLocationUri(), - Collections.emptyMap()); - db.setCatalogName(catalog.getName()); - create_database_core(ms, db); + CreateDatabaseRequest cdr = new CreateDatabaseRequest(DEFAULT_DATABASE_NAME); + cdr.setCatalogName(catalog.getName()); + cdr.setLocationUri(catalog.getLocationUri()); + cdr.setParameters(Collections.emptyMap()); + cdr.setDescription("Default database for catalog " + catalog.getName()); + AbstractRequestHandler.offer(this, cdr).getResult(); Review Comment: create_catalog opens a RawStore transaction (ms.openTransaction()) and then creates the default DB via AbstractRequestHandler.offer(this, cdr).getResult(). Since handlers execute in a separate executor thread and RawStore is thread-local, the DB creation will run in a different RawStore/transaction that cannot see the uncommitted catalog and is not part of the surrounding transaction, breaking atomicity and potentially failing with NoSuchObjectException or leaving inconsistent catalog/db state on rollback. Create the default DB using the same RawStore/transaction (synchronous helper that accepts the current RawStore) or commit the catalog first and handle cleanup/rollback semantics explicitly. ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/AbstractRequestHandler.java: ########## @@ -321,18 +319,20 @@ protected void afterExecute(A result) throws TException, IOException { } /** - * Get the prefix for logging the message on polling the operation status. + * Get the prefix for logging the message on polling the handler's status. 
* * @return message prefix */ protected abstract String getMessagePrefix(); /** - * Get the message about the operation progress. + * Get the handler's progress that will show at the client. * * @return the progress */ - protected abstract String getRequestProgress(); + protected String getRequestProgress() { + return "Done";

Review Comment:
getRequestProgress() is used to build the "In-progress" status message, but the new default implementation returns "Done", which is misleading while a handler is still running (especially for async handlers that no longer override it). Consider changing the default to something like "In progress"/"Running" or making this method abstract again for async-capable handlers.

```suggestion
    return "In progress";
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
