Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/21031 )
Change subject: IMPALA-12709: Add support for hierarchical metastore event processing
......................................................................

Patch Set 42:

(32 comments)

http://gerrit.cloudera.org:8080/#/c/21031/39/docs/topics/impala_metadata.xml
File docs/topics/impala_metadata.xml:

http://gerrit.cloudera.org:8080/#/c/21031/39/docs/topics/impala_metadata.xml@307
PS39, Line 307: n 1.0 to 5.0 second
> Done
Done

http://gerrit.cloudera.org:8080/#/c/21031/39/docs/topics/impala_metadata.xml@308
PS39, Line 308: <codeph>--enable_hierarchical_event_processing</codeph> flag set to
              : <codeph>true</codeph>
> There is no restriction to config hms_event_polling_interval_s in ms, even
Done

http://gerrit.cloudera.org:8080/#/c/21031/42/docs/topics/impala_metadata.xml
File docs/topics/impala_metadata.xml:

http://gerrit.cloudera.org:8080/#/c/21031/42/docs/topics/impala_metadata.xml@307
PS42, Line 307: If <codeph>catalogd</codeph> is
              : being started with <codeph>--enable_hierarchical_event_processing</codeph> flag set to
              : <codeph>true</codeph>, hms event polling interval can be set to a value less than 1.0 second.
Consider dropping this sentence. I'm worried that users will set it to a sub-second value without understanding the implications. Yes, the hierarchical event processor might be performant, but doing so might lead to unintended HMS performance degradation (too many requests per second). The previous sentence should be sufficient to convey that 1.0 second is a good enough polling period.
http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
File fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java:

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java@1115
PS39, Line 1115: getTable(entr
> Done
Done

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@444
PS39, Line 444:
              : @VisibleForTesting
              : List<TableEventExecutor> getTableEventExecutors() {
              : ret
> Have modified following:
Done

http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@188
PS42, Line 188: private final Object usableLock_ = new Object();
Besides the usable_ flag, this lock also protects inEvents_.poll(), barrierEvents_.poll(), and dbEventExecutor_.decrOutstandingEventCount().

I also find this lock confusing. Its comment says that usableLock_ only protects the usable_ flag, but there are many places where the lock is held far longer (say, dispatchTableEvent or dispatchDbEvent). Conversely, clear() releases the lock sooner and then mutates many states without holding it. Please update the docs, clarifying when usableLock_ must be held.

http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@459
PS42, Line 459: * Increments outstanding event count for the DbEventExecutor.
Please document when this method must be called.
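To illustrate the kind of lock documentation the usableLock_ comments ask for, here is a minimal sketch. Everything in it is hypothetical and is not the patch's code: the class UsableLockSketch, its methods, and the single String queue that stands in for inEvents_/barrierEvents_. For simplicity every access takes the lock, which the real executors may not do.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Hypothetical sketch (not the patch's code). It shows one way to document
 * exactly what a lock like usableLock_ guards and when it must be held.
 */
class UsableLockSketch {
  /**
   * Guards all three fields below, which must change together:
   *   - usable_
   *   - events_ (every offer, poll, and clear)
   *   - outstandingEventCount_
   * Any method that reads or writes one of them must hold this lock for the
   * whole read-modify-write. That way no caller can observe an event that has
   * been polled from events_ but is still counted as outstanding.
   */
  private final Object usableLock_ = new Object();

  private boolean usable_ = true;                           // guarded by usableLock_
  private final Queue<String> events_ = new ArrayDeque<>(); // guarded by usableLock_
  private long outstandingEventCount_ = 0;                  // guarded by usableLock_

  void setUsable(boolean usable) {
    synchronized (usableLock_) {
      usable_ = usable;
    }
  }

  /** Queues an event. Returns false, and queues nothing, if not usable. */
  boolean enqueue(String event) {
    synchronized (usableLock_) {
      if (!usable_) return false;
      events_.offer(event);
      outstandingEventCount_++;
      return true;
    }
  }

  /** Takes the next event. The poll and the decrement happen in one lock hold. */
  String processOne() {
    synchronized (usableLock_) {
      if (!usable_) return null;
      String event = events_.poll();
      if (event != null) outstandingEventCount_--;
      return event;
    }
  }

  /** Drops all pending events. The lock is held for the entire reset. */
  void clear() {
    synchronized (usableLock_) {
      outstandingEventCount_ -= events_.size();
      events_.clear();
      if (outstandingEventCount_ != 0) {
        throw new IllegalStateException(
            "outstandingEventCount is non-zero after clear.");
      }
    }
  }

  long getOutstandingEventCount() {
    synchronized (usableLock_) {
      return outstandingEventCount_;
    }
  }
}
```

Here clear() holds the lock for the whole reset. That is the opposite of releasing it early and then mutating state, which is the pattern the comment above finds confusing.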
http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@468
PS42, Line 468: private void decrOutstandingEventCount() {
Remove this method and use the overload with the 'delta' parameter instead.

http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@477
PS42, Line 477: Preconditions.checkState(outstandingEventCount_.addAndGet(-delta) >= 0);
Add an error message:

  Preconditions.checkState(outstandingEventCount_.addAndGet(-delta) >= 0,
      "outstandingEventCount is negative after decrement.");

http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@529
PS42, Line 529: unAssignEventExecutor(dbProcessor.dbName_);
Should this always happen if 'force' is true? canBeRemoved() may return false if this clause evaluates to false:

  (System.currentTimeMillis() - lastEventQueuedTime_)
      > BackendConfig.INSTANCE.getMinEventProcessorIdleMs()

http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@548
PS42, Line 548: Preconditions.checkState(outstandingEventCount_.get() == 0);
Add an error message:

  Preconditions.checkState(outstandingEventCount_.get() == 0,
      "outstandingEventCount is non-zero after clear.");

Should this be done at the end of the method?

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java
File fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@266
PS38, Line 266:
              : NotificationEvent pseudoDropTableEvent = new NotificationEvent();
              : pseudoDropTableEvent.setEven
> findSuitableEventExecutor() is renamed to getOrFindDbEventExecutor(). It is
Thank you for your explanation.
I think I understand the decision now. DbEventExecutor can remove the mapping by itself if cleanIfNecessary(false) is called. The mapping should only be set after the DbProcessor for dbName is instantiated, and that happens exclusively inside DbEventExecutor.enqueue(). Because dispatch() is also a synchronized method, it is safe for DbEventExecutor.getOrFindDbEventExecutor() and DbEventExecutor.enqueue() to be called independently within the same synchronization scope.

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java
File fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java:

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@102
PS39, Line 102: @VisibleForTesting
> Done. Have added Preconditions.checkState(status_ != EventExecutorStatus.ST
Done

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@196
PS39, Line 196: reNotif
> Done
Done

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@312
PS39, Line 312: * @return DbEventExecutor if ma
> Done
Done

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@327
PS39, Line 327: bEventExecutor
> Done. It actually do not create DbEventExecutor.
So have changed it to getO
Ack

http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java
File fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java:

http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@72
PS42, Line 72: // Database name to DbEventExecutor map
             : private final Map<String, DbEventExecutor> dbNameToEventExecutor_ =
             :     new ConcurrentHashMap<>();
Please document that this map should be mutated only by DbEventExecutor, through DbEventExecutor.assignEventExecutor() and DbEventExecutor.unAssignEventExecutor().

http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@133
PS42, Line 133: void clear() {
I think dispatch(), shutdown(), and clear() should all be synchronized methods. If this code guaranteed that dispatch(), shutdown(), and clear() cannot run concurrently, I suspect that usable_ and usableLock_ could be simplified or eliminated in many places. For example, DbEventExecutor.clear() would not need to call setUsable(false) followed by setUsable(true).

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
File fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java:

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java@3316
PS39, Line 3316: Adding {} aborted write ids
> Yes it can be empty.
tableWriteIds_.size() is shown in LOG - "Adding {} abo
Done

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
File fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java:

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1583
PS39, Line 1583: public synchronized void updateStatus(EventProcessorStatus toStatus) {
               :   eventProcessorStatus_ = toStatus;
               : }
> If 2 executors concurrently invoke handleEventProcessException() due to fai
If NEEDS_INVALIDATE and ERROR are handled the same way, why distinguish them as different enum values? What if invalidate_global_metadata_on_event_processing_failure is false? The docs seem to suggest that NEEDS_INVALIDATE and ERROR have different meanings and severities, and that users may need to resolve them differently, especially when invalidate_global_metadata_on_event_processing_failure is false.
https://impala.apache.org/docs/build/html/topics/impala_metadata.html

But I agree with you that the current code treats both statuses the same way. The only difference is whether a MetastoreNotificationNeedsInvalidateException was raised. Is that difference important for users or admins to pay attention to?

In any case, please update the docs to mention that both NEEDS_INVALIDATE and ERROR can be resolved by issuing a global INVALIDATE METADATA query.
http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
File fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java:

http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1231
PS42, Line 1231: a invalidate
nit: an invalidate

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@316
PS39, Line 316: return tableProcessors_.size();
              : }
              :
              : /**
> Have modified following:
Done

http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@125
PS42, Line 125: private final Object usableLock_ = new Object();
Besides the usable_ flag, this lock also protects events_.poll() and tableEventExecutor_.decrOutstandingEventCount(). Please update the doc accordingly.

http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@328
PS42, Line 328: * Increments outstanding event count for the TableEventExecutor.
Please document when this method must be called.

http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@337
PS42, Line 337: private void decrOutstandingEventCount() {
Remove this method and use the overload with the 'delta' parameter instead.
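The counter comments (on both DbEventExecutor and TableEventExecutor) point toward a counter API like the one in the minimal sketch below. It has a documented incrOutstandingEventCount(delta), a single decrOutstandingEventCount(delta) with no no-arg overload, and a decrement check that carries an error message. This is a hypothetical illustration, not the patch's code:

- The class name, the long delta type, and the "increment before the event becomes visible" contract are all assumptions.
- The local checkState() helper only mimics Guava's Preconditions.checkState(boolean, Object) so that the block compiles without Guava.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical sketch of an outstanding-event counter with a single
 * delta-based decrement, as suggested in review. Not the patch's code.
 */
class OutstandingEventCounterSketch {
  private final AtomicLong outstandingEventCount_ = new AtomicLong();

  /**
   * Increments the outstanding event count by 'delta'.
   * Assumed contract for this sketch: call it for each queued event, before
   * the event becomes visible to the thread that processes it. The matching
   * decrement then can never run first and drive the count below zero.
   */
  void incrOutstandingEventCount(long delta) {
    checkState(delta >= 0, "delta must be non-negative.");
    outstandingEventCount_.addAndGet(delta);
  }

  /**
   * Decrements the outstanding event count by 'delta'. This is the only
   * decrement method; a caller that finished one event passes 1.
   */
  void decrOutstandingEventCount(long delta) {
    checkState(outstandingEventCount_.addAndGet(-delta) >= 0,
        "outstandingEventCount is negative after decrement.");
  }

  long getOutstandingEventCount() {
    return outstandingEventCount_.get();
  }

  /** Same behavior as Guava's Preconditions.checkState(boolean, Object). */
  private static void checkState(boolean expression, Object errorMessage) {
    if (!expression) {
      throw new IllegalStateException(String.valueOf(errorMessage));
    }
  }
}
```

As in the original line 346, the decrement is applied before the check. A failing call therefore leaves the counter negative, and the IllegalStateException message reports that state.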
http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@346
PS42, Line 346: Preconditions.checkState(outstandingEventCount_.addAndGet(-delta) >= 0);
Add an error message:

  Preconditions.checkState(outstandingEventCount_.addAndGet(-delta) >= 0,
      "outstandingEventCount is negative after decrement.");

http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@364
PS42, Line 364: Preconditions.checkState(outstandingEventCount_.get() == 0);
Add an error message:

  Preconditions.checkState(outstandingEventCount_.get() == 0,
      "outstandingEventCount is non-zero after clear.");

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java
File fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java@125
PS38, Line 125: public void cleanUp() {
              :   assertEquals(MetastoreEventsProces
> Done. Have added the checks in shutDownEventExecutorService(EventExecutorSe
Done

http://gerrit.cloudera.org:8080/#/c/21031/39/tests/custom_cluster/test_event_processing_perf.py
File tests/custom_cluster/test_event_processing_perf.py:

http://gerrit.cloudera.org:8080/#/c/21031/39/tests/custom_cluster/test_event_processing_perf.py@32
PS39, Line 32: LOG = logging.getLogger(__name__)
> Have used pytest.mark.skipif() now. Also have added multiprocessing for all
Done

http://gerrit.cloudera.org:8080/#/c/21031/39/tests/custom_cluster/test_event_processing_perf.py@270
PS39, Line 270: def __del__(self):
              :   # Invoked only for main thread
              :   LOG.info("Deleting for thread %s", self.name)
              :
              : def __enter__(self):
              :   # Invoked only for main thread
> Done.
Have added multiprocessing for all the queries that run on different
Done

http://gerrit.cloudera.org:8080/#/c/21031/39/tests/custom_cluster/test_event_processing_perf.py@343
PS39, Line 343: insert_into_part_time = None
> Done. Have added teardown_method now.
Done

http://gerrit.cloudera.org:8080/#/c/21031/42/tests/custom_cluster/test_event_processing_perf.py
File tests/custom_cluster/test_event_processing_perf.py:

http://gerrit.cloudera.org:8080/#/c/21031/42/tests/custom_cluster/test_event_processing_perf.py@43
PS42, Line 43: db_count = 5
             : table_count = 5
             : partition_count = 500
             : insert_nonpartition_values_count = 100
             : insert_nonpartition_repeat_count = 5
Consider lowering these test dimensions so that we can run this test regularly in exhaustive test runs. You can also leave comment lines with the ideal (larger) dimension values, which developers can uncomment to run the test manually.

--
To view, visit http://gerrit.cloudera.org:8080/21031
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I76d8a739f9db6d40f01028bfd786a85d83f9e5d6
Gerrit-Change-Number: 21031
Gerrit-PatchSet: 42
Gerrit-Owner: Anonymous Coward <k.venureddy2...@gmail.com>
Gerrit-Reviewer: Anonymous Coward <cclive1...@gmail.com>
Gerrit-Reviewer: Anonymous Coward <k.venureddy2...@gmail.com>
Gerrit-Reviewer: Csaba Ringhofer <csringho...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <huangquanl...@gmail.com>
Gerrit-Reviewer: Riza Suminto <riza.sumi...@cloudera.com>
Gerrit-Reviewer: Sai Hemanth Gantasala <saihema...@cloudera.com>
Gerrit-Comment-Date: Thu, 10 Apr 2025 20:00:39 +0000
Gerrit-HasComments: Yes