This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit f545a0828df9874881a224b49575e0aebb195dab Author: Sai Hemanth Gantasala <[email protected]> AuthorDate: Thu May 8 14:03:14 2025 -0700 IMPALA-14051: Use batch insert HMS API to fire insert events Firing insert events from Impala can be improved by leveraging the batch insert HMS API addWriteNotificationLogInBatch() available (via HIVE-25205) starting from Hive 4. Right now, Impala is using a single call for each partition, so firing insert events for a huge partitioned table can become a performance bottleneck. This patch addresses the above concern by leveraging the addWriteNotificationLogInBatch() API to fire batch insert partition events. Note: This optimization is applicable only for transactional partitioned tables. The external partitioned table case is already handled. Change-Id: Iff4a2691631fe9e9e1dc07714c69580a4ace6a8b Reviewed-on: http://gerrit.cloudera.org:8080/22869 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Riza Suminto <[email protected]> --- .../org/apache/impala/compat/MetastoreShim.java | 43 +++++++++++++--------- .../java/org/apache/impala/util/MetaStoreUtil.java | 2 +- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java index f2c71fab8..beabac03e 100644 --- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java +++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java @@ -76,6 +76,7 @@ import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest; import org.apache.hadoop.hive.metastore.api.SourceTable; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.WriteEventInfo; +import org.apache.hadoop.hive.metastore.api.WriteNotificationLogBatchRequest; import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; 
import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; @@ -127,6 +128,7 @@ import org.apache.impala.hive.common.MutableValidWriteIdList; import org.apache.impala.service.BackendConfig; import org.apache.impala.service.CatalogOpExecutor; import org.apache.impala.util.AcidUtils.TblTransaction; +import org.apache.impala.util.MetaStoreUtil; import org.apache.impala.util.MetaStoreUtil.TableInsertEventInfo; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -443,26 +445,33 @@ public class MetastoreShim extends Hive3MetastoreShimBase { private static void fireInsertTransactionalEventHelper( IMetaStoreClient hiveClient, TableInsertEventInfo insertEventInfo, String dbName, String tableName) throws TException { - for (InsertEventRequestData insertData : insertEventInfo.getInsertEventReqData()) { - // TODO(Vihang) unfortunately there is no bulk insert event API for transactional - // tables. It is possible that this may take long time here if there are lots of - // partitions which were inserted. 
+ int insertBatchSize = insertEventInfo.getInsertEventReqData().size(); + int maxRPCBatchSize = MetaStoreUtil.DEFAULT_MAX_PARTITIONS_PER_RPC; + for (int i = 0; i < insertBatchSize; i += maxRPCBatchSize) { + List<WriteNotificationLogRequest> batchRequestList = new ArrayList<>(); + StringBuilder sb = new StringBuilder(); + List<InsertEventRequestData> insertDataList = + insertEventInfo.getInsertEventReqData().subList(i, + Math.min(i + maxRPCBatchSize, insertBatchSize)); + for (InsertEventRequestData insertData : insertDataList) { + WriteNotificationLogRequest req = new WriteNotificationLogRequest( + insertEventInfo.getTxnId(), insertEventInfo.getWriteId(), dbName, tableName, + insertData); + if (insertData.isSetPartitionVal()) { + req.setPartitionVals(insertData.getPartitionVal()); + sb.append(insertData.getPartitionVal()); + } + batchRequestList.add(req); + } + WriteNotificationLogBatchRequest batchedReq = new WriteNotificationLogBatchRequest( + getDefaultCatalogName(), dbName, tableName, batchRequestList); + hiveClient.addWriteNotificationLogInBatch(batchedReq); if (LOG.isDebugEnabled()) { - String msg = - "Firing write notification log request for table " + dbName + "." + tableName - + (insertData.isSetPartitionVal() ? " on partition " + insertData - .getPartitionVal() : ""); + String msg = "Firing write notification log request for table " + dbName + "." + + tableName + (!sb.toString().isEmpty() ? " on partitions " + + sb.toString() : ""); LOG.debug(msg); } - WriteNotificationLogRequest rqst = new WriteNotificationLogRequest( - insertEventInfo.getTxnId(), insertEventInfo.getWriteId(), dbName, tableName, - insertData); - if (insertData.isSetPartitionVal()) { - rqst.setPartitionVals(insertData.getPartitionVal()); - } - // TODO(Vihang) metastore should return the event id here so that we can get rid - // of firing INSERT event types for transactional tables. 
- hiveClient.addWriteNotificationLog(rqst); } } diff --git a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java index 03d4756b5..58d825b47 100644 --- a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java +++ b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java @@ -73,7 +73,7 @@ public class MetaStoreUtil { // The default maximum number of partitions to fetch from the Hive metastore in one // RPC. - private static final short DEFAULT_MAX_PARTITIONS_PER_RPC = 1000; + public static final short DEFAULT_MAX_PARTITIONS_PER_RPC = 1000; // The maximum number of partitions to fetch from the metastore in one RPC. // Read from the 'hive.metastore.batch.retrieve.table.partition.max' Hive configuration
