ableegoldman commented on a change in pull request #8248:
URL: https://github.com/apache/kafka/pull/8248#discussion_r430838846



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
##########
@@ -45,12 +44,6 @@
      */
     Set<TopicPartition> completedChangelogs();
 
-    /**
-     * Removes the passed in partitions from the set of changelogs
-     * @param revokedPartitions the set of partitions to remove
-     */
-    void remove(Collection<TopicPartition> revokedPartitions);

Review comment:
       Renamed to `unregister` and moved to `ChangelogRegister` interface

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -811,8 +811,28 @@ private void prepareChangelogs(final 
Set<ChangelogMetadata> newPartitionsToResto
         }
     }
 
+    void maybeTriggerOnRestoreEnd(final Collection<TopicPartition> 
changelogPartitions) {

Review comment:
       Alright, the situation here is that we need to make sure we toggle bulk 
loading off for any active restoring tasks that convert to standby. Rather than 
try and force a direct call to `toggleForBulkLoading` on the store itself I 
figured we should just call `onRestoreEnd`. Technically, restoration _is_ 
ending. It just happens to be due to type transition, rather than restore 
completion.
   I figured this might be relevant for users of custom stores, which might do 
something similar to bulk loading that they wish to turn off for standbys. But 
since this is only relevant to the underlying store, and doesn't mean we have 
actually finished restoring a task, we should only call the specific store's 
listener -- and _not_ the user registered global listener.
   WDYT?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -811,17 +812,41 @@ private void prepareChangelogs(final 
Set<ChangelogMetadata> newPartitionsToResto
         }
     }
 
+    private RuntimeException invokeOnRestoreEnd(final TopicPartition partition,
+                                                final ChangelogMetadata 
changelogMetadata) {
+        // only trigger the store's specific listener to make sure we disable 
bulk loading before transition to standby
+        final StateStoreMetadata storeMetadata = 
changelogMetadata.storeMetadata;
+        final StateRestoreCallback restoreCallback = 
storeMetadata.restoreCallback();
+        final String storeName = storeMetadata.store().name();
+        if (restoreCallback instanceof StateRestoreListener) {
+            try {
+                ((StateRestoreListener) 
restoreCallback).onRestoreEnd(partition, storeName, 
changelogMetadata.totalRestored);
+            } catch (final RuntimeException e) {
+                return e;
+            }
+        }
+        return null;
+    }
+
     @Override
-    public void remove(final Collection<TopicPartition> revokedChangelogs) {
-        // Only changelogs that are initialized that been added to the restore 
consumer's assignment
+    public void unregister(final Collection<TopicPartition> revokedChangelogs,
+                           final boolean triggerOnRestoreEnd) {
+        final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
+
+        // Only changelogs that are initialized have been added to the restore 
consumer's assignment
         final List<TopicPartition> revokedInitializedChangelogs = new 
ArrayList<>();
 
         for (final TopicPartition partition : revokedChangelogs) {
             final ChangelogMetadata changelogMetadata = 
changelogs.remove(partition);
             if (changelogMetadata != null) {
-                if (changelogMetadata.state() != ChangelogState.REGISTERED) {
+                if (triggerOnRestoreEnd && 
changelogMetadata.state().equals(ChangelogState.RESTORING)) {

Review comment:
       Alright, the situation here is that we need to make sure we toggle bulk 
loading off for any active restoring tasks that convert to standby. Rather than 
try and force a direct call to toggleForBulkLoading on the store itself I 
figured we should just call onRestoreEnd. Technically, restoration is ending. 
It just happens to be due to type transition, rather than restore completion.
   
   I figured this might be relevant for users of custom stores, which might do 
something similar to bulk loading that they wish to turn off for standbys. But 
since this is only relevant to the underlying store, and doesn't mean we have 
actually finished restoring a task, we should only call the specific store's 
listener -- and not the user registered global listener.
   WDYT?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -811,8 +811,28 @@ private void prepareChangelogs(final 
Set<ChangelogMetadata> newPartitionsToResto
         }
     }
 
+    void maybeTriggerOnRestoreEnd(final Collection<TopicPartition> 
changelogPartitions) {

Review comment:
       Alright, the situation here is that we need to make sure we toggle bulk 
loading off for any active restoring tasks that convert to standby. Rather than 
try and force a direct call to `toggleForBulkLoading` on the store itself I 
figured we should just call `onRestoreEnd`. Technically, restoration _is_ 
ending. It just happens to be due to type transition, rather than restore 
completion.
   I figured this might be relevant for users of custom stores, which might do 
something similar to bulk loading that they wish to turn off for standbys. But 
since this is only relevant to the underlying store, and doesn't mean we have 
actually finished restoring a task, we should only call the specific store's 
listener -- and _not_ the user registered global listener.
   WDYT?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to