This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 8a3642864edbec450661a1aabd21dab0fc574c27 Author: stiga-huang <[email protected]> AuthorDate: Fri Mar 21 21:02:56 2025 +0800 IMPALA-13884: Add more details in metadata loading logs Adds the target of the event in HMS event processing logs. For AlterPartitionEvent, the partition name will also be printed. Note that Add/DropPartitionEvent can have multiple partitions, so their partition names are not printed for now. E.g. EventId: 5271 EventType: ALTER_PARTITION Target: db1.tbl year=2009/month=1 EventId: 28781 EventType: ALTER_TABLE Target: tpcds.store_sales In the reason string for loading table metadata, adds the event id if the request comes from the EventProcessor. E.g. Reloading metadata for table definition and all partition(s) of db1.tbl (INSERT event 15765) Reloading partition metadata: db1.tbl part=1 (INSERT event 15685) Reloading partition metadata: cachedb.cached_tbl_part j=4,j=1,j=2... and 2 others (batch of 5 ALTER_PARTITIONS events from 38063 to 38067) For logs of execDdl requests on AlterTable operations, adds the AlterType. E.g. execDdl request: ALTER_TABLE db1.tbl UPDATE_STATS issued by user1 execDdl request: ALTER_TABLE db1.tbl RECOVER_PARTITIONS issued by user1 For the warning about waiting too long to acquire a table lock in CatalogServiceCatalog.tryLock(), also logs the caller stacktrace. E.g. Write lock for table db1.tbl was acquired in 3329 msec. 
Caller stacktrace: org.apache.impala.catalog.CatalogServiceCatalog.tryLock(CatalogServiceCatalog.java:635) at org.apache.impala.catalog.CatalogServiceCatalog.tryWriteLock(CatalogServiceCatalog.java:537) at org.apache.impala.catalog.CatalogServiceCatalog.evaluateSelfEvent(CatalogServiceCatalog.java:1250) at org.apache.impala.catalog.events.MetastoreEvents$MetastoreEvent.isSelfEvent(MetastoreEvents.java:928) at org.apache.impala.catalog.events.MetastoreEvents$MetastoreTableEvent.isSelfEvent(MetastoreEvents.java:1259) at org.apache.impala.catalog.events.MetastoreEvents$BatchPartitionEvent.processTableEvent(MetastoreEvents.java:2819) at org.apache.impala.catalog.events.MetastoreEvents$MetastoreTableEvent.process(MetastoreEvents.java:1348) at org.apache.impala.catalog.events.MetastoreEvents$MetastoreEvent.processIfEnabled(MetastoreEvents.java:703) at org.apache.impala.catalog.events.MetastoreEventsProcessor.processEvents(MetastoreEventsProcessor.java:1354) at org.apache.impala.catalog.events.MetastoreEventsProcessor.processEvents(MetastoreEventsProcessor.java:1097) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) When a metadata load is skipped because the table was modified concurrently (e.g. invalidated), logs the reason. E.g. Not updating table db1.tbl since it has been modified. Current catalog version: 30229. Expected catalog version: 25877 Adds a log when starting to recover partitions. E.g. 
Recovering 9 partitions for db1.tbl Also adds an overload of AcidUtils.isTransactionalTable() for FeTable to simplify some code. Fixes a wrong errorMessageTemplate of Preconditions.checkArgument() in FeCatalogUtils.parsePartitionKeyValues(). Only "%s" can be used as a placeholder; arguments are converted to strings using String.valueOf(Object), so "%d" is not needed (and is not substituted). Tests: - Passed exhaustive tests Change-Id: I204d5922e055fd8501b5573e3b913f8874d891d6 Reviewed-on: http://gerrit.cloudera.org:8080/22653 Reviewed-by: Riza Suminto <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../org/apache/impala/compat/MetastoreShim.java | 6 +- .../java/org/apache/impala/analysis/Analyzer.java | 2 +- .../impala/analysis/ConvertTableToIcebergStmt.java | 3 +- .../apache/impala/analysis/ResetMetadataStmt.java | 3 +- .../apache/impala/analysis/StmtMetadataLoader.java | 2 +- .../impala/catalog/CatalogServiceCatalog.java | 56 ++++++++----- .../org/apache/impala/catalog/FeCatalogUtils.java | 2 +- .../impala/catalog/events/MetastoreEvents.java | 97 +++++++++++++--------- .../catalog/events/MetastoreEventsProcessor.java | 2 +- .../apache/impala/service/CatalogOpExecutor.java | 20 ++--- .../java/org/apache/impala/service/Frontend.java | 11 +-- .../java/org/apache/impala/util/AcidUtils.java | 8 +- .../java/org/apache/impala/util/CatalogOpUtil.java | 26 +++--- .../events/MetastoreEventsProcessorTest.java | 2 +- .../org/apache/impala/util/CatalogOpUtilTest.java | 4 +- 15 files changed, 144 insertions(+), 100 deletions(-) diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java index 5d199174b..4a0356e6e 100644 --- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java +++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java @@ -946,8 +946,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase { try { 
catalogOpExecutor_.addCommittedWriteIdsAndReloadPartitionsIfExist( getEventId(), entry.getKey().getDb(), entry.getKey().getTbl(), - writeIdsForTable, partsForTable, "Processing event id: " + - getEventId() + ", event type: " + getEventType()); + writeIdsForTable, partsForTable, "COMMIT_TXN event " + getEventId()); } catch (TableNotLoadedException e) { debugLog("Ignoring reloading since table {} is not loaded", entry.getKey()); @@ -957,7 +956,8 @@ public class MetastoreShim extends Hive3MetastoreShimBase { } } else { catalog_.reloadTableIfExists(entry.getKey().getDb(), entry.getKey().getTbl(), - "CommitTxnEvent", getEventId(), /*isSkipFileMetadataReload*/false); + "COMMIT_TXN event " + getEventId(), getEventId(), + /*isSkipFileMetadataReload*/false); } } } diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java index 26d235c46..6e89cdaae 100644 --- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java +++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java @@ -358,7 +358,7 @@ public class Analyzer { public static void ensureTableNotTransactional(FeTable table, String operationStr) throws AnalysisException { - if (AcidUtils.isTransactionalTable(table.getMetaStoreTable().getParameters())) { + if (AcidUtils.isTransactionalTable(table)) { throw new AnalysisException(String.format(TRANSACTIONAL_TABLE_NOT_SUPPORTED, operationStr, table.getFullName())); } diff --git a/fe/src/main/java/org/apache/impala/analysis/ConvertTableToIcebergStmt.java b/fe/src/main/java/org/apache/impala/analysis/ConvertTableToIcebergStmt.java index b0b5069ba..557c77dc1 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ConvertTableToIcebergStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/ConvertTableToIcebergStmt.java @@ -90,8 +90,7 @@ public class ConvertTableToIcebergStmt extends StatementBase { table.getClass().getSimpleName()); } - if (table.getMetaStoreTable().getParameters() != null && - 
AcidUtils.isTransactionalTable(table.getMetaStoreTable().getParameters())) { + if (AcidUtils.isTransactionalTable(table)) { throw new AnalysisException( "CONVERT TO ICEBERG is not supported for transactional tables"); } diff --git a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java index 171312ccc..07723289b 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java @@ -174,8 +174,7 @@ public class ResetMetadataStmt extends StatementBase { // Get local table info without reaching out to HMS FeTable table = analyzer.getTable(dbName, tableName_.getTbl(), /* must_exist */ true); - if (AcidUtils.isTransactionalTable( - table.getMetaStoreTable().getParameters())) { + if (AcidUtils.isTransactionalTable(table)) { throw new AnalysisException("Refreshing a partition is not allowed on " + "transactional tables. Try to refresh the whole table instead."); } diff --git a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java index fcac20df4..8e2751094 100644 --- a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java +++ b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java @@ -286,7 +286,7 @@ public class StmtMetadataLoader { boolean hasAcidTbls = false; for (FeTable iTbl : loadedOrFailedTbls_.values()) { if (iTbl instanceof FeIncompleteTable) continue; - if (AcidUtils.isTransactionalTable(iTbl.getMetaStoreTable().getParameters())) { + if (AcidUtils.isTransactionalTable(iTbl)) { validIdsBuf.append("\n"); validIdsBuf.append(" "); validIdsBuf.append(iTbl.getValidWriteIds().writeToString()); diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index a5f68f6ab..17da4c622 100644 --- 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -631,8 +631,15 @@ public class CatalogServiceCatalog extends Catalog { if (lock.tryLock(0, TimeUnit.SECONDS)) { long duration = System.currentTimeMillis() - begin; if (duration > LOCK_ACQUIRING_DURATION_WARN_MS) { - LOG.warn("{} lock for table {} was acquired in {} msec", - useWriteLock ? "Write" : "Read", tbl.getFullName(), duration); + // Get the caller stacktrace and convert it into string. + StackTraceElement[] stack = Thread.currentThread().getStackTrace(); + // Skip the first method which is always java.lang.Thread.getStackTrace(). + String st = Arrays.stream(stack).skip(1) + .map(StackTraceElement::toString) + .collect(Collectors.joining("\n\tat ")); + LOG.warn("{} lock for table {} was acquired in {} msec. " + + "Caller stacktrace: {}", + useWriteLock ? "Write" : "Read", tbl.getFullName(), duration, st); } return true; } @@ -2509,9 +2516,7 @@ public class CatalogServiceCatalog extends Catalog { // In the external front end use case it is possible that an external table might // have validWriteIdList, so we can simply ignore this value if table is external if (isLoaded - && (validWriteIdList == null - || (!AcidUtils.isTransactionalTable( - tbl.getMetaStoreTable().getParameters())))) { + && (validWriteIdList == null || (!AcidUtils.isTransactionalTable(tbl)))) { incrementCatalogDCacheHitMetric(reason); LOG.trace("returning already loaded table {}", tbl.getFullName()); return tbl; @@ -2530,10 +2535,9 @@ public class CatalogServiceCatalog extends Catalog { // calls for the same table. 
If there are concurrent calls, it is possible we // refresh a partition for multiple times but that doesn't break the table's // consistency because we refresh the file metadata based on the same writeIdList - Preconditions.checkState( - AcidUtils.isTransactionalTable(tbl.getMetaStoreTable().getParameters()), - "Compaction id check cannot be done for non-transactional table " - + tbl.getFullName()); + Preconditions.checkState(AcidUtils.isTransactionalTable(tbl), + "Compaction id check cannot be done for non-transactional table %s", + tbl.getFullName()); readLock(tbl, catalogTimeline); try { partsToBeRefreshed = @@ -2623,17 +2627,31 @@ public class CatalogServiceCatalog extends Catalog { // method to not update the table when the updatedTbl has a higher ValidWriteIdList // if we just rely on catalog version comparison which would break the logic to // reload on stale ValidWriteIdList logic. - if (existingTbl == null - || (existingTbl.getCatalogVersion() != expectedCatalogVersion - && (!(existingTbl instanceof HdfsTable) - || AcidUtils.compare((HdfsTable) existingTbl, - updatedTbl.getValidWriteIds(), tableId) - >= 0))) { - LOG.trace("returning existing table {} with last synced id: ", - existingTbl.getFullName(), existingTbl.getLastSyncedEventId()); - return existingTbl; + if (existingTbl == null) { + LOG.info("Not updating table {} since it has been removed", + updatedTbl.getFullName()); + return null; + } + long currentVersion = existingTbl.getCatalogVersion(); + if (!(existingTbl instanceof HdfsTable) + || !AcidUtils.isTransactionalTable(existingTbl)) { + if (currentVersion != expectedCatalogVersion) { + LOG.info("Not updating table {} since it has been modified. Current catalog " + + "version: {}. 
Expected catalog version: {}", + existingTbl.getFullName(), currentVersion, expectedCatalogVersion); + return existingTbl; + } + } else if (currentVersion != expectedCatalogVersion) { + int cmp = AcidUtils.compare( + (HdfsTable) existingTbl, updatedTbl.getValidWriteIds(), tableId); + if (cmp >= 0) { + LOG.info("Not updating table {} (transactional). Current catalog version: {}." + + " Expected catalog version: {}. Acid compare: {}. Last synced id: {}", + existingTbl.getFullName(), currentVersion, expectedCatalogVersion, cmp, + existingTbl.getLastSyncedEventId()); + return existingTbl; + } } - if (existingTbl instanceof HdfsTable) { // Add the old instance to the deleteLog_ so we can send isDeleted updates for diff --git a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java index 9b9bc20cb..6fb4f6399 100644 --- a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java +++ b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java @@ -246,7 +246,7 @@ public abstract class FeCatalogUtils { Preconditions.checkArgument( hmsPartitionValues.size() == table.getNumClusteringCols(), "Cannot parse partition values '%s' for table %s: " + - "expected %d values but got %d", + "expected %s values but got %s", hmsPartitionValues, table.getFullName(), table.getNumClusteringCols(), hmsPartitionValues.size()); List<LiteralExpr> keyValues = new ArrayList<>(); diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java index 2b1d21a7e..6dd934dfe 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java @@ -119,6 +119,8 @@ public class MetastoreEvents { } public enum MetastoreEventType { + // It's intended that some eventTypes have a suffix of "_EVENT". 
This is consistent + // with Hive so the type matching can use string equal checks. CREATE_TABLE("CREATE_TABLE"), DROP_TABLE("DROP_TABLE"), ALTER_TABLE("ALTER_TABLE"), @@ -135,7 +137,7 @@ public class MetastoreEvents { ALLOC_WRITE_ID_EVENT("ALLOC_WRITE_ID_EVENT"), COMMIT_TXN("COMMIT_TXN"), ABORT_TXN("ABORT_TXN"), - COMMIT_COMPACTION("COMMIT_COMPACTION_EVENT"), + COMMIT_COMPACTION_EVENT("COMMIT_COMPACTION_EVENT"), OTHER("OTHER"); private final String eventType_; @@ -229,7 +231,7 @@ public class MetastoreEvents { return new ReloadEvent(catalogOpExecutor_, metrics, event); case INSERT: return new InsertEvent(catalogOpExecutor_, metrics, event); - case COMMIT_COMPACTION: + case COMMIT_COMPACTION_EVENT: return new CommitCompactionEvent(catalogOpExecutor_, metrics, event); default: // ignore all the unknown events by creating a IgnoredEvent @@ -580,10 +582,12 @@ public class MetastoreEvents { public static abstract class MetastoreEvent { // String.format compatible string to prepend event id and type - private static final String STR_FORMAT_EVENT_ID_TYPE = "EventId: %d EventType: %s "; + private static final String STR_FORMAT_EVENT_ID_TYPE = + "EventId: %d EventType: %s Target: %s. "; // logger format compatible string to prepend to a log formatted message - private static final String LOG_FORMAT_EVENT_ID_TYPE = "EventId: {} EventType: {} "; + private static final String LOG_FORMAT_EVENT_ID_TYPE = + "EventId: {} EventType: {} Target: {}. "; protected static final String CLUSTER_WIDE_TARGET = "CLUSTER_WIDE"; @@ -662,6 +666,10 @@ public class MetastoreEvents { return dbName_ + "." + tblName_; } + public String getEventDesc() { + return eventType_ + " event " + eventId_; + } + /** * Method to inject error randomly for certain events during the processing. * It is used for testing purpose. 
@@ -802,10 +810,17 @@ public class MetastoreEvents { * Helper method to generate the format args after prepending the event id and type */ private Object[] getLogFormatArgs(Object[] args) { - Object[] formatArgs = new Object[args.length + 2]; + Object[] formatArgs = new Object[args.length + 3]; formatArgs[0] = getEventId(); formatArgs[1] = getEventType(); - int i = 2; + String target = getTargetName(); + // AlterPartitionEvent is on a single partition so we can also log the partition + // name. Add/DropPartitionEvent might have multiple partitions so ignore them here. + if (this instanceof AlterPartitionEvent) { + target += ":" + ((AlterPartitionEvent) this).getPartName(); + } + formatArgs[2] = target; + int i = 3; for (Object arg : args) { formatArgs[i] = arg; i++; @@ -1069,11 +1084,10 @@ public class MetastoreEvents { * Helper function to initiate a table reload on Catalog. Re-throws the exception if * the catalog operation throws. */ - protected boolean reloadTableFromCatalog(String operation, boolean isTransactional) + protected boolean reloadTableFromCatalog(boolean isTransactional) throws CatalogException { try { - if (!catalog_.reloadTableIfExists(dbName_, tblName_, - "Processing " + operation + " event from HMS", getEventId(), + if (!catalog_.reloadTableIfExists(dbName_, tblName_, getEventDesc(), getEventId(), skipFileMetadataReload_)) { debugLog("Automatic refresh on table {} failed as the table " + "either does not exist anymore or is not in loaded state.", @@ -1669,7 +1683,7 @@ public class MetastoreEvents { // forcing file metadata reload so that new files (due to insert) are reflected // HdfsPartition reloadPartitions(Arrays.asList(insertPartition_), - FileMetadataLoadOpts.FORCE_LOAD, "INSERT event", false); + FileMetadataLoadOpts.FORCE_LOAD, getEventDesc(), false); } catch (CatalogException e) { throw new MetastoreNotificationNeedsInvalidateException(debugString("Refresh " + "partition on table {} partition {} failed. 
Event processing cannot " @@ -1686,7 +1700,7 @@ public class MetastoreEvents { // For non-partitioned tables, refresh the whole table. Preconditions.checkState(insertPartition_ == null); try { - boolean notSkipped = reloadTableFromCatalog("INSERT event", false); + boolean notSkipped = reloadTableFromCatalog(false); if (!notSkipped) { metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); } @@ -1845,7 +1859,7 @@ public class MetastoreEvents { // refresh, eg. this could be due to as simple as adding a new parameter or a // full blown adding or changing column type // rename is already handled above - boolean notSkipped = reloadTableFromCatalog("ALTER_TABLE", false); + boolean notSkipped = reloadTableFromCatalog(false); if (!notSkipped) { metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); } @@ -2514,7 +2528,7 @@ public class MetastoreEvents { BackendConfig.INSTANCE.getHMSEventIncrementalRefreshTransactionalTable(); if ((AcidUtils.isTransactionalTable(msTbl_.getParameters()) && !isSelfEvent() && !incrementalRefresh) || MetaStoreUtils.isMaterializedViewTable(msTbl_)) { - boolean notSkipped = reloadTableFromCatalog("ADD_PARTITION", true); + boolean notSkipped = reloadTableFromCatalog(true); if (!notSkipped) { metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); } @@ -2564,6 +2578,7 @@ public class MetastoreEvents { private final String serviceIdFromEvent_; // true if this alter event was due to a truncate operation in metastore private final boolean isTruncateOp_; + private final String partName_; /** * Prevent instantiation from outside should use MetastoreEventFactory instead @@ -2590,12 +2605,16 @@ public class MetastoreEvents { MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1")); serviceIdFromEvent_ = MetastoreEvents.getStringProperty( parameters, MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), ""); + partName_ = HdfsTable.constructPartitionName(getTPartitionSpecFromHmsPartition( + 
msTbl_, partitionAfter_)); } catch (Exception e) { throw new MetastoreNotificationException( debugString("Unable to parse the alter partition message"), e); } } + public String getPartName() { return partName_; } + @Override protected MetastoreEventType getBatchEventType() { return MetastoreEventType.ALTER_PARTITIONS; @@ -2673,8 +2692,6 @@ public class MetastoreEvents { } else { // Refresh the partition that was altered. Preconditions.checkNotNull(partitionAfter_); - List<TPartitionKeyValue> tPartSpec = getTPartitionSpecFromHmsPartition(msTbl_, - partitionAfter_); try { // load file metadata only if storage descriptor of partitionAfter_ differs // from sd of HdfsPartition. If the alter_partition event type is of truncate @@ -2683,13 +2700,11 @@ public class MetastoreEvents { isTruncateOp_ ? FileMetadataLoadOpts.FORCE_LOAD : FileMetadataLoadOpts.LOAD_IF_SD_CHANGED; reloadPartitions(Arrays.asList(partitionAfter_), fileMetadataLoadOpts, - "ALTER_PARTITION event", false); + getEventDesc(), false); } catch (CatalogException e) { throw new MetastoreNotificationNeedsInvalidateException( - debugString("Refresh partition on table {} partition {} failed. Event " + - "processing cannot continue. 
Issue an invalidate command to reset " + - "the event processor state.", getFullyQualifiedTblName(), - HdfsTable.constructPartitionName(tPartSpec)), e); + debugString("Refresh partition on table {} partition {} failed.", + getFullyQualifiedTblName(), partName_), e); } } } @@ -2726,9 +2741,9 @@ public class MetastoreEvents { BackendConfig.INSTANCE.getHMSEventIncrementalRefreshTransactionalTable(); if (incrementalRefresh) { reloadPartitionsFromEvent(Collections.singletonList(partitionAfter_), - "ALTER_PARTITION EVENT FOR TRANSACTIONAL TABLE"); + getEventDesc() + " FOR TRANSACTIONAL TABLE"); } else { - boolean notSkipped = reloadTableFromCatalog("ALTER_PARTITION", true); + boolean notSkipped = reloadTableFromCatalog(true); if (!notSkipped) { metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); } @@ -2787,6 +2802,15 @@ public class MetastoreEvents { return batchedEvents_.get(batchedEvents_.size() - 1).getEventTime(); } + @Override + public String getEventDesc() { + if (batchedEvents_.isEmpty()) return "empty event batch"; + int batch = batchedEvents_.size(); + // E.g. ALTER_PARTITIONS 4 events from 76697 to 76703 + return String.format("%s %d events from %d to %d", getEventType(), batch, + batchedEvents_.get(0).getEventId(), batchedEvents_.get(batch - 1).getEventId()); + } + /** * * @param event The event under consideration to be batched into this event. It can @@ -2844,7 +2868,7 @@ public class MetastoreEvents { // Reload the whole table if it's a transactional table. 
if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) { - boolean notSkipped = reloadTableFromCatalog(getEventType().toString(), true); + boolean notSkipped = reloadTableFromCatalog(true); if (!notSkipped) { metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC) .inc(eventsToProcess.size() + partitionEventsToForceReload.size()); @@ -2859,21 +2883,20 @@ public class MetastoreEvents { if (baseEvent_ instanceof InsertEvent) { // for insert event, always reload file metadata so that new files // are reflected in HdfsPartition - reloadPartitions(partitions, FileMetadataLoadOpts.FORCE_LOAD, - getEventType().toString() + " event", true); + reloadPartitions(partitions, FileMetadataLoadOpts.FORCE_LOAD, getEventDesc(), + true); } else { if (!partitionEventsToForceReload.isEmpty()) { // force reload truncated partitions reloadPartitions(partitionEventsToForceReload, - FileMetadataLoadOpts.FORCE_LOAD, getEventType().toString() - + " event", true); + FileMetadataLoadOpts.FORCE_LOAD, getEventDesc() + " FOR TRUNCATE", + true); } if (!partitions.isEmpty()) { // alter partition event. 
Reload file metadata of only those partitions // for which sd has changed reloadPartitions(partitions, - FileMetadataLoadOpts.LOAD_IF_SD_CHANGED, getEventType().toString() - + " event", true); + FileMetadataLoadOpts.LOAD_IF_SD_CHANGED, getEventDesc(), true); } } } catch (CatalogException e) { @@ -2973,7 +2996,7 @@ public class MetastoreEvents { BackendConfig.INSTANCE.getHMSEventIncrementalRefreshTransactionalTable(); if ((AcidUtils.isTransactionalTable(msTbl_.getParameters()) && !incrementalRefresh) || MetaStoreUtils.isMaterializedViewTable(msTbl_)) { - boolean notSkipped = reloadTableFromCatalog("DROP_PARTITION", true); + boolean notSkipped = reloadTableFromCatalog(true); if (!notSkipped) { metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); } @@ -3169,7 +3192,7 @@ public class MetastoreEvents { // forcing file metadata reload so that new files (due to refresh) are reflected // HdfsPartition reloadPartitions(Arrays.asList(reloadPartition_), - FileMetadataLoadOpts.FORCE_LOAD, "RELOAD event", false); + FileMetadataLoadOpts.FORCE_LOAD, getEventDesc(), false); } catch (CatalogException e) { throw new MetastoreNotificationNeedsInvalidateException(debugString("Refresh " + "partition on table {} partition {} failed. Event processing cannot " @@ -3187,7 +3210,7 @@ public class MetastoreEvents { Preconditions.checkState(reloadPartition_ == null); try { // we always treat the table as non-transactional so all the files are reloaded - boolean notSkipped = reloadTableFromCatalog("RELOAD event", false); + boolean notSkipped = reloadTableFromCatalog(false); if (!notSkipped) { metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); } @@ -3316,8 +3339,8 @@ public class MetastoreEvents { } /** - * Metastore event handler for COMMIT_COMPACTION events. Handles - * COMMIT_COMPACTION event for transactional tables. + * Metastore event handler for COMMIT_COMPACTION_EVENT events. 
Handles + * COMMIT_COMPACTION_EVENT event for transactional tables. */ public static class CommitCompactionEvent extends MetastoreTableEvent { private String partitionName_; @@ -3326,7 +3349,7 @@ public class MetastoreEvents { NotificationEvent event) throws MetastoreNotificationException { super(catalogOpExecutor, metrics, event); Preconditions.checkState( - getEventType().equals(MetastoreEventType.COMMIT_COMPACTION)); + getEventType().equals(MetastoreEventType.COMMIT_COMPACTION_EVENT)); Preconditions.checkNotNull(event.getMessage()); try { partitionName_ = @@ -3340,13 +3363,13 @@ public class MetastoreEvents { protected void processTableEvent() throws MetastoreNotificationException { try { if (partitionName_ == null) { - boolean notSkipped = reloadTableFromCatalog("Commit Compaction event", true); + boolean notSkipped = reloadTableFromCatalog(true); if (!notSkipped) { metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc(); } } else { reloadPartitionsFromNames(Arrays.asList(partitionName_), - "Commit compaction event", FileMetadataLoadOpts.FORCE_LOAD); + getEventDesc(), FileMetadataLoadOpts.FORCE_LOAD); } } catch (CatalogException e) { throw new MetastoreNotificationNeedsInvalidateException(debugString("Failed to " diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java index 4928e4b2b..79cf8c305 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java @@ -458,7 +458,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { // For ACID tables, events may include commit_txn and abort_txn which doesn't have // db_name and table_name. So it makes sense to fetch all the events and filter // them in catalogD. 
- if (AcidUtils.isTransactionalTable(tbl.getMetaStoreTable().getParameters())) { + if (AcidUtils.isTransactionalTable(tbl)) { metaDataFilter = new MetaDataFilter(getTableNotificationEventFilter(tbl)); } else { metaDataFilter = new MetaDataFilter(getTableNotificationEventFilter(tbl), diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index a71be6644..eab8ce765 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -2717,8 +2717,7 @@ public class CatalogOpExecutor { params.getTable_name().getTable_name(), "Load for DROP STATS", catalogTimeline); Preconditions.checkNotNull(table); // There is no transactional HMS API to drop stats at the moment (HIVE-22104). - Preconditions.checkState(!AcidUtils.isTransactionalTable( - table.getMetaStoreTable().getParameters())); + Preconditions.checkState(!AcidUtils.isTransactionalTable(table)); tryWriteLock(table, "dropping stats", catalogTimeline); InProgressTableModification modification = null; @@ -3128,7 +3127,7 @@ public class CatalogOpExecutor { Table tbl = catalog_.getTableIfCachedNoThrow(tableName.getDb(), tableName.getTbl()); long lockId = -1; if (tbl != null && !(tbl instanceof IncompleteTable) && - AcidUtils.isTransactionalTable(tbl.getMetaStoreTable().getParameters())) { + AcidUtils.isTransactionalTable(tbl)) { HeartbeatContext ctx = new HeartbeatContext( String.format("Drop table/view %s.%s", tableName.getDb(), tableName.getTbl()), System.nanoTime()); @@ -3381,7 +3380,7 @@ public class CatalogOpExecutor { try { long newCatalogVersion = 0; try { - if (AcidUtils.isTransactionalTable(table.getMetaStoreTable().getParameters())) { + if (AcidUtils.isTransactionalTable(table)) { newCatalogVersion = truncateTransactionalTable(params, table, lockMaxWaitTime, catalogTimeline); } else if (table instanceof FeIcebergTable) { @@ -5429,7 
+5428,7 @@ public class CatalogOpExecutor { Table tbl, List<Partition> partitions, Map<String, Long> partitionToEventId, boolean ifNotExists, EventSequence catalogTimeline, String debugAction) throws ImpalaException { - if (!AcidUtils.isTransactionalTable(tbl.getMetaStoreTable().getParameters())) { + if (!AcidUtils.isTransactionalTable(tbl)) { if (DebugUtils.hasDebugAction(debugAction, DebugUtils.ENABLE_EVENT_PROCESSOR)) { catalog_.startEventsProcessor(); } @@ -6374,6 +6373,7 @@ public class CatalogOpExecutor { Map<String, Long> partitionToEventId = Maps.newHashMap(); String annotation = String.format("Recovering %d partitions for %s", hmsPartitions.size(), tbl.getFullName()); + LOG.info(annotation); if (DebugUtils.hasDebugAction(debugAction, DebugUtils.ENABLE_EVENT_PROCESSOR)) { catalog_.startEventsProcessor(); } @@ -7095,9 +7095,7 @@ public class CatalogOpExecutor { CatalogObject.ThriftObjectType.FULL; if (isTableLoadedInCatalog) { if (req.isSetPartition_spec()) { - boolean isTransactional = AcidUtils.isTransactionalTable( - tbl.getMetaStoreTable().getParameters()); - Preconditions.checkArgument(!isTransactional); + Preconditions.checkArgument(!AcidUtils.isTransactionalTable(tbl)); Reference<Boolean> wasPartitionRefreshed = new Reference<>(false); // TODO if the partition was not really refreshed because the partSpec // was wrong, do we still need to send back the table? @@ -7667,8 +7665,7 @@ public class CatalogOpExecutor { long writeId = tblTxn == null ? -1: tblTxn.writeId; // If the table is transaction table we should generate a transactional // insert event type. This would show up in HMS as an ACID_WRITE event. 
- boolean isTransactional = AcidUtils.isTransactionalTable(table.getMetaStoreTable() - .getParameters()); + boolean isTransactional = AcidUtils.isTransactionalTable(table); if (isTransactional) { Preconditions.checkState(txnId > 0, "Invalid transaction id %s for table %s", txnId, table.getFullName()); @@ -7792,8 +7789,7 @@ public class CatalogOpExecutor { InsertEventRequestData insertEventRequestData = new InsertEventRequestData( Lists.newArrayListWithCapacity( newFiles.size())); - boolean isTransactional = AcidUtils - .isTransactionalTable(tbl.getMetaStoreTable().getParameters()); + boolean isTransactional = AcidUtils.isTransactionalTable(tbl); // in case of unpartitioned table, partVals will be empty boolean isPartitioned = !partVals.isEmpty(); if (isPartitioned) { diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 40c7df2a7..63a1151ea 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -2806,8 +2806,7 @@ public class Frontend { (analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt())) { InsertStmt insertStmt = analysisResult.getInsertStmt(); FeTable targetTable = insertStmt.getTargetTable(); - if (AcidUtils.isTransactionalTable( - targetTable.getMetaStoreTable().getParameters())) { + if (AcidUtils.isTransactionalTable(targetTable)) { if (planCtx.compilationState_.getWriteId() == -1) { // 1st time compilation. Open a transaction and save the writeId. 
@@ -3367,8 +3366,7 @@ public class Frontend { throws TransactionException { Preconditions.checkState(queryCtx.isSetTransaction_id()); Preconditions.checkState(table instanceof FeFsTable); - Preconditions.checkState( - AcidUtils.isTransactionalTable(table.getMetaStoreTable().getParameters())); + Preconditions.checkState(AcidUtils.isTransactionalTable(table)); try (MetaStoreClient client = metaStoreClientPool_.getClient()) { IMetaStoreClient hmsClient = client.getHiveClient(); long txnId = queryCtx.getTransaction_id(); @@ -3392,13 +3390,12 @@ public class Frontend { FeTable targetTable, boolean isOverwrite, String staticPartitionTarget, TQueryOptions queryOptions) throws TransactionException { - Preconditions.checkState( - AcidUtils.isTransactionalTable(targetTable.getMetaStoreTable().getParameters())); + Preconditions.checkState(AcidUtils.isTransactionalTable(targetTable)); List<LockComponent> lockComponents = new ArrayList<>(tables.size()); List<FeTable> lockTables = new ArrayList<>(tables); if (!lockTables.contains(targetTable)) lockTables.add(targetTable); for (FeTable table : lockTables) { - if (!AcidUtils.isTransactionalTable(table.getMetaStoreTable().getParameters())) { + if (!AcidUtils.isTransactionalTable(table)) { continue; } LockComponent lockComponent = new LockComponent(); diff --git a/fe/src/main/java/org/apache/impala/util/AcidUtils.java b/fe/src/main/java/org/apache/impala/util/AcidUtils.java index fccf06bc9..1a3451ae8 100644 --- a/fe/src/main/java/org/apache/impala/util/AcidUtils.java +++ b/fe/src/main/java/org/apache/impala/util/AcidUtils.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.common.ValidWriteIdList.RangeResponse; import org.apache.impala.catalog.CatalogException; import org.apache.impala.catalog.CatalogServiceCatalog; import org.apache.impala.catalog.Column; +import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.FileDescriptor; import org.apache.impala.catalog.FileMetadataLoader.LoadStats; import 
org.apache.impala.catalog.HdfsPartition; @@ -128,6 +129,11 @@ public class AcidUtils { equalsIgnoreCase(transactionalProp); } + public static boolean isTransactionalTable(FeTable table) { + Map<String, String> parameters = table.getMetaStoreTable().getParameters(); + return parameters != null && isTransactionalTable(parameters); + } + public static boolean isTransactionalTable(Map<String, String> props) { Preconditions.checkNotNull(props); String tableIsTransactional = props.get(TABLE_IS_TRANSACTIONAL); @@ -705,7 +711,7 @@ public class AcidUtils { long tableId) { Preconditions.checkState(tbl != null && tbl.getMetaStoreTable() != null); // if tbl is not a transactional, there is nothing to compare against and we return 0 - if (!isTransactionalTable(tbl.getMetaStoreTable().getParameters())) return 0; + if (!isTransactionalTable(tbl)) return 0; Preconditions.checkNotNull(tbl.getValidWriteIds()); // if the provided table id does not match with what CatalogService has we return // -1 indicating that cached table is stale. 
diff --git a/fe/src/main/java/org/apache/impala/util/CatalogOpUtil.java b/fe/src/main/java/org/apache/impala/util/CatalogOpUtil.java index 0b96a3ecb..be7ee3e5e 100644 --- a/fe/src/main/java/org/apache/impala/util/CatalogOpUtil.java +++ b/fe/src/main/java/org/apache/impala/util/CatalogOpUtil.java @@ -20,12 +20,15 @@ package org.apache.impala.util; import org.apache.impala.analysis.ColumnName; import org.apache.impala.analysis.FunctionName; import org.apache.impala.analysis.TableName; +import org.apache.impala.thrift.TAlterTableParams; import org.apache.impala.thrift.TCommentOnParams; import org.apache.impala.thrift.TDdlExecRequest; import org.apache.impala.thrift.TResetMetadataRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.impala.analysis.TableName.thriftToString; + public class CatalogOpUtil { private static final Logger LOG = LoggerFactory.getLogger(CatalogOpUtil.class); @@ -39,25 +42,26 @@ public class CatalogOpUtil { case ALTER_DATABASE: target = req.getAlter_db_params().getDb(); break; - case ALTER_TABLE: - target = TableName.thriftToString(req.getAlter_table_params().getTable_name()); + case ALTER_TABLE: { + TAlterTableParams params = req.getAlter_table_params(); + target = thriftToString(params.getTable_name()) + " " + params.getAlter_type(); break; + } case ALTER_VIEW: - target = TableName.thriftToString(req.getAlter_view_params().getView_name()); + target = thriftToString(req.getAlter_view_params().getView_name()); break; case CREATE_DATABASE: target = req.getCreate_db_params().getDb(); break; case CREATE_TABLE_AS_SELECT: case CREATE_TABLE: - target = TableName.thriftToString(req.getCreate_table_params().getTable_name()); + target = thriftToString(req.getCreate_table_params().getTable_name()); break; case CREATE_TABLE_LIKE: - target = TableName.thriftToString( - req.getCreate_table_like_params().getTable_name()); + target = thriftToString(req.getCreate_table_like_params().getTable_name()); break; case 
CREATE_VIEW: - target = TableName.thriftToString(req.getCreate_view_params().getView_name()); + target = thriftToString(req.getCreate_view_params().getView_name()); break; case CREATE_FUNCTION: target = FunctionName.thriftToString( @@ -68,7 +72,7 @@ public class CatalogOpUtil { if (params.isSetDb()) { target = "DB " + params.getDb(); } else if (params.isSetTable_name()) { - target = "TABLE " + TableName.thriftToString(params.getTable_name()); + target = "TABLE " + thriftToString(params.getTable_name()); } else if (params.isSetColumn_name()) { target = "COLUMN " + ColumnName.thriftToString(params.getColumn_name()); } else { @@ -77,18 +81,18 @@ public class CatalogOpUtil { break; } case DROP_STATS: - target = TableName.thriftToString(req.getDrop_stats_params().getTable_name()); + target = thriftToString(req.getDrop_stats_params().getTable_name()); break; case DROP_DATABASE: target = req.getDrop_db_params().getDb(); break; case DROP_TABLE: case DROP_VIEW: - target = TableName.thriftToString( + target = thriftToString( req.getDrop_table_or_view_params().getTable_name()); break; case TRUNCATE_TABLE: - target = TableName.thriftToString(req.getTruncate_params().getTable_name()); + target = thriftToString(req.getTruncate_params().getTable_name()); break; case DROP_FUNCTION: target = FunctionName.thriftToString(req.getDrop_fn_params().fn_name); diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java index b1080d9dd..47a200cc1 100644 --- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java @@ -3957,7 +3957,7 @@ public class MetastoreEventsProcessorTest { // Process commit compaction event should skip the event because // DISABLE_EVENT_HMS_SYNC is set to true event = filteredEvents.get(filteredEvents.size() - 1); - 
assertEquals(MetastoreEventType.COMMIT_COMPACTION, event.getEventType()); + assertEquals(MetastoreEventType.COMMIT_COMPACTION_EVENT, event.getEventType()); long skipCount = eventsProcessor_.getMetrics() .getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC) .getCount(); diff --git a/fe/src/test/java/org/apache/impala/util/CatalogOpUtilTest.java b/fe/src/test/java/org/apache/impala/util/CatalogOpUtilTest.java index ab1759054..823fd6de1 100644 --- a/fe/src/test/java/org/apache/impala/util/CatalogOpUtilTest.java +++ b/fe/src/test/java/org/apache/impala/util/CatalogOpUtilTest.java @@ -27,6 +27,7 @@ import org.apache.impala.service.BackendConfig; import org.apache.impala.thrift.TAlterDbParams; import org.apache.impala.thrift.TAlterDbType; import org.apache.impala.thrift.TAlterTableParams; +import org.apache.impala.thrift.TAlterTableType; import org.apache.impala.thrift.TBackendGflags; import org.apache.impala.thrift.TColumnName; import org.apache.impala.thrift.TCommentOnParams; @@ -139,8 +140,9 @@ public class CatalogOpUtilTest { req.setDdl_type(TDdlType.ALTER_TABLE); TAlterTableParams alterTableParams = new TAlterTableParams(); alterTableParams.setTable_name(tblName); + alterTableParams.setAlter_type(TAlterTableType.RECOVER_PARTITIONS); req.setAlter_table_params(alterTableParams); - assertEquals("ALTER_TABLE db1.tbl1 issued by unknown user", + assertEquals("ALTER_TABLE db1.tbl1 RECOVER_PARTITIONS issued by unknown user", CatalogOpUtil.getShortDescForExecDdl(req)); req.setDdl_type(TDdlType.CREATE_VIEW);
