Copilot commented on code in PR #6287:
URL: https://github.com/apache/hive/pull/6287#discussion_r2774846480


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/CreateDatabaseHandler.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.handler;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.HMSHandler;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.Catalog;
+import org.apache.hadoop.hive.metastore.api.CreateDatabaseRequest;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.PreCreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_IN_TEST;
+import static 
org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.isDbReplicationTarget;
+import static 
org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+
+@SuppressWarnings("unused")
+@RequestHandler(requestBody = CreateDatabaseRequest.class)
+public class CreateDatabaseHandler
+    extends AbstractRequestHandler<CreateDatabaseRequest, 
CreateDatabaseHandler.CreateDatabaseResult> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CreateDatabaseHandler.class);
+  private RawStore ms;
+  private Warehouse wh;
+  private Database db;
+  private boolean skipAuthorization;
+  private String name;
+
+  CreateDatabaseHandler(IHMSHandler handler, CreateDatabaseRequest request) {
+    super(handler, false, request);
+  }
+
+  @Override
+  protected CreateDatabaseResult execute() throws TException, IOException {
+    boolean success = false;
+    boolean madeManagedDir = false;
+    boolean madeExternalDir = false;
+    boolean isReplicated = isDbReplicationTarget(db);
+    Map<String, String> transactionalListenersResponses = 
Collections.emptyMap();
+    Path dbExtPath = new Path(db.getLocationUri());
+    Path dbMgdPath = db.getManagedLocationUri() != null ? new 
Path(db.getManagedLocationUri()) : null;
+    boolean isInTest = MetastoreConf.getBoolVar(handler.getConf(), 
HIVE_IN_TEST);
+    try {
+      Database authDb = new Database(db);
+      if (skipAuthorization) {
+        // @TODO could it move to authorization lawyer?
+        //null out to skip authorizer URI check
+        authDb.setManagedLocationUri(null);
+        authDb.setLocationUri(null);
+      }
+
+      ((HMSHandler) handler).firePreEvent(new PreCreateDatabaseEvent(authDb, 
handler));
+      if (db.getCatalogName() != null && 
!db.getCatalogName().equals(Warehouse.DEFAULT_CATALOG_NAME)) {
+        if (!wh.isDir(dbExtPath)) {
+          LOG.debug("Creating database path {}", dbExtPath);
+          if (!wh.mkdirs(dbExtPath)) {
+            throw new MetaException("Unable to create database path " + 
dbExtPath +
+                ", failed to create database " + db.getName());
+          }
+          madeExternalDir = true;
+        }
+      } else {
+        if (dbMgdPath != null) {
+          try {
+            // Since this may be done as random user (if doAs=true) he may not 
have access
+            // to the managed directory. We run this as an admin user
+            madeManagedDir = 
UserGroupInformation.getLoginUser().doAs((PrivilegedExceptionAction<Boolean>) 
() -> {
+              if (!wh.isDir(dbMgdPath)) {
+                LOG.info("Creating database path in managed directory {}", 
dbMgdPath);
+                if (!wh.mkdirs(dbMgdPath)) {
+                  throw new MetaException("Unable to create database managed 
path " + dbMgdPath +
+                      ", failed to create database " + db.getName());
+                }
+                return true;
+              }
+              return false;
+            });
+            if (madeManagedDir) {
+              LOG.info("Created database path in managed directory {}", 
dbMgdPath);
+            } else if (!isInTest || !isDbReplicationTarget(db)) { // Hive 
replication tests doesn't drop the db after each test
+              throw new MetaException("Unable to create database managed 
directory " + dbMgdPath +
+                  ", failed to create database " + db.getName());
+            }
+          } catch (IOException | InterruptedException e) {
+            throw new MetaException(
+                "Unable to create database managed directory " + dbMgdPath + 
", failed to create database " +
+                    db.getName() + ":" + e.getMessage());
+          }
+        }
+        try {
+          madeExternalDir = 
UserGroupInformation.getCurrentUser().doAs((PrivilegedExceptionAction<Boolean>) 
() -> {
+            if (!wh.isDir(dbExtPath)) {
+              LOG.info("Creating database path in external directory {}", 
dbExtPath);
+              return wh.mkdirs(dbExtPath);
+            }
+            return false;
+          });
+          if (madeExternalDir) {
+            LOG.info("Created database path in external directory {}", 
dbExtPath);
+          } else {
+            LOG.warn(
+                "Failed to create external path {} for database {}. " +
+                    "This may result in access not being allowed if the 
StorageBasedAuthorizationProvider is enabled",
+                dbExtPath, db.getName());
+          }
+        } catch (IOException | InterruptedException | 
UndeclaredThrowableException e) {
+          throw new MetaException("Failed to create external path " + 
dbExtPath + " for database " + db.getName() +
+                  ". This may result in access not being allowed if the " +
+              "StorageBasedAuthorizationProvider is enabled: " + 
e.getMessage());
+        }
+      }
+
+      ms.openTransaction();
+      ms.createDatabase(db);
+
+      if (!handler.getTransactionalListeners().isEmpty()) {
+        transactionalListenersResponses =
+            
MetaStoreListenerNotifier.notifyEvent(handler.getTransactionalListeners(),
+                EventMessage.EventType.CREATE_DATABASE,
+                new CreateDatabaseEvent(db, true, handler, isReplicated));
+      }
+
+      success = ms.commitTransaction();
+    } finally {
+      if (!success) {
+        ms.rollbackTransaction();
+        if (db.getCatalogName() != null && 
!db.getCatalogName().equals(Warehouse.DEFAULT_CATALOG_NAME)) {
+          if (madeManagedDir && dbMgdPath != null) {
+            wh.deleteDir(dbMgdPath, true, db);
+          }
+        } else {
+          if (madeManagedDir && dbMgdPath != null) {
+            try {
+              
UserGroupInformation.getLoginUser().doAs((PrivilegedExceptionAction<Void>) () 
-> {
+                wh.deleteDir(dbMgdPath, true, db);
+                return null;
+              });
+            } catch (IOException | InterruptedException e) {
+              LOG.error("Couldn't delete managed directory {} after it was 
created for database {} {}",
+                  dbMgdPath, db.getName(), e.getMessage());
+            }
+          }
+
+          if (madeExternalDir) {
+            try {
+              
UserGroupInformation.getCurrentUser().doAs((PrivilegedExceptionAction<Void>) () 
-> {
+                wh.deleteDir(dbExtPath, true, db);
+                return null;
+              });
+            } catch (IOException | InterruptedException e) {
+              LOG.error("Couldn't delete external directory {} after it was 
created for database {} {}",
+                  dbExtPath, db.getName(), e.getMessage());
+            }
+          }
+        }
+      }
+    }
+    return new CreateDatabaseResult(success, transactionalListenersResponses);
+  }
+
+  @Override
+  protected void beforeExecute() throws TException, IOException {
+    this.name = request.getDatabaseName();
+    if (!MetaStoreUtils.validateName(name, handler.getConf())) {
+      throw new InvalidObjectException(name + " is not a valid database name");
+    }
+    this.ms = handler.getMS();
+    String catalogName =
+        request.isSetCatalogName() ? request.getCatalogName() : 
getDefaultCatalog(handler.getConf());
+    Catalog cat;
+    try {
+      cat = ms.getCatalog(catalogName);
+    } catch (NoSuchObjectException e) {
+      LOG.error("No such catalog {}", catalogName);
+      throw new InvalidObjectException("No such catalog " + catalogName);
+    }
+
+    db = new Database(name, request.getDescription(), 
request.getLocationUri(), request.getParameters());
+    db.setPrivileges(request.getPrivileges());
+    db.setOwnerName(request.getOwnerName());
+    db.setOwnerType(request.getOwnerType());
+    db.setCatalogName(catalogName);
+    db.setCreateTime((int)(System.currentTimeMillis() / 1000));

Review Comment:
   Potential behavioral change: The old code in HMSHandler checked if 
createDatabaseRequest.isSetCreateTime() and used that value if set. The new 
CreateDatabaseHandler always sets createTime to System.currentTimeMillis() / 
1000, ignoring any createTime value in the request. This could break use cases 
where the caller wants to set a specific creation time (e.g., during 
replication). Consider adding: if (request.isSetCreateTime()) { 
db.setCreateTime(request.getCreateTime()); } else { 
db.setCreateTime((int)(System.currentTimeMillis() / 1000)); }
   ```suggestion
       if (request.isSetCreateTime()) {
         db.setCreateTime(request.getCreateTime());
       } else {
         db.setCreateTime((int)(System.currentTimeMillis() / 1000));
       }
   ```



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/SetAggrStatsHandler.java:
##########
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.handler;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.InvalidInputException;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEvent;
+import org.apache.hadoop.hive.metastore.events.UpdateTableColumnStatEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
+import org.apache.hadoop.hive.metastore.model.MTable;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hive.metastore.HMSHandler.getPartValsFromName;
+import static 
org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+import static 
org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
+
+@SuppressWarnings("unused")
+@RequestHandler(requestBody = SetPartitionsStatsRequest.class)
+public class SetAggrStatsHandler
+    extends AbstractRequestHandler<SetPartitionsStatsRequest, 
SetAggrStatsHandler.SetAggrStatsResult> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SetAggrStatsHandler.class);
+  private RawStore ms;
+  private String catName;
+  private String dbName;
+  private String tableName;
+  private Table t;
+  private boolean needMerge;
+  private Configuration conf;
+
+  SetAggrStatsHandler(IHMSHandler handler, SetPartitionsStatsRequest request) {
+    super(handler, false, request);
+  }
+
+  @Override
+  protected void beforeExecute() throws TException, IOException {
+    this.needMerge = request.isSetNeedMerge() && request.isNeedMerge();
+    this.conf = handler.getConf();
+    this.ms = handler.getMS();
+    List<ColumnStatistics> csNews = request.getColStats();
+    if (csNews != null && !csNews.isEmpty()) {
+      ColumnStatistics firstColStats = csNews.get(0);
+      ColumnStatisticsDesc statsDesc = firstColStats.getStatsDesc();
+      this.catName = normalizeIdentifier(statsDesc.isSetCatName() ? 
statsDesc.getCatName() : getDefaultCatalog(conf));
+      this.dbName = normalizeIdentifier(statsDesc.getDbName());
+      this.tableName = normalizeIdentifier(statsDesc.getTableName());
+      this.t = ms.getTable(catName, dbName, tableName);
+      if (statsDesc.isIsTblLevel() && request.getColStatsSize() != 1) {
+        // there should be only one ColumnStatistics
+        throw new MetaException(
+            "Expecting only 1 ColumnStatistics for table's column stats, but 
find " + request.getColStatsSize());
+      }
+    }
+  }
+
+  @Override
+  protected SetAggrStatsResult execute() throws TException, IOException {
+    boolean ret = true;
+    List<ColumnStatistics> csNews = request.getColStats();
+    if (csNews == null || csNews.isEmpty()) {
+      return new SetAggrStatsResult(true);
+    }
+    // figure out if it is table level or partition level
+    ColumnStatistics firstColStats = csNews.get(0);
+    ColumnStatisticsDesc statsDesc = firstColStats.getStatsDesc();
+    List<String> colNames = new ArrayList<>();
+    for (ColumnStatisticsObj obj : firstColStats.getStatsObj()) {
+      colNames.add(obj.getColName());
+    }
+    if (statsDesc.isIsTblLevel()) {
+      if (needMerge) {
+        return new 
SetAggrStatsResult(updateTableColumnStatsWithMerge(colNames));
+      } else {
+        // This is the overwrite case, we do not care about the accuracy.
+        return new 
SetAggrStatsResult(updateTableColumnStatsInternal(firstColStats,
+            request.getValidWriteIdList(), request.getWriteId()));
+      }
+    } else {
+      // partition level column stats merging
+      // note that we may have two or more duplicate partition names.
+      // see autoColumnStats_2.q under TestMiniLlapLocalCliDriver
+      Map<String, ColumnStatistics> newStatsMap = new HashMap<>();
+      for (ColumnStatistics csNew : csNews) {
+        String partName = csNew.getStatsDesc().getPartName();
+        if (newStatsMap.containsKey(partName)) {
+          MetaStoreServerUtils.mergeColStats(csNew, newStatsMap.get(partName));
+        }
+        newStatsMap.put(partName, csNew);
+      }
+
+      if (needMerge) {
+        ret = updatePartColumnStatsWithMerge(colNames, newStatsMap);
+      } else { // No merge.
+        // We don't short-circuit on errors here anymore. That can leave acid 
stats invalid.
+        if (MetastoreConf.getBoolVar(conf, 
MetastoreConf.ConfVars.TRY_DIRECT_SQL)) {
+          ret = updatePartitionColStatsInBatch(newStatsMap,
+              request.getValidWriteIdList(), request.getWriteId());
+        } else {
+          MTable mTable = ms.ensureGetMTable(catName, dbName, tableName);
+          for (Map.Entry<String, ColumnStatistics> entry : 
newStatsMap.entrySet()) {
+            // We don't short-circuit on errors here anymore. That can leave 
acid stats invalid.
+            ret = updatePartitonColStatsInternal(mTable, entry.getValue(),
+                request.getValidWriteIdList(), request.getWriteId()) && ret;
+          }
+        }
+      }
+    }
+    return new SetAggrStatsResult(ret);
+  }
+
+  private boolean updateTableColumnStatsWithMerge(List<String> colNames) 
throws MetaException,
+      NoSuchObjectException, InvalidObjectException, InvalidInputException {
+    ColumnStatistics firstColStats = request.getColStats().get(0);
+    ms.openTransaction();
+    boolean isCommitted = false, result = false;
+    try {
+      ColumnStatistics csOld = ms.getTableColumnStatistics(catName, dbName, 
tableName, colNames,
+          request.getEngine(), request.getValidWriteIdList());
+      // we first use the valid stats list to prune the stats
+      boolean isInvalidTxnStats = csOld != null
+          && csOld.isSetIsStatsCompliant() && !csOld.isIsStatsCompliant();
+      if (isInvalidTxnStats) {
+        // No columns can be merged; a shortcut for getMergableCols.
+        firstColStats.setStatsObj(Lists.newArrayList());
+      } else {
+        MetaStoreServerUtils.getMergableCols(firstColStats, t.getParameters());
+        // we merge those that can be merged
+        if (csOld != null && csOld.getStatsObjSize() != 0 && 
!firstColStats.getStatsObj().isEmpty()) {
+          MetaStoreServerUtils.mergeColStats(firstColStats, csOld);
+        }
+      }
+
+      if (!firstColStats.getStatsObj().isEmpty()) {
+        result = updateTableColumnStatsInternal(firstColStats,
+            request.getValidWriteIdList(), request.getWriteId());
+      } else if (isInvalidTxnStats) {
+        // For now because the stats state is such as it is, we will 
invalidate everything.
+        // Overall the sematics here are not clear - we could invalide only 
some columns, but does

Review Comment:
   Spelling errors in comment: "sematics" should be "semantics" and "invalide" 
should be "invalidate"
   ```suggestion
           // Overall the semantics here are not clear - we could invalidate 
only some columns, but does
   ```



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java:
##########
@@ -918,11 +910,12 @@ public void create_catalog(CreateCatalogRequest rqst)
         ms.createCatalog(catalog);
 
         // Create a default database inside the catalog
-        Database db = new Database(DEFAULT_DATABASE_NAME,
-            "Default database for catalog " + catalog.getName(), 
catalog.getLocationUri(),
-            Collections.emptyMap());
-        db.setCatalogName(catalog.getName());
-        create_database_core(ms, db);
+        CreateDatabaseRequest cdr = new 
CreateDatabaseRequest(DEFAULT_DATABASE_NAME);
+        cdr.setCatalogName(catalog.getName());
+        cdr.setLocationUri(catalog.getLocationUri());
+        cdr.setParameters(Collections.emptyMap());
+        cdr.setDescription("Default database for catalog " + 
catalog.getName());
+        success |= AbstractRequestHandler.offer(this, cdr).success();

Review Comment:
   The use of `success |= AbstractRequestHandler.offer(this, cdr).success()` is 
problematic because the value it stores in `success` is immediately overwritten 
by `success = ms.commitTransaction()` on line 927, so a failed database creation 
would go unnoticed. Either this line should use `=` instead of `|=`, or the 
result should be checked and an exception thrown if the database creation 
fails. Consider using `if 
(!AbstractRequestHandler.offer(this, cdr).success()) { throw new 
MetaException("Failed to create default database"); }` or simply 
`AbstractRequestHandler.offer(this, cdr).success();` without assignment if the 
result doesn't need to be checked.
   ```suggestion
           AbstractRequestHandler.offer(this, cdr).success();
   ```



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/SetAggrStatsHandler.java:
##########
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.handler;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.InvalidInputException;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEvent;
+import org.apache.hadoop.hive.metastore.events.UpdateTableColumnStatEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
+import org.apache.hadoop.hive.metastore.model.MTable;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hive.metastore.HMSHandler.getPartValsFromName;
+import static 
org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+import static 
org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
+
+@SuppressWarnings("unused")
+@RequestHandler(requestBody = SetPartitionsStatsRequest.class)
+public class SetAggrStatsHandler
+    extends AbstractRequestHandler<SetPartitionsStatsRequest, 
SetAggrStatsHandler.SetAggrStatsResult> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SetAggrStatsHandler.class);
+  private RawStore ms;
+  private String catName;
+  private String dbName;
+  private String tableName;
+  private Table t;
+  private boolean needMerge;
+  private Configuration conf;
+
+  SetAggrStatsHandler(IHMSHandler handler, SetPartitionsStatsRequest request) {
+    super(handler, false, request);
+  }
+
+  @Override
+  protected void beforeExecute() throws TException, IOException {
+    this.needMerge = request.isSetNeedMerge() && request.isNeedMerge();
+    this.conf = handler.getConf();
+    this.ms = handler.getMS();
+    List<ColumnStatistics> csNews = request.getColStats();
+    if (csNews != null && !csNews.isEmpty()) {
+      ColumnStatistics firstColStats = csNews.get(0);
+      ColumnStatisticsDesc statsDesc = firstColStats.getStatsDesc();
+      this.catName = normalizeIdentifier(statsDesc.isSetCatName() ? 
statsDesc.getCatName() : getDefaultCatalog(conf));
+      this.dbName = normalizeIdentifier(statsDesc.getDbName());
+      this.tableName = normalizeIdentifier(statsDesc.getTableName());
+      this.t = ms.getTable(catName, dbName, tableName);
+      if (statsDesc.isIsTblLevel() && request.getColStatsSize() != 1) {
+        // there should be only one ColumnStatistics
+        throw new MetaException(
+            "Expecting only 1 ColumnStatistics for table's column stats, but 
find " + request.getColStatsSize());
+      }
+    }
+  }
+
+  @Override
+  protected SetAggrStatsResult execute() throws TException, IOException {
+    boolean ret = true;
+    List<ColumnStatistics> csNews = request.getColStats();
+    if (csNews == null || csNews.isEmpty()) {
+      return new SetAggrStatsResult(true);
+    }
+    // figure out if it is table level or partition level
+    ColumnStatistics firstColStats = csNews.get(0);
+    ColumnStatisticsDesc statsDesc = firstColStats.getStatsDesc();
+    List<String> colNames = new ArrayList<>();
+    for (ColumnStatisticsObj obj : firstColStats.getStatsObj()) {
+      colNames.add(obj.getColName());
+    }
+    if (statsDesc.isIsTblLevel()) {
+      if (needMerge) {
+        return new 
SetAggrStatsResult(updateTableColumnStatsWithMerge(colNames));
+      } else {
+        // This is the overwrite case, we do not care about the accuracy.
+        return new 
SetAggrStatsResult(updateTableColumnStatsInternal(firstColStats,
+            request.getValidWriteIdList(), request.getWriteId()));
+      }
+    } else {
+      // partition level column stats merging
+      // note that we may have two or more duplicate partition names.
+      // see autoColumnStats_2.q under TestMiniLlapLocalCliDriver
+      Map<String, ColumnStatistics> newStatsMap = new HashMap<>();
+      for (ColumnStatistics csNew : csNews) {
+        String partName = csNew.getStatsDesc().getPartName();
+        if (newStatsMap.containsKey(partName)) {
+          MetaStoreServerUtils.mergeColStats(csNew, newStatsMap.get(partName));
+        }
+        newStatsMap.put(partName, csNew);
+      }
+
+      if (needMerge) {
+        ret = updatePartColumnStatsWithMerge(colNames, newStatsMap);
+      } else { // No merge.
+        // We don't short-circuit on errors here anymore. That can leave acid 
stats invalid.
+        if (MetastoreConf.getBoolVar(conf, 
MetastoreConf.ConfVars.TRY_DIRECT_SQL)) {
+          ret = updatePartitionColStatsInBatch(newStatsMap,
+              request.getValidWriteIdList(), request.getWriteId());
+        } else {
+          MTable mTable = ms.ensureGetMTable(catName, dbName, tableName);
+          for (Map.Entry<String, ColumnStatistics> entry : 
newStatsMap.entrySet()) {
+            // We don't short-circuit on errors here anymore. That can leave 
acid stats invalid.
+            ret = updatePartitonColStatsInternal(mTable, entry.getValue(),
+                request.getValidWriteIdList(), request.getWriteId()) && ret;
+          }
+        }
+      }
+    }
+    return new SetAggrStatsResult(ret);
+  }
+
+  private boolean updateTableColumnStatsWithMerge(List<String> colNames) 
throws MetaException,
+      NoSuchObjectException, InvalidObjectException, InvalidInputException {
+    ColumnStatistics firstColStats = request.getColStats().get(0);
+    ms.openTransaction();
+    boolean isCommitted = false, result = false;
+    try {
+      ColumnStatistics csOld = ms.getTableColumnStatistics(catName, dbName, 
tableName, colNames,
+          request.getEngine(), request.getValidWriteIdList());
+      // we first use the valid stats list to prune the stats
+      boolean isInvalidTxnStats = csOld != null
+          && csOld.isSetIsStatsCompliant() && !csOld.isIsStatsCompliant();
+      if (isInvalidTxnStats) {
+        // No columns can be merged; a shortcut for getMergableCols.
+        firstColStats.setStatsObj(Lists.newArrayList());
+      } else {
+        MetaStoreServerUtils.getMergableCols(firstColStats, t.getParameters());
+        // we merge those that can be merged
+        if (csOld != null && csOld.getStatsObjSize() != 0 && 
!firstColStats.getStatsObj().isEmpty()) {
+          MetaStoreServerUtils.mergeColStats(firstColStats, csOld);
+        }
+      }
+
+      if (!firstColStats.getStatsObj().isEmpty()) {
+        result = updateTableColumnStatsInternal(firstColStats,
+            request.getValidWriteIdList(), request.getWriteId());
+      } else if (isInvalidTxnStats) {
+        // For now because the stats state is such as it is, we will 
invalidate everything.
+        // Overall the sematics here are not clear - we could invalide only 
some columns, but does
+        // that make any physical sense? Could query affect some columns but 
not others?
+        t.setWriteId(request.getWriteId());
+        StatsSetupConst.clearColumnStatsState(t.getParameters());
+        StatsSetupConst.setBasicStatsState(t.getParameters(), 
StatsSetupConst.FALSE);
+        ms.alterTable(catName, dbName, tableName, t, 
request.getValidWriteIdList());
+      } else {
+        // TODO: why doesn't the original call for non acid tables invalidate 
the stats?
+        LOG.debug("All the column stats are not accurate to merge.");
+        result = true;
+      }
+
+      ms.commitTransaction();
+      isCommitted = true;
+    } finally {
+      if (!isCommitted) {
+        ms.rollbackTransaction();
+      }
+    }
+    return result;
+  }
+
+  private boolean updateTableColumnStatsInternal(ColumnStatistics colStats,
+      String validWriteIds, long writeId)
+      throws NoSuchObjectException, MetaException, InvalidObjectException, 
InvalidInputException {
+    normalizeColStatsInput(colStats);
+
+    Map<String, String> parameters = null;
+    ms.openTransaction();
+    boolean committed = false;
+    try {
+      parameters = ms.updateTableColumnStatistics(colStats, validWriteIds, 
writeId);
+      if (parameters != null) {
+        Table tableObj = ms.getTable(colStats.getStatsDesc().getCatName(),
+            colStats.getStatsDesc().getDbName(),
+            colStats.getStatsDesc().getTableName(), validWriteIds);
+        if (!handler.getTransactionalListeners().isEmpty()) {
+          
MetaStoreListenerNotifier.notifyEvent(handler.getTransactionalListeners(),
+              EventMessage.EventType.UPDATE_TABLE_COLUMN_STAT,
+              new UpdateTableColumnStatEvent(colStats, tableObj, parameters,
+                  writeId, handler));
+        }
+        if (!handler.getListeners().isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(handler.getListeners(),
+              EventMessage.EventType.UPDATE_TABLE_COLUMN_STAT,
+              new UpdateTableColumnStatEvent(colStats, tableObj, parameters,
+                  writeId, handler));
+        }
+      }
+      committed = ms.commitTransaction();
+    } finally {
+      if (!committed) {
+        ms.rollbackTransaction();
+      }
+    }
+
+    return parameters != null;
+  }
+
+  private boolean updatePartColumnStatsWithMerge(
+      List<String> colNames, Map<String, ColumnStatistics> newStatsMap)
+      throws MetaException, NoSuchObjectException, InvalidObjectException, 
InvalidInputException {
+    ms.openTransaction();
+    boolean isCommitted = false, result = true;
+    try {
+      // a single call to get all column stats for all partitions
+      List<String> partitionNames = new ArrayList<>();
+      partitionNames.addAll(newStatsMap.keySet());
+      List<ColumnStatistics> csOlds = ms.getPartitionColumnStatistics(catName, 
dbName, tableName,
+          partitionNames, colNames, request.getEngine(), 
request.getValidWriteIdList());
+      if (newStatsMap.values().size() != csOlds.size()) {
+        // some of the partitions miss stats.
+        LOG.debug("Some of the partitions miss stats.");
+      }
+      Map<String, ColumnStatistics> oldStatsMap = new HashMap<>();
+      for (ColumnStatistics csOld : csOlds) {
+        oldStatsMap.put(csOld.getStatsDesc().getPartName(), csOld);
+      }
+
+      // another single call to get all the partition objects
+      List<Partition> partitions = ms.getPartitionsByNames(catName, dbName, 
tableName, partitionNames);
+      Map<String, Partition> mapToPart = new HashMap<>();
+      for (int index = 0; index < partitionNames.size(); index++) {
+        mapToPart.put(partitionNames.get(index), partitions.get(index));
+      }
+
+      MTable mTable = ms.ensureGetMTable(catName, dbName, tableName);
+      Map<String, ColumnStatistics> statsMap =  new HashMap<>();
+      boolean useDirectSql = MetastoreConf.getBoolVar(conf, 
MetastoreConf.ConfVars.TRY_DIRECT_SQL);
+      for (Map.Entry<String, ColumnStatistics> entry : newStatsMap.entrySet()) 
{
+        ColumnStatistics csNew = entry.getValue();
+        ColumnStatistics csOld = oldStatsMap.get(entry.getKey());
+        boolean isInvalidTxnStats = csOld != null
+            && csOld.isSetIsStatsCompliant() && !csOld.isIsStatsCompliant();
+        Partition part = mapToPart.get(entry.getKey());
+        if (isInvalidTxnStats) {
+          // No columns can be merged; a shortcut for getMergableCols.
+          csNew.setStatsObj(Lists.newArrayList());
+        } else {
+          // we first use getParameters() to prune the stats
+          MetaStoreServerUtils.getMergableCols(csNew, part.getParameters());
+          // we merge those that can be merged
+          if (csOld != null && csOld.getStatsObjSize() != 0 && 
!csNew.getStatsObj().isEmpty()) {
+            MetaStoreServerUtils.mergeColStats(csNew, csOld);
+          }
+        }
+
+        if (!csNew.getStatsObj().isEmpty()) {
+          // We don't short-circuit on errors here anymore. That can leave 
acid stats invalid.
+          if (useDirectSql) {
+            statsMap.put(csNew.getStatsDesc().getPartName(), csNew);
+          } else {
+            result = updatePartitonColStatsInternal(mTable, csNew,
+                request.getValidWriteIdList(), request.getWriteId()) && result;
+          }
+        } else if (isInvalidTxnStats) {
+          // For now because the stats state is such as it is, we will 
invalidate everything.
+          // Overall the sematics here are not clear - we could invalide only 
some columns, but does

Review Comment:
   Spelling errors in comment: "sematics" should be "semantics" and "invalide" should be "invalidate"
   ```suggestion
             // Overall the semantics here are not clear - we could invalidate 
only some columns, but does
   ```



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java:
##########
@@ -949,9 +942,10 @@ public void create_catalog(CreateCatalogRequest rqst)
         }
       }
       success = true;
-    } catch (AlreadyExistsException|InvalidObjectException|MetaException e) {
-      ex = e;
-      throw e;
+    } catch (Exception e) {

Review Comment:
   Inconsistent exception handling: The exception caught at line 945 should be 
assigned to `ex` before being re-thrown, similar to how it's done in other 
methods like create_database_req (line 1173). Add `ex = e;` before the throw 
statement so that endFunction() at line 950 receives the actual exception for 
proper logging.
   ```suggestion
       } catch (Exception e) {
         ex = e;
   ```



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/SetAggrStatsHandler.java:
##########
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.handler;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.InvalidInputException;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEvent;
+import org.apache.hadoop.hive.metastore.events.UpdateTableColumnStatEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
+import org.apache.hadoop.hive.metastore.model.MTable;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hive.metastore.HMSHandler.getPartValsFromName;
+import static 
org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+import static 
org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
+
+@SuppressWarnings("unused")
+@RequestHandler(requestBody = SetPartitionsStatsRequest.class)
+public class SetAggrStatsHandler
+    extends AbstractRequestHandler<SetPartitionsStatsRequest, 
SetAggrStatsHandler.SetAggrStatsResult> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SetAggrStatsHandler.class);
+  private RawStore ms;
+  private String catName;
+  private String dbName;
+  private String tableName;
+  private Table t;
+  private boolean needMerge;
+  private Configuration conf;
+
+  SetAggrStatsHandler(IHMSHandler handler, SetPartitionsStatsRequest request) {
+    super(handler, false, request);
+  }
+
+  @Override
+  protected void beforeExecute() throws TException, IOException {
+    this.needMerge = request.isSetNeedMerge() && request.isNeedMerge();
+    this.conf = handler.getConf();
+    this.ms = handler.getMS();
+    List<ColumnStatistics> csNews = request.getColStats();
+    if (csNews != null && !csNews.isEmpty()) {
+      ColumnStatistics firstColStats = csNews.get(0);
+      ColumnStatisticsDesc statsDesc = firstColStats.getStatsDesc();
+      this.catName = normalizeIdentifier(statsDesc.isSetCatName() ? 
statsDesc.getCatName() : getDefaultCatalog(conf));
+      this.dbName = normalizeIdentifier(statsDesc.getDbName());
+      this.tableName = normalizeIdentifier(statsDesc.getTableName());
+      this.t = ms.getTable(catName, dbName, tableName);
+      if (statsDesc.isIsTblLevel() && request.getColStatsSize() != 1) {
+        // there should be only one ColumnStatistics
+        throw new MetaException(
+            "Expecting only 1 ColumnStatistics for table's column stats, but 
find " + request.getColStatsSize());
+      }
+    }
+  }
+
+  @Override
+  protected SetAggrStatsResult execute() throws TException, IOException {
+    boolean ret = true;
+    List<ColumnStatistics> csNews = request.getColStats();
+    if (csNews == null || csNews.isEmpty()) {
+      return new SetAggrStatsResult(true);
+    }
+    // figure out if it is table level or partition level
+    ColumnStatistics firstColStats = csNews.get(0);
+    ColumnStatisticsDesc statsDesc = firstColStats.getStatsDesc();
+    List<String> colNames = new ArrayList<>();
+    for (ColumnStatisticsObj obj : firstColStats.getStatsObj()) {
+      colNames.add(obj.getColName());
+    }
+    if (statsDesc.isIsTblLevel()) {
+      if (needMerge) {
+        return new 
SetAggrStatsResult(updateTableColumnStatsWithMerge(colNames));
+      } else {
+        // This is the overwrite case, we do not care about the accuracy.
+        return new 
SetAggrStatsResult(updateTableColumnStatsInternal(firstColStats,
+            request.getValidWriteIdList(), request.getWriteId()));
+      }
+    } else {
+      // partition level column stats merging
+      // note that we may have two or more duplicate partition names.
+      // see autoColumnStats_2.q under TestMiniLlapLocalCliDriver
+      Map<String, ColumnStatistics> newStatsMap = new HashMap<>();
+      for (ColumnStatistics csNew : csNews) {
+        String partName = csNew.getStatsDesc().getPartName();
+        if (newStatsMap.containsKey(partName)) {
+          MetaStoreServerUtils.mergeColStats(csNew, newStatsMap.get(partName));
+        }
+        newStatsMap.put(partName, csNew);
+      }
+
+      if (needMerge) {
+        ret = updatePartColumnStatsWithMerge(colNames, newStatsMap);
+      } else { // No merge.
+        // We don't short-circuit on errors here anymore. That can leave acid 
stats invalid.
+        if (MetastoreConf.getBoolVar(conf, 
MetastoreConf.ConfVars.TRY_DIRECT_SQL)) {
+          ret = updatePartitionColStatsInBatch(newStatsMap,
+              request.getValidWriteIdList(), request.getWriteId());
+        } else {
+          MTable mTable = ms.ensureGetMTable(catName, dbName, tableName);
+          for (Map.Entry<String, ColumnStatistics> entry : 
newStatsMap.entrySet()) {
+            // We don't short-circuit on errors here anymore. That can leave 
acid stats invalid.
+            ret = updatePartitonColStatsInternal(mTable, entry.getValue(),
+                request.getValidWriteIdList(), request.getWriteId()) && ret;
+          }
+        }
+      }
+    }
+    return new SetAggrStatsResult(ret);
+  }
+
+  private boolean updateTableColumnStatsWithMerge(List<String> colNames) 
throws MetaException,
+      NoSuchObjectException, InvalidObjectException, InvalidInputException {
+    ColumnStatistics firstColStats = request.getColStats().get(0);
+    ms.openTransaction();
+    boolean isCommitted = false, result = false;
+    try {
+      ColumnStatistics csOld = ms.getTableColumnStatistics(catName, dbName, 
tableName, colNames,
+          request.getEngine(), request.getValidWriteIdList());
+      // we first use the valid stats list to prune the stats
+      boolean isInvalidTxnStats = csOld != null
+          && csOld.isSetIsStatsCompliant() && !csOld.isIsStatsCompliant();
+      if (isInvalidTxnStats) {
+        // No columns can be merged; a shortcut for getMergableCols.
+        firstColStats.setStatsObj(Lists.newArrayList());
+      } else {
+        MetaStoreServerUtils.getMergableCols(firstColStats, t.getParameters());
+        // we merge those that can be merged
+        if (csOld != null && csOld.getStatsObjSize() != 0 && 
!firstColStats.getStatsObj().isEmpty()) {
+          MetaStoreServerUtils.mergeColStats(firstColStats, csOld);
+        }
+      }
+
+      if (!firstColStats.getStatsObj().isEmpty()) {
+        result = updateTableColumnStatsInternal(firstColStats,
+            request.getValidWriteIdList(), request.getWriteId());
+      } else if (isInvalidTxnStats) {
+        // For now because the stats state is such as it is, we will 
invalidate everything.
+        // Overall the sematics here are not clear - we could invalide only 
some columns, but does
+        // that make any physical sense? Could query affect some columns but 
not others?
+        t.setWriteId(request.getWriteId());
+        StatsSetupConst.clearColumnStatsState(t.getParameters());
+        StatsSetupConst.setBasicStatsState(t.getParameters(), 
StatsSetupConst.FALSE);
+        ms.alterTable(catName, dbName, tableName, t, 
request.getValidWriteIdList());
+      } else {
+        // TODO: why doesn't the original call for non acid tables invalidate 
the stats?
+        LOG.debug("All the column stats are not accurate to merge.");
+        result = true;
+      }
+
+      ms.commitTransaction();
+      isCommitted = true;
+    } finally {
+      if (!isCommitted) {
+        ms.rollbackTransaction();
+      }
+    }
+    return result;
+  }
+
+  private boolean updateTableColumnStatsInternal(ColumnStatistics colStats,
+      String validWriteIds, long writeId)
+      throws NoSuchObjectException, MetaException, InvalidObjectException, 
InvalidInputException {
+    normalizeColStatsInput(colStats);
+
+    Map<String, String> parameters = null;
+    ms.openTransaction();
+    boolean committed = false;
+    try {
+      parameters = ms.updateTableColumnStatistics(colStats, validWriteIds, 
writeId);
+      if (parameters != null) {
+        Table tableObj = ms.getTable(colStats.getStatsDesc().getCatName(),
+            colStats.getStatsDesc().getDbName(),
+            colStats.getStatsDesc().getTableName(), validWriteIds);
+        if (!handler.getTransactionalListeners().isEmpty()) {
+          
MetaStoreListenerNotifier.notifyEvent(handler.getTransactionalListeners(),
+              EventMessage.EventType.UPDATE_TABLE_COLUMN_STAT,
+              new UpdateTableColumnStatEvent(colStats, tableObj, parameters,
+                  writeId, handler));
+        }
+        if (!handler.getListeners().isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(handler.getListeners(),
+              EventMessage.EventType.UPDATE_TABLE_COLUMN_STAT,
+              new UpdateTableColumnStatEvent(colStats, tableObj, parameters,
+                  writeId, handler));
+        }
+      }
+      committed = ms.commitTransaction();
+    } finally {
+      if (!committed) {
+        ms.rollbackTransaction();
+      }
+    }
+
+    return parameters != null;
+  }
+
+  private boolean updatePartColumnStatsWithMerge(
+      List<String> colNames, Map<String, ColumnStatistics> newStatsMap)
+      throws MetaException, NoSuchObjectException, InvalidObjectException, 
InvalidInputException {
+    ms.openTransaction();
+    boolean isCommitted = false, result = true;
+    try {
+      // a single call to get all column stats for all partitions
+      List<String> partitionNames = new ArrayList<>();
+      partitionNames.addAll(newStatsMap.keySet());
+      List<ColumnStatistics> csOlds = ms.getPartitionColumnStatistics(catName, 
dbName, tableName,
+          partitionNames, colNames, request.getEngine(), 
request.getValidWriteIdList());
+      if (newStatsMap.values().size() != csOlds.size()) {
+        // some of the partitions miss stats.
+        LOG.debug("Some of the partitions miss stats.");
+      }
+      Map<String, ColumnStatistics> oldStatsMap = new HashMap<>();
+      for (ColumnStatistics csOld : csOlds) {
+        oldStatsMap.put(csOld.getStatsDesc().getPartName(), csOld);
+      }
+
+      // another single call to get all the partition objects
+      List<Partition> partitions = ms.getPartitionsByNames(catName, dbName, 
tableName, partitionNames);
+      Map<String, Partition> mapToPart = new HashMap<>();
+      for (int index = 0; index < partitionNames.size(); index++) {
+        mapToPart.put(partitionNames.get(index), partitions.get(index));
+      }
+
+      MTable mTable = ms.ensureGetMTable(catName, dbName, tableName);
+      Map<String, ColumnStatistics> statsMap =  new HashMap<>();
+      boolean useDirectSql = MetastoreConf.getBoolVar(conf, 
MetastoreConf.ConfVars.TRY_DIRECT_SQL);
+      for (Map.Entry<String, ColumnStatistics> entry : newStatsMap.entrySet()) 
{
+        ColumnStatistics csNew = entry.getValue();
+        ColumnStatistics csOld = oldStatsMap.get(entry.getKey());
+        boolean isInvalidTxnStats = csOld != null
+            && csOld.isSetIsStatsCompliant() && !csOld.isIsStatsCompliant();
+        Partition part = mapToPart.get(entry.getKey());
+        if (isInvalidTxnStats) {
+          // No columns can be merged; a shortcut for getMergableCols.
+          csNew.setStatsObj(Lists.newArrayList());
+        } else {
+          // we first use getParameters() to prune the stats
+          MetaStoreServerUtils.getMergableCols(csNew, part.getParameters());
+          // we merge those that can be merged
+          if (csOld != null && csOld.getStatsObjSize() != 0 && 
!csNew.getStatsObj().isEmpty()) {
+            MetaStoreServerUtils.mergeColStats(csNew, csOld);
+          }
+        }
+
+        if (!csNew.getStatsObj().isEmpty()) {
+          // We don't short-circuit on errors here anymore. That can leave 
acid stats invalid.
+          if (useDirectSql) {
+            statsMap.put(csNew.getStatsDesc().getPartName(), csNew);
+          } else {
+            result = updatePartitonColStatsInternal(mTable, csNew,
+                request.getValidWriteIdList(), request.getWriteId()) && result;
+          }
+        } else if (isInvalidTxnStats) {
+          // For now because the stats state is such as it is, we will 
invalidate everything.
+          // Overall the sematics here are not clear - we could invalide only 
some columns, but does

Review Comment:
   Spelling errors in comment: "sematics" should be "semantics" and "invalide" should be "invalidate"
   ```suggestion
             // Overall the semantics here are not clear - we could invalidate 
only some columns, but does
   ```



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/SetAggrStatsHandler.java:
##########
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.handler;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.InvalidInputException;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEvent;
+import org.apache.hadoop.hive.metastore.events.UpdateTableColumnStatEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
+import org.apache.hadoop.hive.metastore.model.MTable;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hive.metastore.HMSHandler.getPartValsFromName;
+import static 
org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+import static 
org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
+
+@SuppressWarnings("unused")
+@RequestHandler(requestBody = SetPartitionsStatsRequest.class)
+public class SetAggrStatsHandler
+    extends AbstractRequestHandler<SetPartitionsStatsRequest, 
SetAggrStatsHandler.SetAggrStatsResult> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SetAggrStatsHandler.class);
+  private RawStore ms;
+  private String catName;
+  private String dbName;
+  private String tableName;
+  private Table t;
+  private boolean needMerge;
+  private Configuration conf;
+
+  SetAggrStatsHandler(IHMSHandler handler, SetPartitionsStatsRequest request) {
+    super(handler, false, request);
+  }
+
+  @Override
+  protected void beforeExecute() throws TException, IOException {
+    this.needMerge = request.isSetNeedMerge() && request.isNeedMerge();
+    this.conf = handler.getConf();
+    this.ms = handler.getMS();
+    List<ColumnStatistics> csNews = request.getColStats();
+    if (csNews != null && !csNews.isEmpty()) {
+      ColumnStatistics firstColStats = csNews.get(0);
+      ColumnStatisticsDesc statsDesc = firstColStats.getStatsDesc();
+      this.catName = normalizeIdentifier(statsDesc.isSetCatName() ? 
statsDesc.getCatName() : getDefaultCatalog(conf));
+      this.dbName = normalizeIdentifier(statsDesc.getDbName());
+      this.tableName = normalizeIdentifier(statsDesc.getTableName());
+      this.t = ms.getTable(catName, dbName, tableName);
+      if (statsDesc.isIsTblLevel() && request.getColStatsSize() != 1) {
+        // there should be only one ColumnStatistics
+        throw new MetaException(
+            "Expecting only 1 ColumnStatistics for table's column stats, but 
find " + request.getColStatsSize());
+      }
+    }
+  }
+
+  @Override
+  protected SetAggrStatsResult execute() throws TException, IOException {
+    boolean ret = true;
+    List<ColumnStatistics> csNews = request.getColStats();
+    if (csNews == null || csNews.isEmpty()) {
+      return new SetAggrStatsResult(true);
+    }
+    // figure out if it is table level or partition level
+    ColumnStatistics firstColStats = csNews.get(0);
+    ColumnStatisticsDesc statsDesc = firstColStats.getStatsDesc();
+    List<String> colNames = new ArrayList<>();
+    for (ColumnStatisticsObj obj : firstColStats.getStatsObj()) {
+      colNames.add(obj.getColName());
+    }
+    if (statsDesc.isIsTblLevel()) {
+      if (needMerge) {
+        return new 
SetAggrStatsResult(updateTableColumnStatsWithMerge(colNames));
+      } else {
+        // This is the overwrite case, we do not care about the accuracy.
+        return new 
SetAggrStatsResult(updateTableColumnStatsInternal(firstColStats,
+            request.getValidWriteIdList(), request.getWriteId()));
+      }
+    } else {
+      // partition level column stats merging
+      // note that we may have two or more duplicate partition names.
+      // see autoColumnStats_2.q under TestMiniLlapLocalCliDriver
+      Map<String, ColumnStatistics> newStatsMap = new HashMap<>();
+      for (ColumnStatistics csNew : csNews) {
+        String partName = csNew.getStatsDesc().getPartName();
+        if (newStatsMap.containsKey(partName)) {
+          MetaStoreServerUtils.mergeColStats(csNew, newStatsMap.get(partName));
+        }
+        newStatsMap.put(partName, csNew);
+      }
+
+      if (needMerge) {
+        ret = updatePartColumnStatsWithMerge(colNames, newStatsMap);
+      } else { // No merge.
+        // We don't short-circuit on errors here anymore. That can leave acid 
stats invalid.
+        if (MetastoreConf.getBoolVar(conf, 
MetastoreConf.ConfVars.TRY_DIRECT_SQL)) {
+          ret = updatePartitionColStatsInBatch(newStatsMap,
+              request.getValidWriteIdList(), request.getWriteId());
+        } else {
+          MTable mTable = ms.ensureGetMTable(catName, dbName, tableName);
+          for (Map.Entry<String, ColumnStatistics> entry : 
newStatsMap.entrySet()) {
+            // We don't short-circuit on errors here anymore. That can leave 
acid stats invalid.
+            ret = updatePartitonColStatsInternal(mTable, entry.getValue(),
+                request.getValidWriteIdList(), request.getWriteId()) && ret;
+          }
+        }
+      }
+    }
+    return new SetAggrStatsResult(ret);
+  }
+
+  private boolean updateTableColumnStatsWithMerge(List<String> colNames) 
throws MetaException,
+      NoSuchObjectException, InvalidObjectException, InvalidInputException {
+    ColumnStatistics firstColStats = request.getColStats().get(0);
+    ms.openTransaction();
+    boolean isCommitted = false, result = false;
+    try {
+      ColumnStatistics csOld = ms.getTableColumnStatistics(catName, dbName, 
tableName, colNames,
+          request.getEngine(), request.getValidWriteIdList());
+      // we first use the valid stats list to prune the stats
+      boolean isInvalidTxnStats = csOld != null
+          && csOld.isSetIsStatsCompliant() && !csOld.isIsStatsCompliant();
+      if (isInvalidTxnStats) {
+        // No columns can be merged; a shortcut for getMergableCols.
+        firstColStats.setStatsObj(Lists.newArrayList());
+      } else {
+        MetaStoreServerUtils.getMergableCols(firstColStats, t.getParameters());
+        // we merge those that can be merged
+        if (csOld != null && csOld.getStatsObjSize() != 0 && 
!firstColStats.getStatsObj().isEmpty()) {
+          MetaStoreServerUtils.mergeColStats(firstColStats, csOld);
+        }
+      }
+
+      if (!firstColStats.getStatsObj().isEmpty()) {
+        result = updateTableColumnStatsInternal(firstColStats,
+            request.getValidWriteIdList(), request.getWriteId());
+      } else if (isInvalidTxnStats) {
+        // For now because the stats state is such as it is, we will 
invalidate everything.
+        // Overall the sematics here are not clear - we could invalide only 
some columns, but does

Review Comment:
   Spelling errors in comment: "sematics" should be "semantics" and "invalide" should be "invalidate"
   ```suggestion
           // Overall the semantics here are not clear - we could invalidate only 
some columns, but does
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to