This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 4a7cb736a8a61ee31b165a8f6158b7e15cf0a32a
Author: Venu Reddy <[email protected]>
AuthorDate: Thu Jul 10 16:42:10 2025 +0530

    IMPALA-14209: Fixed incorrect usage of WriteEventInfos with commitTxnMessage
    
    HIVE-28976 modified CommitTxnMessage to pass write ids in the message.
    But catalogd does not expect write ids in the CommitTxnMessage.
    WriteEventInfos are explicitly fetched from HMS using the
    getAllWriteEventInfo() API and populated into catalogd's local
    CommitTxnMessage object using CommitTxnMessage.addWriteEventInfo().
    Since the write ids are already present in the CommitTxnMessage,
    calling addWriteEventInfo() on it adds the write ids again to the
    writeIds list in the CommitTxnMessage, causing an
    IndexOutOfBoundsException upon CommitTxnMessage.getTableObj(i).
    
    The fix is to not populate the fetched WriteEventInfos into the local
    CommitTxnMessage object; instead, use them directly.
    
    Testing:
    - Executed existing tests.
    
    Change-Id: Ie9b955b2b7427db9771ffec7ac5a16617f5ea022
    Reviewed-on: http://gerrit.cloudera.org:8080/23152
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../org/apache/impala/compat/MetastoreShim.java    | 89 ++++++++++++----------
 1 file changed, 50 insertions(+), 39 deletions(-)

diff --git 
a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java 
b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index beabac03e..06bcd4514 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -123,6 +123,7 @@ import 
org.apache.impala.catalog.metastore.CatalogMetastoreServer;
 import org.apache.impala.catalog.metastore.ICatalogMetastoreServer;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Metrics;
+import org.apache.impala.common.Pair;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.hive.common.MutableValidWriteIdList;
 import org.apache.impala.service.BackendConfig;
