This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 4a7cb736a8a61ee31b165a8f6158b7e15cf0a32a
Author: Venu Reddy <[email protected]>
AuthorDate: Thu Jul 10 16:42:10 2025 +0530

    IMPALA-14209: Fixed incorrect usage of WriteEventInfos with commitTxnMessage

    HIVE-28976 modified CommitTxnMessage to pass write ids in the message,
    but catalogd does not expect write ids in the CommitTxnMessage: it
    explicitly fetches the WriteEventInfos from HMS with the
    getAllWriteEventInfo() API and populates them into its local
    CommitTxnMessage object using CommitTxnMessage.addWriteEventInfo().
    Since the write ids are already present in the CommitTxnMessage,
    addWriteEventInfo() appends them to the writeIds list a second time,
    leaving writeIds longer than the list of table objects, so
    CommitTxnMessage.getTableObj(i) throws an IndexOutOfBoundsException.
    The fix is to not populate the fetched WriteEventInfos into the local
    CommitTxnMessage object, but to use them directly.

    Testing:
    - Executed existing tests.

    Change-Id: Ie9b955b2b7427db9771ffec7ac5a16617f5ea022
    Reviewed-on: http://gerrit.cloudera.org:8080/23152
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../org/apache/impala/compat/MetastoreShim.java | 89 ++++++++++++----------
 1 file changed, 50 insertions(+), 39 deletions(-)

diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index beabac03e..06bcd4514 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -123,6 +123,7 @@ import org.apache.impala.catalog.metastore.CatalogMetastoreServer;
 import org.apache.impala.catalog.metastore.ICatalogMetastoreServer;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Metrics;
+import org.apache.impala.common.Pair;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.hive.common.MutableValidWriteIdList;
 import org.apache.impala.service.BackendConfig;
@@ -592,8 +593,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
         .getReloadMessage(event.getMessage());
     Map<String, Object> updatedFields = new HashMap<>();
     try {
-      org.apache.hadoop.hive.metastore.api.Table msTbl = Preconditions.checkNotNull(
-          reloadMessage.getTableObj());
+      Table msTbl = Preconditions.checkNotNull(reloadMessage.getTableObj());
       Partition reloadPartition = reloadMessage.getPtnObj();
       boolean isRefresh = reloadMessage.isRefreshEvent();
       updatedFields.put("table", msTbl);
@@ -867,7 +867,6 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
    */
  public static class CommitTxnEvent extends MetastoreEvent {
    public static final String EVENT_TYPE = "COMMIT_TXN";
-    private final CommitTxnMessage commitTxnMessage_;
    private final long txnId_;
    private Set<TableWriteId> tableWriteIds_ = Collections.emptySet();
    private final Set<String> tableNames_ = new HashSet<>();
@@ -877,9 +876,10 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
      super(catalogOpExecutor, metrics, event);
      Preconditions.checkState(getEventType().equals(MetastoreEventType.COMMIT_TXN));
      Preconditions.checkNotNull(event.getMessage());
-      commitTxnMessage_ = MetastoreEventsProcessor.getMessageDeserializer()
+      CommitTxnMessage commitTxnMessage =
+          MetastoreEventsProcessor.getMessageDeserializer()
          .getCommitTxnMessage(event.getMessage());
-      txnId_ = commitTxnMessage_.getTxnId();
+      txnId_ = commitTxnMessage.getTxnId();
      tableWriteIds_ = catalog_.removeWriteIds(txnId_);
      LOG.info("EventId: {} EventType: COMMIT_TXN transaction id: {}", getEventId(),
          txnId_);
@@ -908,8 +908,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
 
      try {
        if (writeEventInfoList != null && !writeEventInfoList.isEmpty()) {
-          commitTxnMessage_.addWriteEventInfo(writeEventInfoList);
-          addCommittedWriteIdsAndRefreshPartitions();
+          addCommittedWriteIdsAndRefreshPartitions(writeEventInfoList);
        }
        // committed write ids for DDL need to be added here
        addCommittedWriteIdsToTables(tableWriteIds_);
@@ -952,28 +951,35 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
      }
    }
 
-    private void addCommittedWriteIdsAndRefreshPartitions() throws Exception {
-      Preconditions.checkNotNull(commitTxnMessage_.getWriteIds());
-      List<Long> writeIds = Collections.unmodifiableList(commitTxnMessage_.getWriteIds());
+    private void addCommittedWriteIdsAndRefreshPartitions(
+        List<WriteEventInfo> writeEventInfoList) throws Exception {
+      List<Long> writeIds = writeEventInfoList.stream().map(WriteEventInfo::getWriteId)
+          .collect(Collectors.toList());
      List<Partition> parts = new ArrayList<>();
      // To load partitions together for the same table, indexes are grouped by table name
-      Map<TableName, List<Integer>> tableNameToIdxs = new HashMap<>();
+      Map<TableName, Pair<Table, List<Integer>>> tableNameToIdxs = new HashMap<>();
      for (int i = 0; i < writeIds.size(); i++) {
-        org.apache.hadoop.hive.metastore.api.Table tbl = commitTxnMessage_.getTableObj(i);
+        Table tbl = (Table) MessageBuilder.getTObj(
+            writeEventInfoList.get(i).getTableObj(), Table.class);
        TableName tableName = new TableName(tbl.getDbName(), tbl.getTableName());
-        parts.add(commitTxnMessage_.getPartitionObj(i));
-        tableNameToIdxs.computeIfAbsent(tableName, k -> new ArrayList<>()).add(i);
+        Partition partition = null;
+        if (writeEventInfoList.get(i).getPartitionObj() != null) {
+          partition = (Partition) MessageBuilder.getTObj(
+              writeEventInfoList.get(i).getPartitionObj(), Partition.class);
+        }
+        parts.add(partition);
+        Pair<Table, List<Integer>> pair = tableNameToIdxs.computeIfAbsent(tableName,
+            k -> new Pair<>(tbl, new ArrayList<>()));
+        pair.getSecond().add(i);
        tableNames_.add(tableName.toString());
      }
-      for (Map.Entry<TableName, List<Integer>> entry : tableNameToIdxs.entrySet()) {
-        org.apache.hadoop.hive.metastore.api.Table tbl =
-            commitTxnMessage_.getTableObj(entry.getValue().get(0));
-        List<Long> writeIdsForTable = entry.getValue().stream()
-            .map(i -> writeIds.get(i))
-            .collect(Collectors.toList());
-        List<Partition> partsForTable = entry.getValue().stream()
-            .map(i -> parts.get(i))
-            .collect(Collectors.toList());
+      for (Map.Entry<TableName, Pair<Table, List<Integer>>> entry :
+          tableNameToIdxs.entrySet()) {
+        Table tbl = entry.getValue().getFirst();
+        List<Long> writeIdsForTable = entry.getValue().getSecond().stream()
+            .map(i -> writeIds.get(i)).collect(Collectors.toList());
+        List<Partition> partsForTable = entry.getValue().getSecond().stream()
+            .map(i -> parts.get(i)).collect(Collectors.toList());
        addCommittedWriteIdsAndReload(getCatalogOpExecutor(), tbl.getDbName(),
            tbl.getTableName(), tbl.getPartitionKeysSize() > 0,
            MetaStoreUtils.isMaterializedViewTable(tbl), writeIdsForTable, partsForTable,
@@ -1119,39 +1125,44 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
    }
    try {
      // Build table name to write id indices mapping from the HMS event
-      CommitTxnMessage commitTxnMessage = event.commitTxnMessage_;
-      List<Long> writeIds;
      List<Partition> parts = new ArrayList<>();
-      Map<TableName, List<Integer>> tableNameToIdxs = new HashMap<>();
+      Map<TableName, Pair<Table, List<Integer>>> tableNameToIdxs = new HashMap<>();
      if (writeEventInfoList != null && !writeEventInfoList.isEmpty()) {
-        commitTxnMessage.addWriteEventInfo(writeEventInfoList);
-        writeIds = commitTxnMessage.getWriteIds();
+        List<Long> writeIds = writeEventInfoList.stream().map(WriteEventInfo::getWriteId)
+            .collect(Collectors.toList());
        for (int i = 0; i < writeIds.size(); i++) {
-          org.apache.hadoop.hive.metastore.api.Table tbl =
-              commitTxnMessage.getTableObj(i);
+          Table tbl = (Table) MessageBuilder.getTObj(
+              writeEventInfoList.get(i).getTableObj(), Table.class);
          TableName tableName = new TableName(tbl.getDbName(), tbl.getTableName());
-          parts.add(commitTxnMessage.getPartitionObj(i));
-          tableNameToIdxs.computeIfAbsent(tableName, k -> new ArrayList<>()).add(i);
+          Partition partition = null;
+          if (writeEventInfoList.get(i).getPartitionObj() != null) {
+            partition = (Partition) MessageBuilder.getTObj(
+                writeEventInfoList.get(i).getPartitionObj(), Partition.class);
+          }
+          parts.add(partition);
+          Pair<Table, List<Integer>> pair = tableNameToIdxs.computeIfAbsent(tableName,
+              k -> new Pair<>(tbl, new ArrayList<>()));
+          pair.getSecond().add(i);
        }
        // Form list of PseudoCommitTxnEvent
        List<Long> finalWriteIds = writeIds;
-        for (Map.Entry<TableName, List<Integer>> entry : tableNameToIdxs.entrySet()) {
+        for (Map.Entry<TableName, Pair<Table, List<Integer>>> entry :
+            tableNameToIdxs.entrySet()) {
          List<Long> writeIdsInCatalog = tableNameToWriteIds.remove(entry.getKey());
-          org.apache.hadoop.hive.metastore.api.Table tbl =
-              commitTxnMessage.getTableObj(entry.getValue().get(0));
+          Table tbl = entry.getValue().getFirst();
          pseudoEvents.add(
              new PseudoCommitTxnEvent(event, tbl.getDbName(), tbl.getTableName(),
                  tbl.getPartitionKeysSize() > 0,
                  MetaStoreUtils.isMaterializedViewTable(tbl), writeIdsInCatalog,
-                  entry.getValue().stream().map(i -> finalWriteIds.get(i))
+                  entry.getValue().getSecond().stream().map(i -> finalWriteIds.get(i))
                      .collect(Collectors.toList()),
-                  entry.getValue().stream().map(i -> parts.get(i))
+                  entry.getValue().getSecond().stream().map(i -> parts.get(i))
                      .collect(Collectors.toList())));
        }
      }
      for (Map.Entry<TableName, List<Long>> entry : tableNameToWriteIds.entrySet()) {
-        org.apache.hadoop.hive.metastore.api.Table tbl = event.getCatalogOpExecutor()
-            .getCatalog().getTable(entry.getKey().getDb(), entry.getKey().getTbl())
+        Table tbl = event.getCatalogOpExecutor().getCatalog()
+            .getTable(entry.getKey().getDb(), entry.getKey().getTbl())
            .getMetaStoreTable();
        pseudoEvents.add(
            new PseudoCommitTxnEvent(event, tbl.getDbName(), tbl.getTableName(),
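
The failure mode described in the commit message comes down to CommitTxnMessage keeping
parallel lists (write ids and serialized table objects) that getTableObj(i) assumes have
the same length. Below is a minimal, self-contained Java sketch of that mechanism;
SimplifiedCommitTxnMessage is a hypothetical stand-in that only mirrors the parallel-list
structure of Hive's CommitTxnMessage, not the actual Hive class:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in mirroring CommitTxnMessage's parallel lists:
// writeIds and tableObjs must stay the same length for getTableObj(i).
class SimplifiedCommitTxnMessage {
  final List<Long> writeIds = new ArrayList<>();     // may already be populated
  final List<String> tableObjs = new ArrayList<>();  // filled by addWriteEventInfo()

  // Like addWriteEventInfo(): appends to writeIds even when the deserialized
  // message already carried write ids (the post-HIVE-28976 behavior).
  void addWriteEventInfo(List<Long> ids, List<String> tables) {
    writeIds.addAll(ids);
    tableObjs.addAll(tables);
  }

  String getTableObj(int i) { return tableObjs.get(i); }

  public static void main(String[] args) {
    SimplifiedCommitTxnMessage msg = new SimplifiedCommitTxnMessage();
    msg.writeIds.add(42L);  // write id already present after deserialization
    msg.addWriteEventInfo(Arrays.asList(42L), Arrays.asList("db.tbl"));
    // writeIds now has two entries but tableObjs only one, so iterating
    // writeIds.size() entries throws IndexOutOfBoundsException at i == 1.
    for (int i = 0; i < msg.writeIds.size(); i++) {
      System.out.println(msg.getTableObj(i));
    }
  }
}

The patch avoids the duplication by never mutating the message: the write ids are read
from WriteEventInfo.getWriteId(), and the table and partition objects are deserialized
directly from each WriteEventInfo with MessageBuilder.getTObj(), as the diff above shows.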
