k.venureddy2...@gmail.com has posted comments on this change. (http://gerrit.cloudera.org:8080/21031)

Change subject: IMPALA-12709: Add support for hierarchical metastore event processing
......................................................................


Patch Set 38:

(36 comments)

http://gerrit.cloudera.org:8080/#/c/21031/34//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/21031/34//COMMIT_MSG@42
PS34, Line 42: Once a DbE
> nit: Unnecessary white spaces. Simply justify left.
Done


http://gerrit.cloudera.org:8080/#/c/21031/34//COMMIT_MSG@54
PS34, Line 54: Once a Tab
> nit: Unnecessary white spaces. Simply justify left.
Done


http://gerrit.cloudera.org:8080/#/c/21031/34//COMMIT_MSG@63
PS34, Line 63: Following new events are added:
> Oh, please add Java unit test to assert behavior of these new event type in
Done


http://gerrit.cloudera.org:8080/#/c/21031/34//COMMIT_MSG@63
PS34, Line 63: Following new events are added:
> nit: It will be nice to have a shared interface for derived event kind so w
Done. Have defined a marker interface and used it.
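A minimal sketch of the marker-interface idea, for illustration only. `DerivedMetastoreEvent` is a hypothetical name. The event classes are simplified stand-ins named after classes in this change and do not reflect their real hierarchy.

```java
import java.util.List;

public class MarkerInterfaceSketch {
  /** Hypothetical marker: implemented only by events the processor derives itself. */
  public interface DerivedMetastoreEvent {}

  /** Simplified stand-in for an event read from HMS. */
  public static class MetastoreEvent {
    public final String name;
    public MetastoreEvent(String name) { this.name = name; }
  }

  // Derived kinds share the marker instead of needing a common base class.
  public static class DbBarrierEvent extends MetastoreEvent
      implements DerivedMetastoreEvent {
    public DbBarrierEvent() { super("DB_BARRIER"); }
  }

  public static class PseudoCommitTxnEvent extends MetastoreEvent
      implements DerivedMetastoreEvent {
    public PseudoCommitTxnEvent() { super("PSEUDO_COMMIT_TXN"); }
  }

  /** A single instanceof check covers every derived kind. */
  public static boolean isDerived(MetastoreEvent event) {
    return event instanceof DerivedMetastoreEvent;
  }

  /** Counts how many events in the list are derived ones. */
  public static long countDerived(List<MetastoreEvent> events) {
    return events.stream().filter(MarkerInterfaceSketch::isDerived).count();
  }
}
```

With this shape, adding a new derived kind only requires implementing the marker. Call sites that branch on "derived vs. real HMS event" do not change.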


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
File fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java@996
PS36, Line 996:   public static class PseudoCommitTxnEvent extends MetastoreTableEvent
> Just a thought: our code can be cleaner if we can resolve IMPALA-13285 firs
Shall I take it up as the next PR?


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java@1060
PS36, Line 1060:     List<PseudoCommitTxnEvent> pseudoEvents = new ArrayList<>();
               :     List<WriteEventInfo> writeEventInfoList;
               :     try (MetaStoreClientPool.MetaStoreClient client = event.g
> What happen if client failed to talk with MetaStore here?
Have refactored this a bit so that it behaves the same as without hierarchical
event processing. Failures here are handled by the commit txn event's
onFailure(). Upon failure, the tables involved are invalidated iff
invalidate_metadata_on_event_processing_failure is true and
canInvalidateTable() returns true. Otherwise, a global invalidate is done
automatically if invalidate_global_metadata_on_event_processing_failure is true.
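The failure policy described in this reply can be sketched as a small decision function. This is an illustration under assumptions: the two flags are modeled as constructor booleans, and `canInvalidateTable()` is modeled as a boolean argument. `CommitTxnFailurePolicy` and `Action` are hypothetical names, not Impala's `onFailure()` code.

```java
/** Hypothetical model of the commit txn failure policy; not Impala code. */
public class CommitTxnFailurePolicy {
  /** What the catalog would do after a failed event. */
  public enum Action { INVALIDATE_TABLES, GLOBAL_INVALIDATE, NONE }

  // Models invalidate_metadata_on_event_processing_failure.
  private final boolean invalidateMetadataOnFailure;
  // Models invalidate_global_metadata_on_event_processing_failure.
  private final boolean invalidateGlobalMetadataOnFailure;

  public CommitTxnFailurePolicy(boolean invalidateMetadataOnFailure,
      boolean invalidateGlobalMetadataOnFailure) {
    this.invalidateMetadataOnFailure = invalidateMetadataOnFailure;
    this.invalidateGlobalMetadataOnFailure = invalidateGlobalMetadataOnFailure;
  }

  /**
   * Invalidate the involved tables iff the per-table flag is set and the event
   * allows table invalidation; otherwise fall back to a global invalidate if
   * that flag is set.
   */
  public Action onFailure(boolean canInvalidateTable) {
    if (invalidateMetadataOnFailure && canInvalidateTable) {
      return Action.INVALIDATE_TABLES;
    }
    if (invalidateGlobalMetadataOnFailure) return Action.GLOBAL_INVALIDATE;
    return Action.NONE;
  }
}
```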


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java@1095
PS36, Line 1095:         // Form pseudo events
               :         List<Long> finalWriteIds = writeIds;
               :         for (Map.Entry<TableName, Li
> The msTbl object in catalog might be stale. Shouldn't we get it from the HM
In the above loop we take the table from the commit txn message (a write
operation happened). That is not required in this loop. This is the case when
no write operation has actually happened (i.e., writeEventInfoList is null or
empty). It just updates the writeIds in the catalog table's validWriteIds from
open to committed status.
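To illustrate the no-write case, here is a sketch of moving a table's write ids from open to committed. `TableWriteIdState` is hypothetical bookkeeping. It is not Hive's `ValidWriteIdList` API, and the real catalog update may differ.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical per-table write id bookkeeping; not ValidWriteIdList. */
public class TableWriteIdState {
  private final Set<Long> open = new TreeSet<>();
  private final Set<Long> committed = new TreeSet<>();

  /** Records a write id as open (allocated but not yet committed). */
  public void open(long writeId) { open.add(writeId); }

  /** Moves the given write ids from open to committed; ids not open are ignored. */
  public void markCommitted(List<Long> writeIds) {
    for (long id : writeIds) {
      if (open.remove(id)) committed.add(id);
    }
  }

  public boolean isOpen(long writeId) { return open.contains(writeId); }

  public boolean isCommitted(long writeId) { return committed.contains(writeId); }
}
```

No table metadata is reloaded here, which matches the point of the reply: without writes, only the write id status changes.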


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java@1102
PS36, Line 1102:
               :               entry.get
> Why event processing can not continue? Can you narrow down the error messag
Have changed the error message.
Failures here are handled by the commit txn event's onFailure(). Upon
failure, the tables involved are invalidated iff
invalidate_metadata_on_event_processing_failure is true and
canInvalidateTable() returns true. Otherwise, a global invalidate is done
automatically if invalidate_global_metadata_on_event_processing_failure is true.


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/DBBarrierEvent.java
File fe/src/main/java/org/apache/impala/catalog/events/DBBarrierEvent.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/DBBarrierEvent.java@54
PS36, Line 54:
> nit: we tend to use "Db" in names, i.e. "DbBarrierEvent". E.g. isBlackliste
Done


http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java@73
PS34, Line 73:
> nit: // Executor name, like "DBEventExecutor-0".
Done


http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java@111
PS34, Line 111:
> nit: "Lock contention should not occur at most frequent call sites "
Done


http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java@166
PS34, Line 166:
> nit: "Lock contention should not occur at most frequent call sites "
Done


http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java@244
PS34, Line 244:
              :
> This is not addressed yet.
Have used the isUsable() method everywhere instead of checking the usable_
variable directly, because it makes all references/usages easy to find. IMO,
this is OK since reentrant locking does not involve much overhead.
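A sketch of why calling isUsable() from code that already holds the lock is safe. It assumes a `ReentrantLock` guards `usable_`; Java's `synchronized` monitors are reentrant in the same way. `UsableProcessor` and its methods are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical processor with a lock-guarded usable_ flag. */
public class UsableProcessor {
  private final ReentrantLock lock_ = new ReentrantLock();
  private boolean usable_ = true;

  /** Single accessor for usable_, so every read is easy to find. */
  public boolean isUsable() {
    lock_.lock();
    try {
      return usable_;
    } finally {
      lock_.unlock();
    }
  }

  public void markUnusable() {
    lock_.lock();
    try {
      usable_ = false;
    } finally {
      lock_.unlock();
    }
  }

  /**
   * Already holds the lock when calling isUsable(). Because the lock is
   * reentrant, the nested lock() only bumps the hold count instead of blocking.
   */
  public boolean processIfUsable() {
    lock_.lock();
    try {
      if (!isUsable()) return false;
      // ... events would be processed here while still holding the lock ...
      return true;
    } finally {
      lock_.unlock();
    }
  }

  public boolean isLockHeldByCurrentThread() { return lock_.isHeldByCurrentThread(); }
}
```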


http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java@438
PS34, Line 438:
              :
              :
> This is not addressed yet.
inEvents_ and barrierEvents_ are of type ConcurrentLinkedQueue (thread-safe).
They are not explicitly locked on each access and update.

inEvents_ is accessed or updated in:
DbProcessor#enqueue(), DbProcessor#canBeRemoved(), DbProcessor#clear(),
DbProcessor#process()

barrierEvents_ is accessed or updated in:
DbProcessor#canBeRemoved()
DbProcessor#clear()
DbProcessor#dispatchTableEvent(), DbProcessor#dispatchDbEvent()
DbProcessor#processDbEvents()

Note: ConcurrentLinkedQueue.clear() is not atomic. It is called from the
thread running DbProcessor#clear().

Assume DbProcessor#process() has read an event (i.e., in the inEvents_.peek()
loop) and passed the DbProcessor#isUsable() check (i.e., the DbProcessor is
still usable). Just before the processing thread executes inEvents_.poll() or
dbEventExecutor_.decrOutstandingEventCount(), the DbEventExecutor#clear()
thread invokes DbProcessor#clear(). This clear() may still be in progress
(case-1) or already completed (case-2).

case-1: The clear is still in progress. It may be partway through clearing
inEvents_ or barrierEvents_, or may have just finished (i.e., the
ConcurrentLinkedQueue.clear() call is in progress or completed, so some or
all of the events have been removed).

case-2: The DbProcessor#clear() invocation has finished, and
DbEventExecutor#clear() has cleared all DbProcessors and set
outstandingEventCount_ to 0.

Now assume the processing thread resumes after case-2 and calls
dbEventExecutor_.decrOutstandingEventCount(). It tries to decrement the count
again, even though clear() has already reset it to 0. A similar situation can
arise with case-1 as well.

Didn't want to add explicit locking of inEvents_ or barrierEvents_ since it
affects performance, and an explicit lock would not be useful for any other
purpose. That's why decrOutstandingEventCount() is handled this way.

In normal processing without a clear() operation, outstandingEventCount_ is
incremented and decremented whenever an element is added (offer) and removed
(poll), respectively.
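The interleaving described in this reply can be replayed deterministically on one thread. This sketch assumes the handling amounts to clamping the counter at zero. `OutstandingEventCounter` and its methods are hypothetical stand-ins for the executor's counter, not the patch's code.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical outstanding-event counter; not DbEventExecutor. */
public class OutstandingEventCounter {
  private long outstandingEventCount_ = 0;

  public synchronized void incr() { ++outstandingEventCount_; }

  /**
   * A processing thread may decrement after clear() already reset the count,
   * so the count is clamped at zero instead of being allowed to go negative.
   */
  public synchronized void decr() {
    if (outstandingEventCount_ > 0) --outstandingEventCount_;
  }

  public synchronized void clear() { outstandingEventCount_ = 0; }

  public synchronized long get() { return outstandingEventCount_; }

  /** Replays: peek on the processing thread, clear() elsewhere, then poll + decr. */
  public static long replayStaleDecrement() {
    ConcurrentLinkedQueue<String> inEvents = new ConcurrentLinkedQueue<>();
    OutstandingEventCounter counter = new OutstandingEventCounter();
    inEvents.offer("event-1");
    counter.incr();
    String head = inEvents.peek(); // processing thread reads the event
    inEvents.clear();              // clear thread empties the queue...
    counter.clear();               // ...and resets the count to 0
    if (head != null) {
      inEvents.poll();             // returns null: clear() already removed it
      counter.decr();              // stale decrement is clamped at 0
    }
    return counter.get();
  }
}
```

Clamping keeps the counter consistent without locking the queues. The trade-off, raised in the review, is that a genuine over-decrement bug would be masked rather than caught by a precondition.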


http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java@515
PS34, Line 515:
> This should be Precondition at the end of method rather than assignment.
Please see my reply to the comment above.


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java@149
PS36, Line 149:
> nit: maybe "barrierEvents_" is clearer. At the beginning, I thought it's al
Done


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java@226
PS36, Line 226:
> nit: we prefer only using C-style comments /* */ in class or method comment
Done


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java@248
PS36, Line 248:
              :
> nit: put them in one line or add {}
Done


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/DBEventExecutor.java@374
PS36, Line 374:
> It seems we should also skip non-rename events if the CREATE_DATABASE event
Correct. We need to skip all db events and table events, except rename table,
drop table and drop db events, when a drop db event is queued behind them.

For the above sequence, assume all these events are queued together at the
same time on the dbEventExecutor:
1. We skip the create database, create table and insert events.
2. The alter table rename (with drop table) is queued to the tableEventExecutor
for processing.
3. Drop database tmp is processed on the dbEventExecutor (processDbEvents) and
increments EVENTS_SKIPPED_METRIC since the db doesn't exist.

Have refactored the code.
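The skip rule in this reply can be modeled as a filter over one database's queue. This is a sketch under assumptions: `DropDbSkipFilter` and the `Kind` enum are hypothetical, and each event is reduced to its kind.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of skipping events queued ahead of a DROP_DATABASE. */
public class DropDbSkipFilter {
  public enum Kind {
    CREATE_DATABASE, ALTER_DATABASE, CREATE_TABLE, INSERT, ALTER_TABLE,
    RENAME_TABLE, DROP_TABLE, DROP_DATABASE
  }

  // Kinds that are still processed even when a DROP_DATABASE is queued behind them.
  private static final Set<Kind> ALWAYS_PROCESS =
      EnumSet.of(Kind.RENAME_TABLE, Kind.DROP_TABLE, Kind.DROP_DATABASE);

  /** Returns, in order, the events of one database's queue that are processed. */
  public static List<Kind> filter(List<Kind> queued) {
    int lastDrop = queued.lastIndexOf(Kind.DROP_DATABASE); // -1 if none queued
    List<Kind> result = new ArrayList<>();
    for (int i = 0; i < queued.size(); i++) {
      Kind kind = queued.get(i);
      // Skip events ahead of the last queued DROP_DATABASE unless exempt.
      if (i < lastDrop && !ALWAYS_PROCESS.contains(kind)) continue;
      result.add(kind);
    }
    return result;
  }
}
```

For the sequence in the reply (create database, create table, insert, rename, drop database), this keeps only the rename and the drop database.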


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java
File 
fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java@49
PS36, Line 49:    * Pauses the event processing. Use <code>start(fromEventId)</code> method below to
             :    * restart the event processing
             :    */
             :   void pause();
             :
             :   /**
             :    * Starts the event processing from the given eventId. This method can be used to jump
             :    * ahead in the event processing under certain cases where it is okay skip certain
             :    * events
             :    */
             :   void start(l
> Please contrast this against pause(). Why we need hold() if we have pause()
Removed hold() and resume() now. Using :event_processor('pause') and
:event_processor('start'), added with IMPALA-12785, instead.


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/ExternalEventsProcessor.java@61
PS36, Line 61:   /**
             :    * Clears the event processor if it has any pending events to process in its pipeline
             :    */
             :   void clear();
             :
             :   /**
> Please contrast this against stop(). Why we need resume if we have stop()?
Removed hold() and resume() now. Using :event_processor('pause') and
:event_processor('start'), added with IMPALA-12785, instead.


http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
File fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java:

http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java@588
PS34, Line 588: protected static final String CLUSTER_WIDE_TARGET = "CLUSTER_WIDE";
> This is not addresses yet.
It was not added with this patch.


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
File 
fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@705
PS36, Line 705: pIntervalM
> nit: checkArgument() might be better since it throws IllegalArgumentExcepti
Done


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@899
PS36, Line 899: rride
> Why pause() translate to eventExecutorService_.clear()?
Before IMPALA-12785, pause() was called only from catalog reset(). So I called
eventExecutorService_.clear() in pause() to clear all the pending events on the
event executor service (i.e., to discard pending events on DbEventExecutors and
TableEventExecutors) on reset. But with IMPALA-12785, pause() can also be
invoked with a command (:event_processor('pause')) just to pause the event
processor.

So I have removed eventExecutorService_.clear() from pause() and now call it
from reset(). Upon reset, catalogd rebuilds the cache. Even if a subset of the
pseudo events is processed during the clear, reset brings the cache to a
consistent state.
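A sketch contrasting the two operations after this change. It assumes pending executor events can be modeled as a single queue. `EventProcessorLifecycle` and its members are hypothetical, not `MetastoreEventsProcessor`'s API.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical model of pause() vs. reset(); not MetastoreEventsProcessor. */
public class EventProcessorLifecycle {
  public enum Status { ACTIVE, PAUSED }

  private Status status_ = Status.ACTIVE;
  // Stands in for events pending on the DbEventExecutors/TableEventExecutors.
  private final ConcurrentLinkedQueue<Long> pendingEvents_ = new ConcurrentLinkedQueue<>();

  public void enqueue(long eventId) { pendingEvents_.offer(eventId); }

  /** Only stops processing; pending events stay queued. */
  public synchronized void pause() { status_ = Status.PAUSED; }

  public synchronized void start() { status_ = Status.ACTIVE; }

  /**
   * Catalog reset: pause, then discard pending events. Partially applied
   * pseudo events are harmless because the cache is rebuilt afterwards.
   */
  public synchronized void reset() {
    pause();
    pendingEvents_.clear();
    // The catalog cache would be reloaded here before processing restarts.
  }

  public synchronized Status status() { return status_; }

  public int pendingCount() { return pendingEvents_.size(); }
}
```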


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1273
PS36, Line 1273:   public TEventProcessorMetrics getEventProcessorMetrics() {
               :     TEventProcessorMetrics ev
> Ideally we can maintain lastSyncedEventId as min(event_id of all unprocesse
Ack. Filed a jira https://issues.apache.org/jira/browse/IMPALA-13801


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java
File 
fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java@72
PS36, Line 72: nt pr
> RenameEventState is more descriptive.
Renamed the class to RenameEventState.

IMHO, we do not need to synchronize (or hold the monitor lock of the
RenameEventState object). This is a case where a single thread writes a
variable and another thread only reads it, so making the variable volatile is
enough to ensure memory visibility; there is no need for a lock.
dropProcessed_ and dropSkipped_ are updated only on the drop event processing
thread, and createSkipped_ and createProcessed_ are updated only on the create
event processing thread. We just need the state of drop event processing
(i.e., the dropProcessed_ and dropSkipped_ flags) to be visible on the create
event processing thread, so making dropProcessed_ and dropSkipped_ volatile is
enough.
Added a test for RenameEventState to verify the order of pseudo event
processing (i.e., drop table followed by create table).
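A sketch of the single-writer volatile pattern described here, including the set-once precondition mentioned in the follow-up reply. Method names such as `setDropDone()` and `canProcessCreate()` are hypothetical. A plain `IllegalStateException` stands in for Guava's `Preconditions.checkState()`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical rename barrier state; drop flags have a single writer thread. */
public class RenameEventState {
  // Written only on the drop thread, read on the create thread: volatile suffices.
  private volatile boolean dropProcessed_ = false;
  private volatile boolean dropSkipped_ = false;
  // Read and written only on the create thread.
  private boolean createProcessed_ = false;
  private boolean createSkipped_ = false;

  /** Called exactly once on the drop thread. */
  public void setDropDone(boolean skipped) {
    if (dropProcessed_ || dropSkipped_) {
      throw new IllegalStateException("drop state already set");
    }
    if (skipped) dropSkipped_ = true; else dropProcessed_ = true;
  }

  /** Called exactly once on the create thread. */
  public void setCreateDone(boolean skipped) {
    if (createProcessed_ || createSkipped_) {
      throw new IllegalStateException("create state already set");
    }
    if (skipped) createSkipped_ = true; else createProcessed_ = true;
  }

  /** The pseudo create event may run only after the pseudo drop is done. */
  public boolean canProcessCreate() { return dropProcessed_ || dropSkipped_; }

  /** Runs both pseudo events on separate threads and records their order. */
  public static List<String> runDemo() {
    RenameEventState state = new RenameEventState();
    List<String> order = Collections.synchronizedList(new ArrayList<>());
    Thread create = new Thread(() -> {
      while (!state.canProcessCreate()) Thread.yield(); // wait for the drop
      order.add("create");
      state.setCreateDone(false);
    });
    Thread drop = new Thread(() -> {
      order.add("drop");
      state.setDropDone(false); // volatile write publishes the drop state
    });
    create.start();
    drop.start();
    try {
      drop.join();
      create.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return order;
  }
}
```

The drop thread records "drop" before its volatile write, and the create thread reads that volatile before recording "create". The recorded order is therefore always drop, then create.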


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java@109
PS36, Line 109: aram skipped
> Add Precondition that prevent True variable from turning back to False.
Done. Have removed the processed argument since we always need to pass true.
Also added a precondition to ensure setProcessed() is called only once on the
pseudo drop and create event processing threads.


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java@113
PS36, Line 113:    Precon
> Add Precondition in else block that eventType is CREATE_TABLE.
Done


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/RenameTableBarrierEvent.java@157
PS36, Line 157:    * Determines whether this event can be processed. TableProcessor uses this method to
              :    * check if the event can be processed before processing it.
              :    * @return True if the event can be processed. False otherwise
              :    */
> This should be synchronized methods inside State object, like setProcessed(
IMHO, keeping the dropProcessed_ and dropSkipped_ flags volatile is enough,
since they are updated only on the drop event processing thread and read on
the create event processing thread. Have added a unit test for it now.


http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@56
PS34, Line 56:   private final Ma
> nit: // Executor name, like "DBEventExecutor-0.TableEventExecutor-0".
Done


http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@312
PS34, Line 312:    */
              :   private void decrOutstandingEventCount() {
              :     synchronize
> This is not addressed yet.
events_ is of type ConcurrentLinkedQueue (thread-safe). It is not explicitly
locked on each access and update.

events_ is accessed or updated in:
TableProcessor#isEmpty(), TableProcessor#canBeRemoved(),
TableProcessor#enqueue(), TableProcessor#process()
TableEventExecutor#deleteTableProcessor()

Note: ConcurrentLinkedQueue.clear() is not atomic. It can be called from the
thread running DbProcessor#clear().

A similar situation arises here as in DbProcessor and DbEventExecutor.

Assume TableProcessor#process() has read an event (i.e., in the events_.peek()
loop) and passed the usable_ check (i.e., the TableProcessor is still usable).
Just before the processing thread executes events_.poll() or
tableEventExecutor_.decrOutstandingEventCount(), the DbEventExecutor#clear()
thread invokes DbProcessor#clear(). That in turn invokes
tableEventExecutor#deleteTableProcessor(), which calls events_.clear(). This
clear may still be in progress (case-1) or already completed (case-2).

case-1: The clear is still in progress. It may be partway through clearing
events_, or may have just finished (i.e., the ConcurrentLinkedQueue.clear()
call is in progress or completed, so some or all of events_ have been
removed).

case-2: DbEventExecutor#clear() has cleared all DbProcessors, set
outstandingEventCount_ to 0 and completed
tableEventExecutors_.forEach(TableEventExecutor::clear) as well.

Now assume the processing thread resumes after case-2 and calls
tableEventExecutor_.decrOutstandingEventCount(). It tries to decrement the
count again, even though clear() has already reset it to 0. A similar
situation can arise with case-1 as well.

Didn't want to add explicit locking of events_ since it affects performance,
and an explicit lock would not be useful for any other purpose. That's why
decrOutstandingEventCount() is handled this way.

In normal processing without a clear() operation, outstandingEventCount_ is
incremented and decremented whenever an element is added (offer) and removed
(poll), respectively.


http://gerrit.cloudera.org:8080/#/c/21031/34/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@333
PS34, Line 333:   }
> This is not addressed yet.
Please see my reply to the comment above.


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@78
PS36, Line 78:  p
> nit: just needs one space indention
Done


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@158
PS36, Line 158:       if (event instanceof DbBarrierEvent) {
> Should we also consider the DropTable kind of RenameTableBarrierEvent?
Yes. Done.


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java
File 
fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java@51
PS36, Line 51: ad.sleep(10);
> getOutstandingEventCount() is always valid to call, whether hierarchical ev
Correct. Removed explicit hierarchical processing enabled check now.
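Because getOutstandingEventCount() is valid in both modes, a test can simply poll it until it drains. This sketch assumes the count is exposed as a `LongSupplier`. `waitForDrain()` is a hypothetical helper, and the 10 ms interval mirrors the `sleep(10)` in the quoted snippet.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical test helper; not SynchronousHMSEventProcessorForTests. */
public class OutstandingEventWaiter {
  /** Returns true if the count reached 0 before the timeout, false otherwise. */
  public static boolean waitForDrain(LongSupplier outstandingEventCount, long timeoutMs) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    while (outstandingEventCount.getAsLong() > 0) {
      if (System.nanoTime() - deadline >= 0) return false; // timed out
      try {
        Thread.sleep(10); // poll interval
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for callers
        return false;
      }
    }
    return true;
  }
}
```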


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsSyncToLatestEventIdTest.java
File 
fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsSyncToLatestEventIdTest.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsSyncToLatestEventIdTest.java@106
PS36, Line 106: 10000L
> Why pollingFrequencyInMilliSec increased to 10s here?
It was 10 sec earlier too, but the constructor takes the time in milliseconds now.



--
To view, visit http://gerrit.cloudera.org:8080/21031
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I76d8a739f9db6d40f01028bfd786a85d83f9e5d6
Gerrit-Change-Number: 21031
Gerrit-PatchSet: 38
Gerrit-Owner: Anonymous Coward <k.venureddy2...@gmail.com>
Gerrit-Reviewer: Anonymous Coward <cclive1...@gmail.com>
Gerrit-Reviewer: Anonymous Coward <k.venureddy2...@gmail.com>
Gerrit-Reviewer: Csaba Ringhofer <csringho...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <huangquanl...@gmail.com>
Gerrit-Reviewer: Riza Suminto <riza.sumi...@cloudera.com>
Gerrit-Reviewer: Sai Hemanth Gantasala <saihema...@cloudera.com>
Gerrit-Comment-Date: Wed, 26 Feb 2025 18:42:48 +0000
Gerrit-HasComments: Yes
