This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f545a0828df9874881a224b49575e0aebb195dab
Author: Sai Hemanth Gantasala <[email protected]>
AuthorDate: Thu May 8 14:03:14 2025 -0700

    IMPALA-14051: Use batch insert HMS API to fire insert events
    
    Firing insert events from Impala can be improved by leveraging the
    batch insert HMS API addWriteNotificationLogInBatch() available (via
    HIVE-25205) starting from Hive 4. Right now, Impala makes a separate
    HMS call for each partition, so firing insert events on a huge
    partitioned table can become a performance bottleneck.
    
    This patch addresses the above concern by using the
    addWriteNotificationLogInBatch() API to fire the per-partition insert
    events in batches of at most DEFAULT_MAX_PARTITIONS_PER_RPC requests.
    
    Note: This optimization is applicable only for transactional partitioned
    tables. The external partitioned table case is already handled.
    
    Change-Id: Iff4a2691631fe9e9e1dc07714c69580a4ace6a8b
    Reviewed-on: http://gerrit.cloudera.org:8080/22869
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Riza Suminto <[email protected]>
---
 .../org/apache/impala/compat/MetastoreShim.java    | 43 +++++++++++++---------
 .../java/org/apache/impala/util/MetaStoreUtil.java |  2 +-
 2 files changed, 27 insertions(+), 18 deletions(-)

diff --git 
a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java 
b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index f2c71fab8..beabac03e 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -76,6 +76,7 @@ import 
org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
 import org.apache.hadoop.hive.metastore.api.SourceTable;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
+import org.apache.hadoop.hive.metastore.api.WriteNotificationLogBatchRequest;
 import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
@@ -127,6 +128,7 @@ import 
org.apache.impala.hive.common.MutableValidWriteIdList;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.util.AcidUtils.TblTransaction;
+import org.apache.impala.util.MetaStoreUtil;
 import org.apache.impala.util.MetaStoreUtil.TableInsertEventInfo;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -443,26 +445,33 @@ public class MetastoreShim extends Hive3MetastoreShimBase 
{
   private static void fireInsertTransactionalEventHelper(
       IMetaStoreClient hiveClient, TableInsertEventInfo insertEventInfo, 
String dbName,
       String tableName) throws TException {
-    for (InsertEventRequestData insertData : 
insertEventInfo.getInsertEventReqData()) {
-      // TODO(Vihang) unfortunately there is no bulk insert event API for 
transactional
-      // tables. It is possible that this may take long time here if there are 
lots of
-      // partitions which were inserted.
+    int insertBatchSize = insertEventInfo.getInsertEventReqData().size();
+    int maxRPCBatchSize = MetaStoreUtil.DEFAULT_MAX_PARTITIONS_PER_RPC;
+    for (int i = 0; i < insertBatchSize; i += maxRPCBatchSize) {
+      List<WriteNotificationLogRequest> batchRequestList = new ArrayList<>();
+      StringBuilder sb = new StringBuilder();
+      List<InsertEventRequestData> insertDataList =
+          insertEventInfo.getInsertEventReqData().subList(i,
+              Math.min(i + maxRPCBatchSize, insertBatchSize));
+      for (InsertEventRequestData insertData : insertDataList) {
+        WriteNotificationLogRequest req = new WriteNotificationLogRequest(
+            insertEventInfo.getTxnId(), insertEventInfo.getWriteId(), dbName, 
tableName,
+            insertData);
+        if (insertData.isSetPartitionVal()) {
+          req.setPartitionVals(insertData.getPartitionVal());
+          sb.append(insertData.getPartitionVal());
+        }
+        batchRequestList.add(req);
+      }
+      WriteNotificationLogBatchRequest batchedReq = new 
WriteNotificationLogBatchRequest(
+          getDefaultCatalogName(), dbName, tableName, batchRequestList);
+      hiveClient.addWriteNotificationLogInBatch(batchedReq);
       if (LOG.isDebugEnabled()) {
-        String msg =
-            "Firing write notification log request for table " + dbName + "." 
+ tableName
-                + (insertData.isSetPartitionVal() ? " on partition " + 
insertData
-                .getPartitionVal() : "");
+        String msg = "Firing write notification log request for table " + 
dbName + "."
+            + tableName + (!sb.toString().isEmpty() ? " on partitions "
+            + sb.toString() : "");
         LOG.debug(msg);
       }
-      WriteNotificationLogRequest rqst = new WriteNotificationLogRequest(
-          insertEventInfo.getTxnId(), insertEventInfo.getWriteId(), dbName, 
tableName,
-          insertData);
-      if (insertData.isSetPartitionVal()) {
-        rqst.setPartitionVals(insertData.getPartitionVal());
-      }
-      // TODO(Vihang) metastore should return the event id here so that we can 
get rid
-      // of firing INSERT event types for transactional tables.
-      hiveClient.addWriteNotificationLog(rqst);
     }
   }
 
diff --git a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java 
b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
index 03d4756b5..58d825b47 100644
--- a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
@@ -73,7 +73,7 @@ public class MetaStoreUtil {
 
   // The default maximum number of partitions to fetch from the Hive metastore 
in one
   // RPC.
-  private static final short DEFAULT_MAX_PARTITIONS_PER_RPC = 1000;
+  public static final short DEFAULT_MAX_PARTITIONS_PER_RPC = 1000;
 
   // The maximum number of partitions to fetch from the metastore in one RPC.
   // Read from the 'hive.metastore.batch.retrieve.table.partition.max' Hive 
configuration

Reply via email to