Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/21031 )
Change subject: IMPALA-12709: Add support for hierarchical metastore event processing
......................................................................


Patch Set 38:

(23 comments)

I think this concludes my review for patch set 38.

http://gerrit.cloudera.org:8080/#/c/21031/38/be/src/common/global-flags.cc
File be/src/common/global-flags.cc:

http://gerrit.cloudera.org:8080/#/c/21031/38/be/src/common/global-flags.cc@288
PS38, Line 288: 
: DEFINE_double(hms_event_polling_interval_s, 1,
Have we discussed why we're changing this to allow subsecond values? If an admin mistakenly sets a subsecond value in a production environment, it can bombard HMS with too many requests per second and potentially cause issues. We'll need to update the documentation everywhere too.

Instead, should we add a hidden hms_event_polling_interval_ms flag (DEFINE_int32_hidden) used for tests only?

I don't mind changing this to double (the next version will be a major release, Impala 5, anyway), but please update the related docs in docs/topics/impala_metadata.xml.


http://gerrit.cloudera.org:8080/#/c/21031/38/common/thrift/metrics.json
File common/thrift/metrics.json:

http://gerrit.cloudera.org:8080/#/c/21031/38/common/thrift/metrics.json@3656
PS38, Line 3656:     "key" : "events-processor.outstanding-event-count"
nit: It would be interesting to add 2 more granular counters:
events-processor.outstanding-db-event-count
events-processor.outstanding-table-event-count

They probably only make sense if enable_hierarchical_event_processing=true.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
File fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java@660
PS38, Line 660:   /**
:    * CDP Hive-3 only function. This is a dummy implementation for Apache Hive-3.
:    */
:   public static class PseudoCommitTxnEvent extends MetastoreTableEvent
:       implements DerivedMetastoreEvent {
Can you add a comment stating the first Apache Hive version where this PseudoCommitTxnEvent becomes relevant (not a dummy anymore)? Is it Apache Hive 4?


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@71
PS38, Line 71:   // Database name to DbEventExecutor assignment
:   private static final Map<String, DbEventExecutor> dbNameToEventExecutor =
:       new ConcurrentHashMap<>();
Can this be a protected field inside EventExecutorService? DbEventExecutor can simply hold a reference to its parent EventExecutorService to access EventExecutorService.dbNameToEventExecutor.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java
File fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@44
PS38, Line 44: stop
I don't see a stop method. Do you mean shutdown()?

It would be great to have a comment or diagram describing the lifecycle of EventExecutorService. It is helpful because EventExecutorService methods do not always map 1-to-1 to the ExternalEventsProcessor interface. For example, there is ExternalEventsProcessor.pause(), but there is no EventExecutorService.pause(). Is it true that ExternalEventsProcessor.pause() must be called before calling EventExecutorService.clear()?

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@52
PS38, Line 52:   private boolean isActive_ = false;
Can isActive_ flip-flop between false -> true -> false -> true?
How does this relate to the MetastoreEventsProcessor.eventProcessorStatus_ enum? Is there any prohibited combination between them?


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@85
PS38, Line 85:   /**
:    * Clear the EventExecutorService.
:    */
:   void clear() {
Can clear() be called at any time, even while MetastoreEventsProcessor is still dispatching new events to process()? Please clarify in the method comment.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@92
PS38, Line 92:   /**
:    * Shutdown the EventExecutorService.
:    */
:   void shutdown(boolean graceful) {
Can I call start() again after shutdown()? Please clarify in the comments of both methods.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@132
PS38, Line 132:    * For the events involving multiple tables(commit and abort transaction),
:    * pseudo-events are created for each table so that processing happens independently for
:    * each table.
Based on your reply in https://gerrit.cloudera.org/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java#899

Please add a comment on the expected behavior if clear() is called when only a subset of the pseudo-events has been processed. What I understand is that it is OK, since EventExecutorService.clear() is only called via CatalogServiceCatalog.reset(), which has a subsequent routine to bring the event processor back to a good initial state?

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@266
PS38, Line 266:     DbEventExecutor eventExecutor = DbEventExecutor.getEventExecutor(
:         dbName.toLowerCase());
:     if (eventExecutor == null) {
If dbNameToEventExecutor is moved into this class, you can use dbNameToEventExecutor.computeIfAbsent() to simplify this.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
File fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java@3282
PS38, Line 3282:       tableWriteIds_ = catalog_.removeWriteIds(txnId_);
nit: Just to confirm, is this meant to be passed by reference (sharing the object), or should it create a new set (new HashSet<>(tableWriteIds_))? If it is the former and not the latter, why is a separate tableWriteIds_ field needed?


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
File fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@899
PS36, Line 899: rride
> Before IMPALA-12785, pause() was called only from catalog reset(). So have
Thanks. I have a follow-up question in patch set 38.

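To illustrate the computeIfAbsent() suggestion made for EventExecutorService.java@266 above, here is a minimal, self-contained sketch. The classes are simplified, hypothetical stand-ins and not the actual Impala classes; in particular, the round-robin choice of executor is an assumption made only to keep the example short, and the patch may pick executors differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-in for DbEventExecutor.
class SketchDbExecutor {
  final String name;
  SketchDbExecutor(String name) { this.name = name; }
}

// Hypothetical, simplified stand-in for EventExecutorService that owns the map.
class SketchExecutorService {
  private final List<SketchDbExecutor> executors = new ArrayList<>();
  // Database name to executor assignment, held by the service instead of
  // being a static field of the executor class.
  protected final Map<String, SketchDbExecutor> dbNameToEventExecutor =
      new ConcurrentHashMap<>();
  private final AtomicInteger next = new AtomicInteger();

  SketchExecutorService(int numExecutors) {
    for (int i = 0; i < numExecutors; i++) {
      executors.add(new SketchDbExecutor("DbEventExecutor-" + i));
    }
  }

  // Returns the executor already assigned to dbName, or atomically assigns one.
  // Replaces the explicit "look up, check for null, then put" sequence.
  // Round-robin selection is an assumption for this sketch only.
  SketchDbExecutor getOrAssign(String dbName) {
    return dbNameToEventExecutor.computeIfAbsent(dbName.toLowerCase(),
        k -> executors.get(Math.floorMod(next.getAndIncrement(), executors.size())));
  }
}
```

With ConcurrentHashMap, the mapping function runs at most once per absent key, so two threads asking for the same database cannot end up with different executors.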
http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
File fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@973
PS38, Line 973:   /**
:    * Starts the event processor from a given event id
:    */
:   @Override
:   public synchronized void start(long fromEventId) {
What happens with this method if eventExecutorService_ != null? I do not see an equivalent method in EventExecutorService. Can dispatching skip forward to fromEventId?


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1173
PS38, Line 1173: 
:   public void handleEventProcessException(Exception ex) {
> Please add comment for this method.
Especially, please state what follow-up is expected if this method is reached. Should all the events still in the queue be ignored and removed?


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1251
PS38, Line 1251:       // TODO: Need to redefine the lag in the hierarchical processing mode.
In the meantime, can you print a different WARN log that has more information related to hierarchical mode, if it is enabled?


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1424
PS38, Line 1424: eventExecutorService_ != null
This check is repeated in multiple places. Please create a dedicated method for it.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1437
PS38, Line 1437: "Processing
If isHierarchical is true, where are we printing the "Processing" log? How do we know whether a dispatched event is being / has been processed?

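The dedicated-method request for the repeated eventExecutorService_ != null check (MetastoreEventsProcessor.java@1424 above) could look roughly like the sketch below. The class and method names are hypothetical, the real class has many more members, and the "dispatching" wording is just one possible choice for the logs.

```java
// Hypothetical, trimmed-down stand-in for MetastoreEventsProcessor.
class SketchEventsProcessor {
  // Assumed to be non-null only when hierarchical event processing is
  // enabled, which is how the check is used in the patch.
  private final Object eventExecutorService_;

  SketchEventsProcessor(Object eventExecutorService) {
    eventExecutorService_ = eventExecutorService;
  }

  // Single place for the check that is otherwise repeated across the class.
  boolean isHierarchicalEventProcessingEnabled() {
    return eventExecutorService_ != null;
  }

  // Picks log wording that matches what the calling thread actually did:
  // it only dispatches in hierarchical mode, and processes otherwise.
  String batchLogVerb() {
    return isHierarchicalEventProcessingEnabled() ? "dispatching" : "processing";
  }
}
```

Call sites would then read as a named condition rather than a null check, and log statements can share the same helper.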
http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1484
PS38, Line 1484:     // TODO: eventProcessingTime here is more relevant in sequential processing mode. But,
:     //  in case of hierarchical processing mode, these are the times taken at dispatcher
:     //  thread alone for each event. Need to have another mechanism to capture the actual
:     //  event processing time in hierarchical processing mode.
:     LOG.info("Time elapsed in processing event batch: {}",
:         PrintUtils.printTimeNs(elapsedNs));
So will this LOG.info line lie if hierarchical mode is enabled?
Should we print a different LOG.info if hierarchical mode is enabled? Maybe "Time elapsed in dispatching event batch: {}"?


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/test/java/org/apache/impala/catalog/MetastoreApiTestUtils.java
File fe/src/test/java/org/apache/impala/catalog/MetastoreApiTestUtils.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/test/java/org/apache/impala/catalog/MetastoreApiTestUtils.java@196
PS38, Line 196: 
:   public static void simulateInsertIntoTransactionalTableFromFS(MetaStoreClient msClient,
:       org.apache.hadoop.hive.metastore.api.Table msTbl, Partition partition,
:       int totalNumberOfFilesToAdd, long txnId, long writeId) throws IOException {
Please add a comment describing what this method does.


http://gerrit.cloudera.org:8080/#/c/21031/38/tests/custom_cluster/test_event_processing_perf.py
File tests/custom_cluster/test_event_processing_perf.py:

http://gerrit.cloudera.org:8080/#/c/21031/38/tests/custom_cluster/test_event_processing_perf.py@34
PS38, Line 34: class TestEventProcessingPerf(CustomClusterTestSuite):
How long does it take to run the whole TestEventProcessingPerf?

http://gerrit.cloudera.org:8080/#/c/21031/38/tests/custom_cluster/test_event_processing_perf.py@53
PS38, Line 53: --hms_event_polling_interval_s=0.2
Wherever you set a very low hms_event_polling_interval_s to speed up tests, I think you also need to speed up statestore_heartbeat_frequency_ms and statestore_priority_update_frequency_ms for faster state propagation.
See https://gerrit.cloudera.org/c/22400/ for reference.


http://gerrit.cloudera.org:8080/#/c/21031/38/tests/custom_cluster/test_event_processing_perf.py@228
PS38, Line 228:   def __run_event_processing_tests(self, case, exec_fn, is_transactional, is_partitioned):
Please write a comment on what this method does. All tests look like they're running this same method with slightly different parameters. Maybe elaborate in the TestEventProcessingPerf class comment too.


http://gerrit.cloudera.org:8080/#/c/21031/38/tests/util/event_processor_utils.py
File tests/util/event_processor_utils.py:

http://gerrit.cloudera.org:8080/#/c/21031/38/tests/util/event_processor_utils.py@60
PS38, Line 60:  and outstanding_events == 0
Is this only relevant if hierarchical mode is enabled?
Should wait_for_synced_event_id have an optional expected_outstanding_event parameter (defaulting to None) to decide whether or not to wait until outstanding_events == expected_outstanding_event?

--
To view, visit http://gerrit.cloudera.org:8080/21031
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I76d8a739f9db6d40f01028bfd786a85d83f9e5d6
Gerrit-Change-Number: 21031
Gerrit-PatchSet: 38
Gerrit-Owner: Anonymous Coward <k.venureddy2...@gmail.com>
Gerrit-Reviewer: Anonymous Coward <cclive1...@gmail.com>
Gerrit-Reviewer: Anonymous Coward <k.venureddy2...@gmail.com>
Gerrit-Reviewer: Csaba Ringhofer <csringho...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <huangquanl...@gmail.com>
Gerrit-Reviewer: Riza Suminto <riza.sumi...@cloudera.com>
Gerrit-Reviewer: Sai Hemanth Gantasala <saihema...@cloudera.com>
Gerrit-Comment-Date: Sat, 01 Mar 2025 21:27:11 +0000
Gerrit-HasComments: Yes