@@ -592,8 +593,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
             .getReloadMessage(event.getMessage());
     Map<String, Object> updatedFields = new HashMap<>();
     try {
-      org.apache.hadoop.hive.metastore.api.Table msTbl = 
Preconditions.checkNotNull(
-          reloadMessage.getTableObj());
+      Table msTbl = Preconditions.checkNotNull(reloadMessage.getTableObj());
       Partition reloadPartition = reloadMessage.getPtnObj();
       boolean isRefresh = reloadMessage.isRefreshEvent();
       updatedFields.put("table", msTbl);
@@ -867,7 +867,6 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
    */
   public static class CommitTxnEvent extends MetastoreEvent {
     public static final String EVENT_TYPE = "COMMIT_TXN";
-    private final CommitTxnMessage commitTxnMessage_;
     private final long txnId_;
     private Set<TableWriteId> tableWriteIds_ = Collections.emptySet();
     private final Set<String> tableNames_ = new HashSet<>();
@@ -877,9 +876,10 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
       super(catalogOpExecutor, metrics, event);
       
Preconditions.checkState(getEventType().equals(MetastoreEventType.COMMIT_TXN));
       Preconditions.checkNotNull(event.getMessage());
-      commitTxnMessage_ = MetastoreEventsProcessor.getMessageDeserializer()
+      CommitTxnMessage commitTxnMessage =
+          MetastoreEventsProcessor.getMessageDeserializer()
           .getCommitTxnMessage(event.getMessage());
-      txnId_ = commitTxnMessage_.getTxnId();
+      txnId_ = commitTxnMessage.getTxnId();
       tableWriteIds_ = catalog_.removeWriteIds(txnId_);
       LOG.info("EventId: {} EventType: COMMIT_TXN transaction id: {}", 
getEventId(),
           txnId_);
@@ -908,8 +908,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
 
       try {
         if (writeEventInfoList != null && !writeEventInfoList.isEmpty()) {
-          commitTxnMessage_.addWriteEventInfo(writeEventInfoList);
-          addCommittedWriteIdsAndRefreshPartitions();
+          addCommittedWriteIdsAndRefreshPartitions(writeEventInfoList);
         }
         // committed write ids for DDL need to be added here
         addCommittedWriteIdsToTables(tableWriteIds_);
@@ -952,28 +951,35 @@ public class MetastoreShim extends Hive3MetastoreShimBase 
{
       }
     }
 
-    private void addCommittedWriteIdsAndRefreshPartitions() throws Exception {
-      Preconditions.checkNotNull(commitTxnMessage_.getWriteIds());
-      List<Long> writeIds = 
Collections.unmodifiableList(commitTxnMessage_.getWriteIds());
+    private void addCommittedWriteIdsAndRefreshPartitions(
+        List<WriteEventInfo> writeEventInfoList) throws Exception {
+      List<Long> writeIds = 
writeEventInfoList.stream().map(WriteEventInfo::getWriteId)
+          .collect(Collectors.toList());
       List<Partition> parts = new ArrayList<>();
       // To load partitions together for the same table, indexes are grouped 
by table name
-      Map<TableName, List<Integer>> tableNameToIdxs = new HashMap<>();
+      Map<TableName, Pair<Table, List<Integer>>> tableNameToIdxs = new 
HashMap<>();
       for (int i = 0; i < writeIds.size(); i++) {
-        org.apache.hadoop.hive.metastore.api.Table tbl = 
commitTxnMessage_.getTableObj(i);
+        Table tbl = (Table) MessageBuilder.getTObj(
+            writeEventInfoList.get(i).getTableObj(), Table.class);
         TableName tableName = new TableName(tbl.getDbName(), 
tbl.getTableName());
-        parts.add(commitTxnMessage_.getPartitionObj(i));
-        tableNameToIdxs.computeIfAbsent(tableName, k -> new 
ArrayList<>()).add(i);
+        Partition partition = null;
+        if (writeEventInfoList.get(i).getPartitionObj() != null) {
+          partition = (Partition) MessageBuilder.getTObj(
+              writeEventInfoList.get(i).getPartitionObj(), Partition.class);
+        }
+        parts.add(partition);
+        Pair<Table, List<Integer>> pair = 
tableNameToIdxs.computeIfAbsent(tableName,
+            k -> new Pair<>(tbl, new ArrayList<>()));
+        pair.getSecond().add(i);
         tableNames_.add(tableName.toString());
       }
-      for (Map.Entry<TableName, List<Integer>> entry : 
tableNameToIdxs.entrySet()) {
-        org.apache.hadoop.hive.metastore.api.Table tbl =
-            commitTxnMessage_.getTableObj(entry.getValue().get(0));
-        List<Long> writeIdsForTable = entry.getValue().stream()
-            .map(i -> writeIds.get(i))
-            .collect(Collectors.toList());
-        List<Partition> partsForTable = entry.getValue().stream()
-            .map(i -> parts.get(i))
-            .collect(Collectors.toList());
+      for (Map.Entry<TableName, Pair<Table, List<Integer>>> entry :
+          tableNameToIdxs.entrySet()) {
+        Table tbl = entry.getValue().getFirst();
+        List<Long> writeIdsForTable = entry.getValue().getSecond().stream()
+            .map(i -> writeIds.get(i)).collect(Collectors.toList());
+        List<Partition> partsForTable = entry.getValue().getSecond().stream()
+            .map(i -> parts.get(i)).collect(Collectors.toList());
         addCommittedWriteIdsAndReload(getCatalogOpExecutor(), tbl.getDbName(),
             tbl.getTableName(), tbl.getPartitionKeysSize() > 0,
             MetaStoreUtils.isMaterializedViewTable(tbl), writeIdsForTable, 
partsForTable,
@@ -1119,39 +1125,44 @@ public class MetastoreShim extends 
Hive3MetastoreShimBase {
     }
     try {
       // Build table name to write id indices mapping from the HMS event
-      CommitTxnMessage commitTxnMessage = event.commitTxnMessage_;
-      List<Long> writeIds;
       List<Partition> parts = new ArrayList<>();
-      Map<TableName, List<Integer>> tableNameToIdxs = new HashMap<>();
+      Map<TableName, Pair<Table, List<Integer>>> tableNameToIdxs = new 
HashMap<>();
       if (writeEventInfoList != null && !writeEventInfoList.isEmpty()) {
-        commitTxnMessage.addWriteEventInfo(writeEventInfoList);
-        writeIds = commitTxnMessage.getWriteIds();
+        List<Long> writeIds = 
writeEventInfoList.stream().map(WriteEventInfo::getWriteId)
+            .collect(Collectors.toList());
         for (int i = 0; i < writeIds.size(); i++) {
-          org.apache.hadoop.hive.metastore.api.Table tbl =
-              commitTxnMessage.getTableObj(i);
+          Table tbl = (Table) MessageBuilder.getTObj(
+              writeEventInfoList.get(i).getTableObj(), Table.class);
           TableName tableName = new TableName(tbl.getDbName(), 
tbl.getTableName());
-          parts.add(commitTxnMessage.getPartitionObj(i));
-          tableNameToIdxs.computeIfAbsent(tableName, k -> new 
ArrayList<>()).add(i);
+          Partition partition = null;
+          if (writeEventInfoList.get(i).getPartitionObj() != null) {
+            partition = (Partition) MessageBuilder.getTObj(
+                writeEventInfoList.get(i).getPartitionObj(), Partition.class);
+          }
+          parts.add(partition);
+          Pair<Table, List<Integer>> pair = 
tableNameToIdxs.computeIfAbsent(tableName,
+              k -> new Pair<>(tbl, new ArrayList<>()));
+          pair.getSecond().add(i);
         }
         // Form list of PseudoCommitTxnEvent
         List<Long> finalWriteIds = writeIds;
-        for (Map.Entry<TableName, List<Integer>> entry : 
tableNameToIdxs.entrySet()) {
+        for (Map.Entry<TableName, Pair<Table, List<Integer>>> entry :
+            tableNameToIdxs.entrySet()) {
           List<Long> writeIdsInCatalog = 
tableNameToWriteIds.remove(entry.getKey());
-          org.apache.hadoop.hive.metastore.api.Table tbl =
-              commitTxnMessage.getTableObj(entry.getValue().get(0));
+          Table tbl = entry.getValue().getFirst();
           pseudoEvents.add(
               new PseudoCommitTxnEvent(event, tbl.getDbName(), 
tbl.getTableName(),
                   tbl.getPartitionKeysSize() > 0,
                   MetaStoreUtils.isMaterializedViewTable(tbl), 
writeIdsInCatalog,
-                  entry.getValue().stream().map(i -> finalWriteIds.get(i))
+                  entry.getValue().getSecond().stream().map(i -> 
finalWriteIds.get(i))
                       .collect(Collectors.toList()),
-                  entry.getValue().stream().map(i -> parts.get(i))
+                  entry.getValue().getSecond().stream().map(i -> parts.get(i))
                       .collect(Collectors.toList())));
         }
       }
       for (Map.Entry<TableName, List<Long>> entry : 
tableNameToWriteIds.entrySet()) {
-        org.apache.hadoop.hive.metastore.api.Table tbl = 
event.getCatalogOpExecutor()
-            .getCatalog().getTable(entry.getKey().getDb(), 
entry.getKey().getTbl())
+        Table tbl = event.getCatalogOpExecutor().getCatalog()
+            .getTable(entry.getKey().getDb(), entry.getKey().getTbl())
             .getMetaStoreTable();
         pseudoEvents.add(
             new PseudoCommitTxnEvent(event, tbl.getDbName(), 
tbl.getTableName(),

Reply via email to