k.venureddy2...@gmail.com has posted comments on this change. ( http://gerrit.cloudera.org:8080/21031 )
Change subject: IMPALA-12709: Add support for hierarchical metastore event processing
......................................................................


Patch Set 38:

(36 comments)

http://gerrit.cloudera.org:8080/#/c/21031/34//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/21031/34//COMMIT_MSG@42
PS34, Line 42: Once a DbE
> nit: Unnecessary white spaces. Simply justify left.
Done

http://gerrit.cloudera.org:8080/#/c/21031/34//COMMIT_MSG@54
PS34, Line 54: Once a Tab
> nit: Unnecessary white spaces. Simply justify left.
Done

http://gerrit.cloudera.org:8080/#/c/21031/34//COMMIT_MSG@63
PS34, Line 63: Following new events are added:
> Oh, please add Java unit test to assert behavior of these new event type in
Done

http://gerrit.cloudera.org:8080/#/c/21031/34//COMMIT_MSG@63
PS34, Line 63: Following new events are added:
> nit: It will be nice to have a shared interface for derived event kind so w
Done. Have defined a marker interface and used it.


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
File fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java@996
PS36, Line 996: public static class PseudoCommitTxnEvent extends MetastoreTableEvent
> Just a thought: our code can be cleaner if we can resolve IMPALA-13285 firs
Shall I take it up as the next PR?

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java@1060
PS36, Line 1060: List<PseudoCommitTxnEvent> pseudoEvents = new ArrayList<>();
               : List<WriteEventInfo> writeEventInfoList;
               : try (MetaStoreClientPool.MetaStoreClient client = event.g
> What happen if client failed to talk with MetaStore here?
Have refactored this a bit to make it the same as without hierarchical event processing. Failures here are handled according to the commit txn event's onFailure().
Upon failure, the tables involved are invalidated iff invalidate_metadata_on_event_processing_failure is true and canInvalidateTable() returns true. Otherwise, a global invalidate happens automatically if invalidate_global_metadata_on_event_processing_failure is true.

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java@1095
PS36, Line 1095: // Form pseudo events
               : List<Long> finalWriteIds = writeIds;
               : for (Map.Entry<TableName, Li
> The msTbl object in catalog might be stale. Shouldn't we get it from the HM
In the above loop we take the table from the commit txn message (where a write operation happened), but that is not required in this loop. This is the case when no write operation has actually happened (i.e., writeEventInfoList is null or empty). It just updates the writeIds in the catalog table's validWriteIds from open to committed status.

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java@1102
PS36, Line 1102:
               : entry.get
> Why event processing can not continue? Can you narrow down the error messag
Have changed the error message. Failures here are handled according to the commit txn event's onFailure(). Upon failure, the tables involved are invalidated iff invalidate_metadata_on_event_processing_failure is true and canInvalidateTable() returns true. Otherwise, a global invalidate happens automatically if invalidate_global_metadata_on_event_processing_failure is true.


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/DBBarrierEvent.java
File fe/src/main/java/org/apache/impala/catalog/events/DBBarrierEvent.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/DBBarrierEvent.java@54
PS36, Line 54:
> nit: we tend to use "Db" in names, i.e. "DbBarrierEvent". E.g. isBlackliste
Done


http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java@73
PS34, Line 73:
> nit: // Executor name, like "DBEventExecutor-0".
Done

http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java@111
PS34, Line 111:
> nit: "Lock contention should not occur at most frequent call sites "
Done

http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java@166
PS34, Line 166:
> nit: "Lock contention should not occur at most frequent call sites "
Done

http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java@244
PS34, Line 244:
              :
> This is not addressed yet.
Have used the isUsable() method everywhere instead of checking the usable_ variable directly, because that makes it easy to find all references/usages. IMO, this is OK since reentrant locking does not involve much overhead.

http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java@438
PS34, Line 438:
              :
              :
> This is not addressed yet.
inEvents_ and barrierEvents_ are of type ConcurrentLinkedQueue (thread safe). They are not explicitly locked on each access and update.
inEvents_ is accessed or updated in: DbProcessor#enqueue(), DbProcessor#canBeRemoved(), DbProcessor#clear(), DbProcessor#process()
barrierEvents_ is accessed or updated in: DbProcessor#canBeRemoved(), DbProcessor#clear(), DbProcessor#dispatchTableEvent(), DbProcessor#dispatchDbEvent(), DbProcessor#processDbEvents()
Note: ConcurrentLinkedQueue.clear() is not atomic. It is called from the DbProcessor#clear() thread.
Assume DbProcessor#process() has read an event (i.e., in the inEvents_.peek() loop) and passed the DbProcessor#isUsable() checks (i.e., the DbProcessor is still usable). Then, just before the processing thread executes the inEvents_.poll() or dbEventExecutor_.decrOutstandingEventCount() statement, the DbEventExecutor#clear() thread invokes DbProcessor#clear(), and this clear() may be completed (case-2) or still in progress (case-1).
case-1: The clear is still in progress: it may be partway through clearing inEvents_ or barrierEvents_, or may have finished (i.e., the ConcurrentLinkedQueue.clear() call is in progress or completed, so some or all of the events have been removed).
case-2: The DbProcessor#clear() invocation has finished, and DbEventExecutor#clear() has cleared all DbProcessors and set outstandingEventCount_ to 0.
Now assume the processing thread becomes active after case-2 and calls dbEventExecutor_.decrOutstandingEventCount(). It tries to decrement again. A similar situation can arise with case-1 as well.
Didn't want to add explicit locking of inEvents_ or barrierEvents_ since it affects performance, and an explicit lock just for this purpose is not useful. That's why decrOutstandingEventCount() is handled this way. In normal processing without a clear() operation, outstandingEventCount_ is incremented and decremented whenever an element is added (offer) and removed (poll), respectively.

http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java@515
PS34, Line 515:
> This should be Precondition at the end of method rather than assignment.
Please check the above comment's reply.
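The race in the DBEventExecutor.java@438 reply above (a processing thread has already peeked an event when clear() empties the queue and resets outstandingEventCount_ to 0) is why a late decrement must be tolerated. Below is a minimal single-threaded model of that interleaving. SketchEventQueue and its methods are hypothetical, and clamping the counter at zero is an assumption about what "handled this way" means, since the patch's code is not shown here.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Hypothetical, simplified model of an executor's event queue plus its outstanding
 * event count. The queue itself is lock-free; only the counter is guarded by the monitor.
 */
final class SketchEventQueue {
  private final ConcurrentLinkedQueue<String> inEvents_ = new ConcurrentLinkedQueue<>();
  private long outstandingEventCount_ = 0;

  void enqueue(String event) {
    inEvents_.offer(event);
    synchronized (this) { ++outstandingEventCount_; }
  }

  /** First step of the processing thread: look at the head without removing it. */
  String peek() { return inEvents_.peek(); }

  /** Last step of the processing thread, after the peeked event has been handled. */
  void completeHead() {
    inEvents_.poll();  // may return null if clear() emptied the queue in the meantime
    decrOutstandingEventCount();
  }

  private synchronized void decrOutstandingEventCount() {
    // clear() may already have reset the count to 0; clamp instead of going negative.
    if (outstandingEventCount_ > 0) --outstandingEventCount_;
  }

  /** Discards pending events. ConcurrentLinkedQueue.clear() is not atomic. */
  void clear() {
    inEvents_.clear();
    synchronized (this) { outstandingEventCount_ = 0; }
  }

  synchronized long getOutstandingEventCount() { return outstandingEventCount_; }
}
```

Calling peek(), then clear(), then completeHead() reproduces case-2 sequentially: the stale decrement leaves the count at 0 rather than -1, and the queues stay free of explicit locks, which is the trade-off argued in the reply.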
http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java@149
PS36, Line 149:
> nit: maybe "barrierEvents_" is clearer. At the beginning, I thought it's al
Done

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java@226
PS36, Line 226:
> nit: we prefer only using C-style comments /* */ in class or method comment
Done

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java@248
PS36, Line 248:
              :
> nit: put them in one line or add {}
Done

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java@374
PS36, Line 374:
> It seems we should also skip non-rename events if the CREATE_DATABASE event
Correct. We need to skip all the db events and table events, except rename table, drop table and drop db events, when a drop db event is queued behind them. For the above sequence, assuming all these events are queued together at the same time on the dbEventExecutor:
1. We skip the create database, create table and insert table events.
2. Alter table rename (with drop table) is queued to the tableEventExecutor for processing.
3. Drop database tmp is processed on the dbEventExecutor (processDbEvents) and increments EVENTS_SKIPPED_METRIC since the db doesn't exist.
Have refactored the code.


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java
File fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java@49
PS36, Line 49: * Pauses the event processing. Use <code>start(fromEventId)</code> method below to
             : * restart the event processing
             : */
             : void pause();
             :
             : /**
             : * Starts the event processing from the given eventId. This method can be used to jump
             : * ahead in the event processing under certain cases where it is okay skip certain
             : * events
             : */
             : void start(l
> Please contrast this against pause(). Why we need hold() if we have pause()
Removed hold() and resume() now. Using :event_processor('pause') and :event_processor('start'), added with IMPALA-12785, instead.

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java@61
PS36, Line 61: /**
             : * Clears the event processor if it has any pending events to process in its pipeline
             : */
             : void clear();
             :
             : /**
> Please contrast this against stop(). Why we need resume if we have stop()?
Removed hold() and resume() now. Using :event_processor('pause') and :event_processor('start'), added with IMPALA-12785, instead.


http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
File fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java:

http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java@588
PS34, Line 588: protected static final String CLUSTER_WIDE_TARGET = "CLUSTER_WIDE";
> This is not addresses yet.
This line was not added by this patch.
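The skip rule in the DBEventExecutor.java@374 reply above (when a drop db event is queued behind, skip every db and table event except rename table, drop table and drop db) can be sketched as a filter over one database's queued events. This is a hypothetical model: SketchEventType and DropDbSkipRule are not the patch's types, and the real executor works on metastore event objects and dispatches table events to table executors rather than returning a list.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical event kinds used only for this sketch. */
enum SketchEventType {
  CREATE_DATABASE, ALTER_DATABASE, DROP_DATABASE,
  CREATE_TABLE, ALTER_TABLE, RENAME_TABLE, INSERT, DROP_TABLE
}

final class DropDbSkipRule {
  /**
   * Returns, in order, the events of one database that still need processing. Any event
   * with a DROP_DATABASE queued behind it is skipped unless it is a table rename, a table
   * drop or a database drop, which the review reply says must still be processed.
   */
  static List<SketchEventType> eventsToProcess(List<SketchEventType> queued) {
    int lastDropDb = queued.lastIndexOf(SketchEventType.DROP_DATABASE);  // -1 if none
    List<SketchEventType> result = new ArrayList<>();
    for (int i = 0; i < queued.size(); i++) {
      SketchEventType type = queued.get(i);
      boolean dropDbQueuedBehind = i < lastDropDb;
      if (dropDbQueuedBehind && !isKeptBeforeDropDb(type)) continue;
      result.add(type);
    }
    return result;
  }

  private static boolean isKeptBeforeDropDb(SketchEventType type) {
    return type == SketchEventType.RENAME_TABLE
        || type == SketchEventType.DROP_TABLE
        || type == SketchEventType.DROP_DATABASE;
  }
}
```

For the sequence in the reply (create database, create table, insert, rename, drop database), only the rename and the drop database survive the filter, matching steps 1 to 3.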
http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
File fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@705
PS36, Line 705: pIntervalM
> nit: checkArgument() might be better since it throws IllegalArgumentExcepti
Done

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@899
PS36, Line 899: rride
> Why pause() translate to eventExecutorService_.clear()?
Before IMPALA-12785, pause() was called only from catalog reset(). So I called eventExecutorService_.clear() in pause() to clear all the pending events on the event executor service (i.e., to discard pending events on DbEventExecutors and TableEventExecutors) on reset. But with IMPALA-12785, pause() can be invoked with a command (:event_processor('pause')) just to pause the event processor. So I have removed eventExecutorService_.clear() from pause() and now call it from reset(). Upon reset, catalogd rebuilds the cache, so even if only a subset of the pseudo events is processed during clear, reset brings the cache to a consistent state.

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1273
PS36, Line 1273: public TEventProcessorMetrics getEventProcessorMetrics() {
               : TEventProcessorMetrics ev
> Ideally we can maintain lastSyncedEventId as min(event_id of all unprocesse
Ack.
Filed a jira: https://issues.apache.org/jira/browse/IMPALA-13801


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java
File fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java@72
PS36, Line 72: nt pr
> RenameEventState is more descriptive.
Renamed the class to RenameEventState.
IMHO, we do not need to synchronize (or hold the monitor lock of the RenameEventState object). This is a case where a single thread writes a variable and other threads only read it. Making the variable volatile is enough to ensure memory visibility; there is no need for a lock.
The dropProcessed_ and dropSkipped_ flags are updated only on the drop event processing thread (i.e., only on one thread), and the createSkipped_ and createProcessed_ flags are updated only on the create event processing thread (i.e., only on one thread). We just need the state of drop event processing (i.e., the dropProcessed_ and dropSkipped_ flags) to be visible on the create event processing thread. So making dropProcessed_ and dropSkipped_ volatile is enough.
Added a test for RenameEventState to verify the order of pseudo event processing (i.e., drop table followed by create table).

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java@109
PS36, Line 109: aram skipped
> Add Precondition that prevent True variable from turning back to False.
Done. Have removed the processed argument, as we always need to pass true. Also added a precondition to ensure setProcessed is called only once on the pseudo drop and create event processing threads.

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java@113
PS36, Line 113: Precon
> Add Precondition in else block that eventType is CREATE_TABLE.
Done

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java@157
PS36, Line 157: * Determines whether this event can be processed. TableProcessor uses this method to
              : * check if the event can be processed before processing it.
              : * @return True if the event can be processed. False otherwise
              : */
> This should be synchronized methods inside State object, like setProcessed(
IMHO, just keeping the dropProcessed_ and dropSkipped_ flags volatile is enough, since they are updated only on the drop event processing thread and read on the create event processing thread. Have added a unit test for it now.


http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@56
PS34, Line 56: private final Ma
> nit: // Executor name, like "DBEventExecutor-0.TableEventExecutor-0".
Done

http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@312
PS34, Line 312: */
              : private void decrOutstandingEventCount() {
              : synchronize
> This is not addressed yet.
events_ is of type ConcurrentLinkedQueue (thread safe). It is not explicitly locked on each access and update.
events_ is accessed or updated in: TableProcessor#isEmpty(), TableProcessor#canBeRemoved(), TableProcessor#enqueue(), TableProcessor#process(), TableEventExecutor#deleteTableProcessor()
Note: ConcurrentLinkedQueue.clear() is not atomic. It can be called from the DbProcessor#clear() thread.
A similar situation arises here as in DbProcessor and DbEventExecutor. Assume TableProcessor#process() has read an event (i.e., in the events_.peek() loop) and passed the usable_ check (i.e., the TableProcessor is still usable).
Then, just before the processing thread executes the events_.poll() or tableEventExecutor_.decrOutstandingEventCount() statement, the DbEventExecutor#clear() thread invokes DbProcessor#clear(), which in turn invokes tableEventExecutor#deleteTableProcessor(), which calls events_.clear(). This clear may be completed (case-2) or still in progress (case-1).
case-1: The clear is still in progress: it may be partway through clearing events_ or may have finished (i.e., the ConcurrentLinkedQueue.clear() call is in progress or completed, so some or all of the events_ have been removed).
case-2: DbEventExecutor#clear() has cleared all DbProcessors, set outstandingEventCount_ to 0 and completed tableEventExecutors_.forEach(TableEventExecutor::clear) as well.
Now assume the processing thread becomes active after case-2 and calls tableEventExecutor_.decrOutstandingEventCount(). It tries to decrement again. A similar situation can arise with case-1 as well.
Didn't want to add explicit locking of events_ since it affects performance, and an explicit lock just for this purpose is not useful. That's why decrOutstandingEventCount() is handled this way. In normal processing without a clear() operation, outstandingEventCount_ is incremented and decremented whenever an element is added (offer) and removed (poll), respectively.

http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@333
PS34, Line 333: }
> This is not addressed yet.
Please check the above comment's reply.
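The RenameEventState replies above argue that volatile flags suffice because every flag has exactly one writer thread. A minimal sketch of that single-writer pattern follows. Only the four flag names come from the replies; the class name SketchRenameEventState, the Kind enum and the method signatures are hypothetical. The set-once checks stand in for the Preconditions mentioned in the replies and are written without Guava to keep the sketch self-contained.

```java
/**
 * Hypothetical sketch of the state shared by the pseudo DROP_TABLE and CREATE_TABLE
 * events derived from one ALTER TABLE ... RENAME event. Each flag has a single writer
 * thread, so volatile is enough to make the drop-side flags visible to the create
 * thread; no monitor lock is taken.
 */
final class SketchRenameEventState {
  enum Kind { DROP_TABLE, CREATE_TABLE }

  // Written only by the thread processing the pseudo drop event; read by the create thread.
  private volatile boolean dropProcessed_;
  private volatile boolean dropSkipped_;
  // Written and read only by the thread processing the pseudo create event.
  private boolean createProcessed_;
  private boolean createSkipped_;

  /** Records that the pseudo event of the given kind finished. Allowed once per kind. */
  void setProcessed(Kind kind, boolean skipped) {
    if (kind == Kind.DROP_TABLE) {
      checkState(!dropProcessed_ && !dropSkipped_, "drop event already finished");
      if (skipped) { dropSkipped_ = true; } else { dropProcessed_ = true; }
    } else {
      checkState(kind == Kind.CREATE_TABLE, "unexpected kind " + kind);
      checkState(!createProcessed_ && !createSkipped_, "create event already finished");
      if (skipped) { createSkipped_ = true; } else { createProcessed_ = true; }
    }
  }

  /** The pseudo create may run only once the pseudo drop was processed or skipped. */
  boolean canProcess(Kind kind) {
    return kind == Kind.DROP_TABLE || dropProcessed_ || dropSkipped_;
  }

  boolean isCreateDone() { return createProcessed_ || createSkipped_; }

  private static void checkState(boolean condition, String message) {
    if (!condition) throw new IllegalStateException(message);
  }
}
```

The volatile approach relies on the single-writer property. If a flag ever gained a second writer, or a check-then-set had to be atomic across threads, a lock or an atomic type would be needed instead.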
http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@78
PS36, Line 78: p
> nit: just needs one space indention
Done

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@158
PS36, Line 158: if (event instanceof DbBarrierEvent) {
> Should we also consider the DropTable kind of RenameTableBarrierEvent?
Yes. Done.


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java
File fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java@51
PS36, Line 51: ad.sleep(10);
> getOutstandingEventCount() is always valid to call, whether hierarchical ev
Correct. Removed the explicit check for whether hierarchical processing is enabled.


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsSyncToLatestEventIdTest.java
File fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsSyncToLatestEventIdTest.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsSyncToLatestEventIdTest.java@106
PS36, Line 106: 10000L
> Why pollingFrequencyInMilliSec increased to 10s here?
It was 10 sec earlier too, but the constructor now takes the time in milliseconds.
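The MetastoreShim.java replies above describe the recovery order when forming pseudo events fails and the commit txn event's onFailure() takes over. The decision can be sketched as below. The three booleans stand in for invalidate_metadata_on_event_processing_failure, canInvalidateTable() and invalidate_global_metadata_on_event_processing_failure; the SketchFailurePolicy class and its enum are hypothetical. Returning NONE when neither flag applies is an assumption the reply does not spell out.

```java
/**
 * Hypothetical model of the recovery choice made when processing a commit txn event
 * fails. Not catalogd code: the parameters mirror the flags named in the review reply.
 */
final class SketchFailurePolicy {
  enum Action { INVALIDATE_TABLES, GLOBAL_INVALIDATE, NONE }

  static Action onFailure(boolean invalidateMetadataOnFailure,
      boolean canInvalidateTable, boolean invalidateGlobalMetadataOnFailure) {
    // Preferred: invalidate only the tables involved in the failed event.
    if (invalidateMetadataOnFailure && canInvalidateTable) {
      return Action.INVALIDATE_TABLES;
    }
    // Fallback: invalidate the whole catalog.
    if (invalidateGlobalMetadataOnFailure) {
      return Action.GLOBAL_INVALIDATE;
    }
    // Assumption: no automatic recovery is attempted otherwise.
    return Action.NONE;
  }
}
```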
-- 
To view, visit http://gerrit.cloudera.org:8080/21031
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I76d8a739f9db6d40f01028bfd786a85d83f9e5d6
Gerrit-Change-Number: 21031
Gerrit-PatchSet: 38
Gerrit-Owner: Anonymous Coward <k.venureddy2...@gmail.com>
Gerrit-Reviewer: Anonymous Coward <cclive1...@gmail.com>
Gerrit-Reviewer: Anonymous Coward <k.venureddy2...@gmail.com>
Gerrit-Reviewer: Csaba Ringhofer <csringho...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <huangquanl...@gmail.com>
Gerrit-Reviewer: Riza Suminto <riza.sumi...@cloudera.com>
Gerrit-Reviewer: Sai Hemanth Gantasala <saihema...@cloudera.com>
Gerrit-Comment-Date: Wed, 26 Feb 2025 18:42:48 +0000
Gerrit-HasComments: Yes