k.venureddy2...@gmail.com has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/21031 )

Change subject: IMPALA-12709: Add support for hierarchical metastore event 
processing
......................................................................


Patch Set 42:

(19 comments)

Reworked

http://gerrit.cloudera.org:8080/#/c/21031/39/docs/topics/impala_metadata.xml
File docs/topics/impala_metadata.xml:

http://gerrit.cloudera.org:8080/#/c/21031/39/docs/topics/impala_metadata.xml@306
PS39, Line 306: in
> nit: remove trailing whitespace here and others.
Done


http://gerrit.cloudera.org:8080/#/c/21031/39/docs/topics/impala_metadata.xml@307
PS39, Line 307: n 1.0 to 5.0 second
> I think it is better to say "between 1.0 to 5.0 seconds"
Done


http://gerrit.cloudera.org:8080/#/c/21031/39/docs/topics/impala_metadata.xml@308
PS39, Line 308: <codeph>--enable_hierarchical_event_processing</codeph> flag 
set to
              :         <codeph>true</codeph>
> Why subsecond hms_event_polling_interval_s is not allowed if enable_hierarc
There is no restriction on configuring hms_event_polling_interval_s with a 
subsecond value, even if hierarchical event processing is not enabled. With 
enable_hierarchical_event_processing, MetastoreEventsProcessor#processEvents() 
becomes just an event dispatcher. So the interval can be configured to a lower 
value as long as the executor threads keep processing the incoming events and 
keep the outstanding event count within the 
max_outstanding_events_on_executors limit.
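
To illustrate the idea of a dispatcher that only hands out work while the 
executors keep up, here is a minimal sketch. It is not the Impala 
implementation: the class DispatchGateSketch, its methods, and the policy of 
skipping a polling tick when the limit is exceeded are all assumptions made 
for illustration. Only the role of the limit comes from the reply above.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical dispatcher gate; not taken from the Impala code base. */
class DispatchGateSketch {
  // Plays the role of max_outstanding_events_on_executors in this sketch.
  private final long maxOutstanding;
  // Events handed to executors that have not been processed yet.
  private final AtomicLong outstanding = new AtomicLong();

  DispatchGateSketch(long maxOutstanding) {
    this.maxOutstanding = maxOutstanding;
  }

  /**
   * One polling tick. Dispatches the fetched batch only if the executors are
   * within the limit (assumed policy). Returns the number of events dispatched.
   */
  int pollOnce(List<String> fetchedEvents) {
    if (outstanding.get() > maxOutstanding) {
      return 0; // Executors are behind; skip this tick.
    }
    outstanding.addAndGet(fetchedEvents.size());
    return fetchedEvents.size();
  }

  /** Called by an executor thread after it finishes one event. */
  void onEventProcessed() {
    outstanding.decrementAndGet();
  }

  long getOutstanding() {
    return outstanding.get();
  }
}
```

With a gate like this, a short polling interval costs little when the 
executors are behind, because a tick that finds the limit exceeded returns 
without dispatching anything.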


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
File 
fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java@660
PS38, Line 660:   /**
              :    * CDP Hive-3 only function. This is a dummy implementation 
till the CommitTxnEvent
              :    * dummy implementation class defined in this file becomes 
actual implementation. Need
              :    * to change when CommitTxnEvent implementation is supported 
with IMPALA-13285.
              :    */
> Understood. Please mention the JIRA number, IMPALA-13285, in the comment.
Done


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
File fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java:

http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java@1060
PS36, Line 1060:   public static List<PseudoCommitTxnEvent> 
getPseudoCommitTxnEvents(CommitTxnEvent event)
               :       throws MetastoreNotificationException {
               :     List<PseudoCommitTxnEvent> pseudoEvents = new ArrayList<>
> Please clarify help message for invalidate_metadata_on_event_processing_fai
Done


http://gerrit.cloudera.org:8080/#/c/21031/36/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java@1095
PS36, Line 1095:           tableNameToIdxs.computeIfAbsent(tableName, k -> new 
ArrayList<>()).add(i);
               :         }
               :         // Form list of PseudoCommit
> Checked the usage of this msTbl. We just use the dbName, tableName, numOfPa
Done


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
File fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java:

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java@1115
PS39, Line 1115: getTable(entr
> Spell it out. "PseudoCommitTxnEvent".
Done


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java@444
PS39, Line 444:
              :   @VisibleForTesting
              :   List<TableEventExecutor> getTableEventExecutors() {
              :     ret
> I'm sorry, I know that I have asked this before:
Have modified the following:
1. Decrement outstandingEventCount_ in DbProcessor#clear() as well, just 
before clearing the queues.
2. Have added Preconditions.checkState at the following places:
  1. Here in decrOutstandingEventCount().
  2. After polling the event from the queues (inEvents_ and barrierEvents_).
3. clearInternal() asserts that outstandingEventCount_ is 0.
4. Have used AtomicLong for outstandingEventCount_.
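
The counting scheme described in the list above can be sketched roughly as 
follows. This is a simplified illustration, not the code from the patch: the 
class OutstandingCountSketch, the single String queue, and the local 
checkState() helper (a plain-Java stand-in for Guava's 
Preconditions.checkState) are assumptions.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical processor that tracks queued-but-unprocessed events. */
class OutstandingCountSketch {
  private final Queue<String> inEvents = new ArrayDeque<>();
  private final AtomicLong outstandingEventCount = new AtomicLong();

  // Stand-in for Guava's Preconditions.checkState, to keep the sketch
  // free of external dependencies.
  private static void checkState(boolean ok, String msg) {
    if (!ok) throw new IllegalStateException(msg);
  }

  synchronized void enqueue(String event) {
    inEvents.add(event);
    outstandingEventCount.incrementAndGet();
  }

  /** Polls one event and decrements the counter; returns null if empty. */
  synchronized String pollAndProcess() {
    String event = inEvents.poll();
    if (event == null) return null;
    decrOutstandingEventCount(1);
    return event;
  }

  /** Decrements the counter for all queued events just before dropping them. */
  synchronized void clear() {
    decrOutstandingEventCount(inEvents.size());
    inEvents.clear();
    checkState(outstandingEventCount.get() == 0,
        "outstanding count must be 0 after clear");
  }

  private void decrOutstandingEventCount(long delta) {
    long remaining = outstandingEventCount.addAndGet(-delta);
    checkState(remaining >= 0, "outstanding count went negative");
  }

  long getOutstandingEventCount() {
    return outstandingEventCount.get();
  }
}
```

The point of decrementing inside clear() is that events dropped without being 
processed would otherwise stay counted as outstanding forever.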


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java
File 
fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@266
PS38, Line 266:
              :     NotificationEvent pseudoDropTableEvent = new 
NotificationEvent();
              :     pseudoDropTableEvent.setEven
> I think the assignment should happen here.
findSuitableEventExecutor() is renamed to getOrFindDbEventExecutor(). It is 
called from dispatch(), and dispatch() is a synchronized method, so concurrent 
calls do not happen.

Let's assume the assignment happens here, and EventExecutorService#dispatch() 
and EventExecutorService#clear() are invoked concurrently. During dispatch() 
execution, right after EventExecutorService#getOrFindDbEventExecutor() and 
before the DbEventExecutor#enqueue() invocation, the assignment might be 
removed by EventExecutorService#clear(). The dbName to DbEventExecutor 
mapping is then lost by the time we call DbEventExecutor#enqueue() on the 
respective DbEventExecutor.
That's why the dbName to DbEventExecutor mapping is assigned in enqueue().

A DbEventExecutor is not usable while DbEventExecutor#clear() is executing. It 
becomes usable again right before returning from clear(). It is also not 
usable once DbEventExecutor#stop() is invoked. In that case, it doesn't become 
usable again.
Checking usability at assignment is not required for the following reasons:
1. If a DbEventExecutor is stopped, all the remaining DbEventExecutors are 
either stopped or being stopped. So it is OK to ignore the event in 
DbEventExecutor#enqueue() if the DbEventExecutor is not usable.

2. If a DbEventExecutor is cleared, the metastore event processor is being 
reset and the catalog cache will be cleared. All the DbEventExecutors are 
either cleared or being cleared. So it is OK to ignore the event in 
DbEventExecutor#enqueue() if the DbEventExecutor is not usable.

3. The check would require acquiring the usable lock of each DbEventExecutor. 
Instead, we can ignore the event in DbEventExecutor#enqueue() if the 
DbEventExecutor is not usable.
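
The reasoning above can be sketched as follows. This is a heavily simplified 
illustration and not the patch's code: the class DbExecutorSketch, the shared 
assignments map passed in by the caller, the String events, and the boolean 
return value of enqueue() are all assumptions. It only shows the two ideas 
from the reply: the mapping is assigned inside enqueue(), and events are 
ignored while the executor is not usable.

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;

/** Hypothetical, simplified stand-in for a per-database event executor. */
class DbExecutorSketch {
  // Shared dbName -> executor assignments; the caller is assumed to pass a
  // thread-safe map such as a ConcurrentHashMap.
  private final Map<String, DbExecutorSketch> assignments;
  private final Queue<String> events = new ArrayDeque<>();
  private final Object usableLock = new Object();
  private boolean usable = true;
  private boolean stopped = false;

  DbExecutorSketch(Map<String, DbExecutorSketch> assignments) {
    this.assignments = assignments;
  }

  /** Assigns the mapping and queues the event; returns false if ignored. */
  boolean enqueue(String dbName, String event) {
    synchronized (usableLock) {
      if (!usable) return false; // Being cleared or stopped: drop the event.
      assignments.put(dbName, this);
      events.add(event);
      return true;
    }
  }

  /** Not usable while clearing; usable again right before returning. */
  void clear() {
    synchronized (usableLock) { usable = false; }
    events.clear();
    assignments.values().removeIf(e -> e == this);
    synchronized (usableLock) { usable = !stopped; }
  }

  /** After stop() the executor never becomes usable again. */
  void stop() {
    synchronized (usableLock) {
      stopped = true;
      usable = false;
    }
  }

  int queuedEventCount() {
    synchronized (usableLock) { return events.size(); }
  }
}
```

Because the mapping is written under the same lock that guards the usable 
flag, a concurrent clear() either runs before the assignment (and the event 
is enqueued with a fresh mapping afterwards) or makes enqueue() drop the 
event, so a queued event is never left without its mapping.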


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java
File 
fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java:

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@102
PS39, Line 102:   @VisibleForTesting
> Must validate and test legal transition here.
Done. Have added Preconditions.checkState(status_ != 
EventExecutorStatus.STOPPED);
We need an initial default state (INACTIVE). start() is not allowed if status_ 
is STOPPED, so STOPPED cannot be the default. And ACTIVE cannot be the default 
before start().
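
The state rules described above can be sketched as a small state machine. This 
is an illustration only: the class ExecutorStatusSketch and its methods are 
hypothetical, and an explicit IllegalStateException stands in for 
Preconditions.checkState. Only the three states and the rule that start() is 
rejected after STOPPED come from the reply.

```java
/** Hypothetical holder for the INACTIVE -> ACTIVE -> STOPPED lifecycle. */
class ExecutorStatusSketch {
  enum Status { INACTIVE, ACTIVE, STOPPED }

  // INACTIVE is the default: neither started nor stopped yet.
  private Status status = Status.INACTIVE;

  synchronized void start() {
    // Equivalent in spirit to checkState(status != Status.STOPPED).
    if (status == Status.STOPPED) {
      throw new IllegalStateException("start() is not allowed after stop()");
    }
    status = Status.ACTIVE;
  }

  synchronized void stop() {
    status = Status.STOPPED;
  }

  synchronized Status getStatus() {
    return status;
  }

  public static void main(String[] args) {
    ExecutorStatusSketch service = new ExecutorStatusSketch();
    service.start();
    service.stop();
    try {
      service.start();
    } catch (IllegalStateException e) {
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}
```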


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@196
PS39, Line 196: reNotif
> Is it better to name this 'dispatch' rather than 'process'?
Done


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@312
PS39, Line 312:  * @return DbEventExecutor if ma
> I think 'getDbEventExecutor' is clearer. Please add @Nullable annotation.
Done


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java@327
PS39, Line 327: bEventExecutor
> I think 'getOrCreateDbEventExecutor' is clearer.
Done. It does not actually create a DbEventExecutor, so I have changed it to 
getOrFindDbEventExecutor().


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
File fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java:

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java@3316
PS39, Line 3316: Adding {} aborted write ids
> Please log txnId_ as well.
Yes, it can be empty. tableWriteIds_.size() is shown in the LOG message 
"Adding {} aborted write ids for txn id {}".
I think it is OK and not misleading. Have added txnId_ to the log now.

We also have debug logs in CatalogServiceCatalog#addWriteIdsToTable(): one at 
the entry of the method ("Trying to add {} write ids {} to table {}.{} for 
event {}"), and one on each possible path in the method that states whether 
the write ids were added or not.


http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java
File fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java:

http://gerrit.cloudera.org:8080/#/c/21031/39/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java@316
PS39, Line 316:     return tableProcessors_.size();
              :   }
              :
              :   /**
> Similar comment like DBEventExecutor.decrOutstandingEventCount().
Have modified the following:
1. Decrement outstandingEventCount_ in deleteTableProcessor() as well, just 
before clearing the queue.
2. Have added Preconditions.checkState at the following places:
  1. Here in decrOutstandingEventCount().
  2. After polling the event from events_.
  3. clear() asserts that outstandingEventCount_ is 0.
3. Have used AtomicLong for outstandingEventCount_.


http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java
File 
fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java:

http://gerrit.cloudera.org:8080/#/c/21031/38/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java@125
PS38, Line 125:   public void cleanUp() {
              :     assertEquals(MetastoreEventsProces
> Make sense. Should not do it in cleanUp.
Done. Have added the checks in 
shutDownEventExecutorService(EventExecutorService eventExecutorService) before 
shutting down the EventExecutorService.

List<MetastoreEvent> metastoreEvents = eventsProcessor_.getEventsFactory()
    .getFilteredEvents(eventsProcessor_.getNextMetastoreEvents(),
        eventsProcessor_.getMetrics());
assertTrue(metastoreEvents.size() == 0);
assertTrue(eventsProcessor_.getOutstandingEventCount() == 0);


http://gerrit.cloudera.org:8080/#/c/21031/39/tests/custom_cluster/test_event_processing_perf.py
File tests/custom_cluster/test_event_processing_perf.py:

http://gerrit.cloudera.org:8080/#/c/21031/39/tests/custom_cluster/test_event_processing_perf.py@32
PS39, Line 32: LOG = logging.getLogger(__name__)
> Please write this under SkipIf, something along this line:
Have used pytest.mark.skipif() now. Also have added multiprocessing for all the 
queries that run on different dbs and tables at the same time.


http://gerrit.cloudera.org:8080/#/c/21031/39/tests/custom_cluster/test_event_processing_perf.py@270
PS39, Line 270:       def __del__(self):
              :         # Invoked only for main thread
              :         LOG.info("Deleting for thread %s", self.name)
              :
              :       def __enter__(self):
              :         # Invoked only for main thread
> Consider multithreading this (and maybe other relevant queries), with 1 Imp
Done. Have added multiprocessing for all the queries that run on different dbs 
and tables at the same time.
The stage table is just a single instance, and it is used to create 
partitions in, or insert into existing partitions of, tables in different dbs. 
So these 2 queries (creating the stage table and inserting some data into it) 
are left as they are.


http://gerrit.cloudera.org:8080/#/c/21031/39/tests/custom_cluster/test_event_processing_perf.py@343
PS39, Line 343:   insert_into_part_time = None
> Please make sure that DROP DATABASE CASCADE happen even if assertion failur
Done. Have added teardown_method now.



--
To view, visit http://gerrit.cloudera.org:8080/21031
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I76d8a739f9db6d40f01028bfd786a85d83f9e5d6
Gerrit-Change-Number: 21031
Gerrit-PatchSet: 42
Gerrit-Owner: Anonymous Coward <k.venureddy2...@gmail.com>
Gerrit-Reviewer: Anonymous Coward <cclive1...@gmail.com>
Gerrit-Reviewer: Anonymous Coward <k.venureddy2...@gmail.com>
Gerrit-Reviewer: Csaba Ringhofer <csringho...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <huangquanl...@gmail.com>
Gerrit-Reviewer: Riza Suminto <riza.sumi...@cloudera.com>
Gerrit-Reviewer: Sai Hemanth Gantasala <saihema...@cloudera.com>
Gerrit-Comment-Date: Thu, 10 Apr 2025 16:44:22 +0000
Gerrit-HasComments: Yes
