k.venureddy2...@gmail.com has posted comments on this change. ( http://gerrit.cloudera.org:8080/21031 )
Change subject: IMPALA-12709: Add support for hierarchical metastore event processing
......................................................................


Patch Set 42:

(19 comments)

Reworked

http://gerrit.cloudera.org:8080/#/c/21031/39/docs/topics/impala_metadata.xml
File docs/topics/impala_metadata.xml:

http://gerrit.cloudera.org:8080/#/c/21031/39/docs/topics/impala_metadata.xml@306
PS39, Line 306: in
> nit: remove trailing whitespace here and others.
Done


http://gerrit.cloudera.org:8080/#/c/21031/39/docs/topics/impala_metadata.xml@307
PS39, Line 307: n 1.0 to 5.0 second
> I think it is better to say "between 1.0 to 5.0 seconds"
Done


http://gerrit.cloudera.org:8080/#/c/21031/39/docs/topics/impala_metadata.xml@308
PS39, Line 308: <codeph>--enable_hierarchical_event_processing</codeph> flag set to
: <codeph>true</codeph>
> Why subsecond hms_event_polling_interval_s is not allowed if enable_hierarc
There is no restriction on configuring hms_event_polling_interval_s in ms, even if hierarchical event processing is not enabled. With enable_hierarchical_event_processing, MetastoreEventsProcessor#processEvents() becomes just an event dispatcher. So the interval can be configured to a lower value, as long as the executor threads keep processing the incoming events and keep the outstanding event count within the max_outstanding_events_on_executors limit.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
File fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java@660
PS38, Line 660: /**
: * CDP Hive-3 only function. This is a dummy implementation till the CommitTxnEvent
: * dummy implementation class defined in this file becomes actual implementation. Need
: * to change when CommitTxnEvent implementation is supported with IMPALA-13285.
: */
> Understood.
> Please mention the JIRA number, IMPALA-13285, in the comment.
Done


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
File fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java@1060
PS36, Line 1060: public static List<PseudoCommitTxnEvent> getPseudoCommitTxnEvents(CommitTxnEvent event)
: throws MetastoreNotificationException {
: List<PseudoCommitTxnEvent> pseudoEvents = new ArrayList<>
> Please clarify help message for invalidate_metadata_on_event_processing_fai
Done


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java@1095
PS36, Line 1095: tableNameToIdxs.computeIfAbsent(tableName, k -> new ArrayList<>()).add(i);
: }
: // Form list of PseudoCommit
> Checked the usage of this msTbl. We just use the dbName, tableName, numOfPa
Done


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
File fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java:

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java@1115
PS39, Line 1115: getTable(entr
> Spell it out. "PseudoCommitTxnEvent".
Done


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@444
PS39, Line 444:
: @VisibleForTesting
: List<TableEventExecutor> getTableEventExecutors() {
: ret
> I'm sorry, I know that I have asked this before:
Have modified the following:
1. Decrement outstandingEventCount_ in DbProcessor#clear() as well, just before clearing the queues.
2. Have added Preconditions.checkState at the following places:
   1. Here in decrOutstandingEventCount.
   2. After polling the event from the queues (inEvents_ and barrierEvents_).
3. clearInternal() asserts that outstandingEventCount_ is 0.
4. Have used AtomicLong for outstandingEventCount_.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java
File fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@266
PS38, Line 266:
: NotificationEvent pseudoDropTableEvent = new NotificationEvent();
: pseudoDropTableEvent.setEven
> I think the assignment should happen here.
findSuitableEventExecutor() is renamed to getOrFindDbEventExecutor(). It is called from dispatch(), and dispatch() is a synchronized method, so concurrent calls do not happen.

Let's assume the assignment happens here, and EventExecutorService#dispatch() and EventExecutorService#clear() are invoked concurrently. During dispatch(), right after EventExecutorService#getOrFindDbEventExecutor() and before the DbEventExecutor#enqueue() invocation, the assignment might be removed by EventExecutorService#clear(). The dbName to DbEventExecutor mapping is then lost by the time we call DbEventExecutor#enqueue() on the respective DbEventExecutor. That's why the dbName to DbEventExecutor mapping is assigned in enqueue().

A DbEventExecutor is not usable while DbEventExecutor#clear() is executing. It becomes usable again right before returning from clear(). It is also not usable once DbEventExecutor#stop() is invoked, and in that case it does not become usable again. Checking usability during assignment is not required, for the following reasons:
1. If a DbEventExecutor is stopped, it is the case where all the remaining DbEventExecutors are either stopped or getting stopped. So it is ok to ignore the event in DbEventExecutor#enqueue() if the DbEventExecutor is not usable.
2. If a DbEventExecutor is cleared, the metastore event processor is being reset and the catalog cache will be cleared. All the DbEventExecutors are either cleared or getting cleared. So it is ok to ignore the event in DbEventExecutor#enqueue() if the DbEventExecutor is not usable.
3. The check would require acquiring the usable lock of each DbEventExecutor. Instead, we can ignore the event in DbEventExecutor#enqueue() if the DbEventExecutor is not usable.


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java
File fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java:

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@102
PS39, Line 102: @VisibleForTesting
> Must validate and test legal transition here.
Done. Have added Preconditions.checkState(status_ != EventExecutorStatus.STOPPED);
We need an initial default state (INACTIVE). start() is not allowed if status_ is STOPPED, so STOPPED cannot be the default. And ACTIVE cannot be the default before start().


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@196
PS39, Line 196: reNotif
> Is it better to name this 'dispatch' rather than 'process'?
Done


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@312
PS39, Line 312: * @return DbEventExecutor if ma
> I think 'getDbEventExecutor' is clearer. Please add @Nullable annotation.
Done


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@327
PS39, Line 327: bEventExecutor
> I think 'getOrCreateDbEventExecutor' is clearer.
Done. It does not actually create a DbEventExecutor, so I have changed the name to getOrFindDbEventExecutor().
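For reference, the status_ transition rule described for EventExecutorService.java@102 above (INACTIVE as the initial default, start() rejected once STOPPED) could look roughly like the sketch below. This is only an illustration and not the code in the patch: the class name LifecycleSketch and its methods are hypothetical, and a plain IllegalStateException stands in for Preconditions.checkState so the snippet has no Guava dependency.

```java
/** Status values named in the reply above. */
enum EventExecutorStatus { INACTIVE, ACTIVE, STOPPED }

/** Hypothetical stand-in for the service lifecycle; not the patch's class. */
final class LifecycleSketch {
  // INACTIVE is the default: ACTIVE is only valid after start(), and
  // STOPPED is terminal, so neither can be the initial state.
  private EventExecutorStatus status_ = EventExecutorStatus.INACTIVE;

  synchronized void start() {
    // Same condition as Preconditions.checkState(status_ != STOPPED).
    if (status_ == EventExecutorStatus.STOPPED) {
      throw new IllegalStateException("start() is not allowed after stop()");
    }
    status_ = EventExecutorStatus.ACTIVE;
  }

  synchronized void stop() {
    status_ = EventExecutorStatus.STOPPED;
  }

  synchronized EventExecutorStatus getStatus() {
    return status_;
  }
}
```

With this shape, a test for the legal transitions only needs to walk INACTIVE, ACTIVE, STOPPED and then confirm that a second start() throws.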
http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
File fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java:

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java@3316
PS39, Line 3316: Adding {} aborted write ids
> Please log txnId_ as well.
Yes, it can be empty. tableWriteIds_.size() is shown in the log message "Adding {} aborted write ids for txn id {}". I think that is ok and not misleading. Have added txnId_ to the log now.
We also have debug logs in CatalogServiceCatalog#addWriteIdsToTable(): one at the entry of the method ("Trying to add {} write ids {} to table {}.{} for event {}"), and one on every possible path in the method that says whether the write ids were added or not.


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@316
PS39, Line 316: return tableProcessors_.size();
: }
:
: /**
> Similar comment like DBEventExecutor.decrOutstandingEventCount().
Have modified the following:
1. Decrement outstandingEventCount_ in deleteTableProcessor() as well, just before clearing the queue.
2. Have added Preconditions.checkState at the following places:
   1. Here in decrOutstandingEventCount.
   2. After polling the event from events_.
3. clear() asserts that outstandingEventCount_ is 0.
4. Have used AtomicLong for outstandingEventCount_.
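As a rough illustration of the outstandingEventCount_ bookkeeping listed above (decrement before clearing, check the count after each decrement, assert zero after clear), here is a minimal sketch. It is not the patch's implementation: the class CountedEventQueue, the String event type, and the local checkState helper (used in place of Guava's Preconditions.checkState) are assumptions made to keep the snippet self-contained.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical queue that tracks how many events are still outstanding. */
final class CountedEventQueue {
  private final Queue<String> events_ = new ArrayDeque<>();
  // AtomicLong lets readers such as metrics read the count without the lock.
  private final AtomicLong outstandingEventCount_ = new AtomicLong();

  synchronized void enqueue(String event) {
    events_.add(event);
    outstandingEventCount_.incrementAndGet();
  }

  /** Polls one event and decrements the count; returns null if empty. */
  synchronized String poll() {
    String event = events_.poll();
    if (event != null) decrOutstandingEventCount(1);
    return event;
  }

  /** Drops all queued events, decrementing the count just before clearing. */
  synchronized void clear() {
    decrOutstandingEventCount(events_.size());
    events_.clear();
    checkState(outstandingEventCount_.get() == 0,
        "outstanding event count must be 0 after clear");
  }

  long getOutstandingEventCount() {
    return outstandingEventCount_.get();
  }

  private void decrOutstandingEventCount(long delta) {
    long remaining = outstandingEventCount_.addAndGet(-delta);
    checkState(remaining >= 0, "outstanding event count went negative");
  }

  private static void checkState(boolean ok, String message) {
    if (!ok) throw new IllegalStateException(message);
  }
}
```

The point of decrementing inside clear() is that events dropped without being processed would otherwise leave the counter permanently above zero, which would eventually trip a limit such as max_outstanding_events_on_executors.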
http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java
File fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java@125
PS38, Line 125: public void cleanUp() {
: assertEquals(MetastoreEventsProces
> Make sense. Should not do it in cleanUp.
Done. Have added the checks in shutDownEventExecutorService(EventExecutorService eventExecutorService), before shutting down the EventExecutorService:

    List<MetastoreEvent> metastoreEvents = eventsProcessor_.getEventsFactory()
        .getFilteredEvents(eventsProcessor_.getNextMetastoreEvents(),
            eventsProcessor_.getMetrics());
    assertTrue(metastoreEvents.size() == 0);
    assertTrue(eventsProcessor_.getOutstandingEventCount() == 0);


http://gerrit.cloudera.org:8080/#/c/21031/39/tests/custom_cluster/test_event_processing_perf.py
File tests/custom_cluster/test_event_processing_perf.py:

http://gerrit.cloudera.org:8080/#/c/21031/39/tests/custom_cluster/test_event_processing_perf.py@32
PS39, Line 32: LOG = logging.getLogger(__name__)
> Please write this under SkipIf, something along this line:
Have used pytest.mark.skipif() now. Have also added multiprocessing for all the queries that run on different dbs and tables at the same time.


http://gerrit.cloudera.org:8080/#/c/21031/39/tests/custom_cluster/test_event_processing_perf.py@270
PS39, Line 270: def __del__(self):
: # Invoked only for main thread
: LOG.info("Deleting for thread %s", self.name)
:
: def __enter__(self):
: # Invoked only for main thread
> Consider multithreading this (and maybe other relevant queries), with 1 Imp
Done. Have added multiprocessing for all the queries that run on different dbs and tables at the same time.
The stage table is just a single instance, and it is used to create partitions in, or insert into existing partitions of, tables in different dbs.
So these two queries (creating the stage table and inserting some data into it) are executed as is.


http://gerrit.cloudera.org:8080/#/c/21031/39/tests/custom_cluster/test_event_processing_perf.py@343
PS39, Line 343: insert_into_part_time = None
> Please make sure that DROP DATABASE CASCADE happen even if assertion failur
Done. Have added teardown_method now.


--
To view, visit http://gerrit.cloudera.org:8080/21031
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I76d8a739f9db6d40f01028bfd786a85d83f9e5d6
Gerrit-Change-Number: 21031
Gerrit-PatchSet: 42
Gerrit-Owner: Anonymous Coward <k.venureddy2...@gmail.com>
Gerrit-Reviewer: Anonymous Coward <cclive1...@gmail.com>
Gerrit-Reviewer: Anonymous Coward <k.venureddy2...@gmail.com>
Gerrit-Reviewer: Csaba Ringhofer <csringho...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <huangquanl...@gmail.com>
Gerrit-Reviewer: Riza Suminto <riza.sumi...@cloudera.com>
Gerrit-Reviewer: Sai Hemanth Gantasala <saihema...@cloudera.com>
Gerrit-Comment-Date: Thu, 10 Apr 2025 16:44:22 +0000
Gerrit-HasComments: Yes