vvcephei commented on a change in pull request #8248:
URL: https://github.com/apache/kafka/pull/8248#discussion_r431505723



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -811,17 +812,41 @@ private void prepareChangelogs(final Set<ChangelogMetadata> newPartitionsToResto
         }
     }
 
+    private RuntimeException invokeOnRestoreEnd(final TopicPartition partition,
+                                                final ChangelogMetadata changelogMetadata) {
+        // only trigger the store's specific listener to make sure we disable bulk loading before transition to standby
+        final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
+        final StateRestoreCallback restoreCallback = storeMetadata.restoreCallback();
+        final String storeName = storeMetadata.store().name();
+        if (restoreCallback instanceof StateRestoreListener) {
+            try {
+                ((StateRestoreListener) restoreCallback).onRestoreEnd(partition, storeName, changelogMetadata.totalRestored);
+            } catch (final RuntimeException e) {
+                return e;
+            }
+        }
+        return null;
+    }
+
     @Override
-    public void remove(final Collection<TopicPartition> revokedChangelogs) {
-        // Only changelogs that are initialized that been added to the restore consumer's assignment
+    public void unregister(final Collection<TopicPartition> revokedChangelogs,
+                           final boolean triggerOnRestoreEnd) {
+        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
+
+        // Only changelogs that are initialized have been added to the restore consumer's assignment
         final List<TopicPartition> revokedInitializedChangelogs = new ArrayList<>();
 
         for (final TopicPartition partition : revokedChangelogs) {
             final ChangelogMetadata changelogMetadata = changelogs.remove(partition);
             if (changelogMetadata != null) {
-                if (changelogMetadata.state() != ChangelogState.REGISTERED) {
+                if (triggerOnRestoreEnd && changelogMetadata.state().equals(ChangelogState.RESTORING)) {

Review comment:
   Yeah, that last sentence is what I was thinking: it would make sense to just
call all the listeners, not just the inner ones. If you implemented the
listener so that you could log or collect metrics to watch the restore process,
it would be strange to see the restoration disappear and then a new restoration
start (for the post-recycled task) without the prior one ever having "ended".
It just seems natural to see an end for every start, even if the "end" doesn't
mean "completed". But I can see how that could also be confusing.
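
   As a rough illustration of the concern above, here is a minimal sketch of a
metrics-style listener that counts starts and ends. It deliberately does not use
the real Kafka interfaces: `RestoreTracker`, its simplified method signatures,
and `unmatchedStarts()` are hypothetical stand-ins, only loosely modeled on the
shape of `StateRestoreListener`.

```java
// Hypothetical stand-in for a user-supplied restore listener; NOT the Kafka API.
public class RestoreTracker {
    private long starts; // number of restorations this listener saw begin
    private long ends;   // number of restorations this listener saw end

    // Assumed to be called when restoration of a store's changelog partition begins.
    public void onRestoreStart(final String storeName, final int partition) {
        starts++;
    }

    // Assumed to be called when restoration ends, whether it completed or was abandoned.
    public void onRestoreEnd(final String storeName, final int partition, final long totalRestored) {
        ends++;
    }

    // Restorations that were started but never reported as ended.
    public long unmatchedStarts() {
        return starts - ends;
    }
}
```

   With a listener like this, a recycled task that never triggers the end
callback would leave `unmatchedStarts()` permanently above zero once the
post-recycle restoration starts and finishes, which is the "start without an
end" situation described above.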
   
   I'm not totally sure what the best call here is, so maybe we should just 
defer to your judgement, since you're the closest to this code right now.



