Riza Suminto has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/21031 )

Change subject: IMPALA-12709: Add support for hierarchical metastore event 
processing
......................................................................


Patch Set 42:

(32 comments)

http://gerrit.cloudera.org:8080/#/c/21031/39/docs/topics/impala_metadata.xml
File docs/topics/impala_metadata.xml:

http://gerrit.cloudera.org:8080/#/c/21031/39/docs/topics/impala_metadata.xml@307
PS39, Line 307: n 1.0 to 5.0 second
> Done
Done


http://gerrit.cloudera.org:8080/#/c/21031/39/docs/topics/impala_metadata.xml@308
PS39, Line 308: <codeph>--enable_hierarchical_event_processing</codeph> flag 
set to
              :         <codeph>true</codeph>
> There is no restriction to config hms_event_polling_interval_s in ms, even
Done


http://gerrit.cloudera.org:8080/#/c/21031/42/docs/topics/impala_metadata.xml
File docs/topics/impala_metadata.xml:

http://gerrit.cloudera.org:8080/#/c/21031/42/docs/topics/impala_metadata.xml@307
PS42, Line 307: If <codeph>catalogd</codeph> is
              :         being started with 
<codeph>--enable_hierarchical_event_processing</codeph> flag set to
              :         <codeph>true</codeph>, hms event polling interval can 
be set to a value less than 1.0 second.
Consider dropping this sentence.
I'm worried that users will set it to a sub-second value without understanding
the implications. Yes, the hierarchical event processor might be performant,
but doing so might lead to unintended HMS performance degradation (too many
requests per second). The previous sentence should be sufficient to convey
that 1.0 is a good enough polling period.


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
File fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java:

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java@1115
PS39, Line 1115: getTable(entr
> Done
Done


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@444
PS39, Line 444:
              :   @VisibleForTesting
              :   List<TableEventExecutor> getTableEventExecutors() {
              :     ret
> Have modified following:
Done


http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@188
PS42, Line 188: private final Object usableLock_ = new Object();
This lock also protects inEvents_.poll(), barrierEvents_.poll(), and
dbEventExecutor_.decrOutstandingEventCount(), in addition to the usable_ flag.

I also find this lock confusing. The comment says that usableLock_ only
protects the usable_ flag, but there are many places where this lock is held
far longer (say, dispatchTableEvent or dispatchDbEvent). In contrast, clear()
releases the lock sooner and proceeds to mutate many states without holding it.

Please update the docs, clarifying when usableLock_ must be held.
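
As a rough illustration of the kind of lock documentation being asked for, here
is a minimal sketch. It is not the actual DbProcessor code: the class
SketchProcessor and all of its fields and methods are hypothetical, and it only
shows one possible contract where a single lock covers the flag, the poll, and
the count decrement.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified processor; not the actual Impala class. */
class SketchProcessor {
  /**
   * Guards usable_, every poll() on events_, and the matching decrement of
   * outstanding_. It must be held for the whole "check usable, poll,
   * decrement" sequence so that clear() cannot interleave with it.
   */
  private final Object lock_ = new Object();
  private boolean usable_ = true;                            // guarded by lock_
  private final Queue<String> events_ = new ArrayDeque<>();  // guarded by lock_
  private final AtomicInteger outstanding_ = new AtomicInteger();

  void enqueue(String event) {
    synchronized (lock_) {
      if (!usable_) return;
      events_.add(event);
      outstanding_.incrementAndGet();
    }
  }

  /** Returns the next event, or null if none is queued or the processor is unusable. */
  String pollOne() {
    synchronized (lock_) {
      if (!usable_) return null;
      String event = events_.poll();
      if (event != null) outstanding_.decrementAndGet();
      return event;
    }
  }

  /** Drops all queued events while holding lock_ for the whole mutation. */
  void clear() {
    synchronized (lock_) {
      outstanding_.addAndGet(-events_.size());
      events_.clear();
    }
  }

  int outstanding() { return outstanding_.get(); }
}
```

The point of the sketch is only that the Javadoc on the lock lists everything
it guards, and that every method touching those members takes the lock for the
full duration of the mutation.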


http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@459
PS42, Line 459:    * Increments outstanding event count for the DbEventExecutor.
Please document when this method must be called.


http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@468
PS42, Line 468: private void decrOutstandingEventCount() {
Remove this method and use the overload with 'delta' parameter instead.


http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@477
PS42, Line 477: 
Preconditions.checkState(outstandingEventCount_.addAndGet(-delta) >= 0);
Add error message.

  Preconditions.checkState(outstandingEventCount_.addAndGet(-delta) >= 0,
      "outstandingEventCount is negative after decrement.");
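
A minimal, self-contained sketch of the two suggestions above (a single
decrement method taking 'delta', and a state check that carries a message).
The class OutstandingCounter is hypothetical, and the local checkState helper
is only a stand-in for Guava's Preconditions.checkState(boolean, Object) so
that the example runs without dependencies.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical counter; not the actual executor class. */
class OutstandingCounter {
  private final AtomicInteger outstandingEventCount_ = new AtomicInteger();

  // Stand-in for Guava's Preconditions.checkState(boolean, Object).
  private static void checkState(boolean expression, String message) {
    if (!expression) throw new IllegalStateException(message);
  }

  void incr(int delta) {
    checkState(delta > 0, "delta must be positive.");
    outstandingEventCount_.addAndGet(delta);
  }

  /** Single decrement entry point; callers pass 1 for the common case. */
  void decr(int delta) {
    checkState(delta > 0, "delta must be positive.");
    // Decrement once and validate the result of that same atomic operation.
    int remaining = outstandingEventCount_.addAndGet(-delta);
    checkState(remaining >= 0,
        "outstandingEventCount is negative after decrement.");
  }

  int get() { return outstandingEventCount_.get(); }
}
```

Checking the value returned by addAndGet(), rather than calling get()
afterwards, keeps the decrement and the validation tied to one atomic
operation.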


http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@529
PS42, Line 529: unAssignEventExecutor(dbProcessor.dbName_);
Should this always happen if 'force' is true?
canBeRemoved() may return false if this clause evaluates to false.

  (System.currentTimeMillis() - lastEventQueuedTime_) >
              BackendConfig.INSTANCE.getMinEventProcessorIdleMs()


http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@548
PS42, Line 548: Preconditions.checkState(outstandingEventCount_.get() == 0);
Add error message.

  Preconditions.checkState(outstandingEventCount_.get() == 0, 
"outstandingEventCount is non-zero after clear.");

Should this be done at the end of the method?


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java
File 
fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@266
PS38, Line 266:
              :     NotificationEvent pseudoDropTableEvent = new 
NotificationEvent();
              :     pseudoDropTableEvent.setEven
> findSuitableEventExecutor() is renamed to getOrFindDbEventExecutor(). It is
Thank you for your explanation.
I think I understand the decision now, because DbEventExecutor can remove the
mapping by itself if cleanIfNecessary(false) is called. And the mapping should
only be set after the DbProcessor for dbName is instantiated, which happens
exclusively inside DbEventExecutor.enqueue().

The fact that dispatch() is also a synchronized method makes it safe for
DbEventExecutor.getOrFindDbEventExecutor() and DbEventExecutor.enqueue() to be
called independently within the same synchronization scope.


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java
File 
fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java:

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@102
PS39, Line 102:   @VisibleForTesting
> Done. Have added Preconditions.checkState(status_ != EventExecutorStatus.ST
Done


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@196
PS39, Line 196: reNotif
> Done
Done


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@312
PS39, Line 312:  * @return DbEventExecutor if ma
> Done
Done


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@327
PS39, Line 327: bEventExecutor
> Done. It actually do not create DbEventExecutor. So have changed it to getO
Ack


http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java
File 
fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java:

http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@72
PS42, Line 72:   // Database name to DbEventExecutor map
             :   private final Map<String, DbEventExecutor> 
dbNameToEventExecutor_ =
             :       new ConcurrentHashMap<>();
Please document that this map should be mutated only by DbEventExecutor,
through DbEventExecutor.assignEventExecutor() and
DbEventExecutor.unAssignEventExecutor().


http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@133
PS42, Line 133:   void clear() {
I think dispatch(), shutdown(), and clear() should all be synchronized methods.

I suspect that usable_ and usableLock_ can be simplified or eliminated in many
places if this code guarantees that dispatch(), shutdown(), and clear() cannot
run concurrently. For example, DbEventExecutor.clear() would not need to call
setUsable(false) followed by setUsable(true).
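
A rough sketch of the idea, under simplifying assumptions. SketchService and
its members are hypothetical and much smaller than the real
EventExecutorService; the sketch only covers mutual exclusion among the three
entry points, so worker threads that process events would still need their own
coordination.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified service; not the actual EventExecutorService. */
class SketchService {
  enum Status { ACTIVE, STOPPED }

  private Status status_ = Status.ACTIVE;                   // guarded by "this"
  private final List<String> pending_ = new ArrayList<>();  // guarded by "this"

  /** Returns true if the event was accepted. */
  synchronized boolean dispatch(String event) {
    if (status_ != Status.ACTIVE) return false;
    pending_.add(event);
    return true;
  }

  /** Cannot interleave with dispatch(), so no usable flag toggle is needed. */
  synchronized void clear() {
    pending_.clear();
  }

  /** Drops pending events and rejects all later dispatch() calls. */
  synchronized void shutdown() {
    pending_.clear();
    status_ = Status.STOPPED;
  }

  synchronized int pendingCount() { return pending_.size(); }
}
```

Because all three methods share the same monitor, clear() never observes a
half-finished dispatch(), which is what the setUsable(false) and
setUsable(true) pair appears to be guarding against.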


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
File fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java:

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java@3316
PS39, Line 3316: Adding {} aborted write ids
> Yes it can be empty. tableWriteIds_.size() is shown in LOG - "Adding {} abo
Done


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
File 
fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java:

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1583
PS39, Line 1583:   public synchronized void updateStatus(EventProcessorStatus 
toStatus) {
               :     eventProcessorStatus_ = toStatus;
               :   }
> If 2 executors concurrently invoke handleEventProcessException() due to fai
If both NEEDS_INVALIDATE and ERROR are handled the same way, why bother
distinguishing them as different enum values?
What if invalidate_global_metadata_on_event_processing_failure is false?

The docs seem to suggest that NEEDS_INVALIDATE and ERROR have different
meanings/severity, and that users may need to resolve them differently,
especially if invalidate_global_metadata_on_event_processing_failure is false.
https://impala.apache.org/docs/build/html/topics/impala_metadata.html

But I agree with you that the current code treats both statuses the same way.
The difference is only whether MetastoreNotificationNeedsInvalidateException
was raised or not. Is such a difference important for a user/admin to pay
attention to?

In any case, please update the docs to mention that both NEEDS_INVALIDATE
and ERROR can be resolved by issuing a global Invalidate Metadata query.


http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
File 
fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java:

http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java@1231
PS42, Line 1231: a invalidate
nit: an invalidate


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@316
PS39, Line 316:     return tableProcessors_.size();
              :   }
              :
              :   /**
> Have modified following:
Done


http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@125
PS42, Line 125: private final Object usableLock_ = new Object();
This lock also protects events_.poll() and
tableEventExecutor_.decrOutstandingEventCount(), in addition to the usable_
flag. Please update the doc accordingly.


http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@328
PS42, Line 328:    * Increments outstanding event count for the 
TableEventExecutor.
Please document when this method must be called.


http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@337
PS42, Line 337: private void decrOutstandingEventCount() {
Remove this method and use the overload with 'delta' parameter instead.


http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@346
PS42, Line 346: 
Preconditions.checkState(outstandingEventCount_.addAndGet(-delta) >= 0);
Add error message.

  Preconditions.checkState(outstandingEventCount_.addAndGet(-delta) >= 0,
      "outstandingEventCount is negative after decrement.");


http://gerrit.cloudera.org:8080/#/c/21031/42/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@364
PS42, Line 364: Preconditions.checkState(outstandingEventCount_.get() == 0);
Add error message.

  Preconditions.checkState(outstandingEventCount_.get() == 0, 
"outstandingEventCount is non-zero after clear.");


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java
File 
fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java@125
PS38, Line 125:   public void cleanUp() {
              :     assertEquals(MetastoreEventsProces
> Done. Have added the checks in shutDownEventExecutorService(EventExecutorSe
Done


http://gerrit.cloudera.org:8080/#/c/21031/39/tests/custom_cluster/test_event_processing_perf.py
File tests/custom_cluster/test_event_processing_perf.py:

http://gerrit.cloudera.org:8080/#/c/21031/39/tests/custom_cluster/test_event_processing_perf.py@32
PS39, Line 32: LOG = logging.getLogger(__name__)
> Have used pytest.mark.skipif() now. Also have added multiprocessing for all
Done


http://gerrit.cloudera.org:8080/#/c/21031/39/tests/custom_cluster/test_event_processing_perf.py@270
PS39, Line 270:       def __del__(self):
              :         # Invoked only for main thread
              :         LOG.info("Deleting for thread %s", self.name)
              :
              :       def __enter__(self):
              :         # Invoked only for main thread
> Done. Have added multiprocessing for all the queries that run on different
Done


http://gerrit.cloudera.org:8080/#/c/21031/39/tests/custom_cluster/test_event_processing_perf.py@343
PS39, Line 343:   insert_into_part_time = None
> Done. Have added teardown_method now.
Done


http://gerrit.cloudera.org:8080/#/c/21031/42/tests/custom_cluster/test_event_processing_perf.py
File tests/custom_cluster/test_event_processing_perf.py:

http://gerrit.cloudera.org:8080/#/c/21031/42/tests/custom_cluster/test_event_processing_perf.py@43
PS42, Line 43:   db_count = 5
             :   table_count = 5
             :   partition_count = 500
             :   insert_nonpartition_values_count = 100
             :   insert_nonpartition_repeat_count = 5
Consider lowering these test dimensions so that we can regularly run this test
in exhaustive test runs.
You can also leave comment lines with the ideal/big dimension values that
developers can uncomment to run the test manually.



--
To view, visit http://gerrit.cloudera.org:8080/21031
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I76d8a739f9db6d40f01028bfd786a85d83f9e5d6
Gerrit-Change-Number: 21031
Gerrit-PatchSet: 42
Gerrit-Owner: Anonymous Coward <k.venureddy2...@gmail.com>
Gerrit-Reviewer: Anonymous Coward <cclive1...@gmail.com>
Gerrit-Reviewer: Anonymous Coward <k.venureddy2...@gmail.com>
Gerrit-Reviewer: Csaba Ringhofer <csringho...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <huangquanl...@gmail.com>
Gerrit-Reviewer: Riza Suminto <riza.sumi...@cloudera.com>
Gerrit-Reviewer: Sai Hemanth Gantasala <saihema...@cloudera.com>
Gerrit-Comment-Date: Thu, 10 Apr 2025 20:00:39 +0000
Gerrit-HasComments: Yes
