vvcephei commented on a change in pull request #8248:
URL: https://github.com/apache/kafka/pull/8248#discussion_r431251657



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -189,38 +190,52 @@ private void prepareClose(final boolean clean) {
     @Override
     public void closeClean(final Map<TopicPartition, Long> checkpoint) {
         Objects.requireNonNull(checkpoint);
-        close(true);
+        close(true, false);
 
         log.info("Closed clean");
     }
 
     @Override
     public void closeDirty() {
-        close(false);
+        close(false, false);
 
         log.info("Closed dirty");
     }
 
-    private void close(final boolean clean) {
+    @Override
+    public void closeAndRecycleState() {
+        prepareClose(true);
+        close(true, true);

Review comment:
       When I introduced `closeClean` and `closeDirty`, I resisted the urge to 
inline `close(boolean)` only to control the LOC of the change. Having more 
branches and flags is generally more of a liability than a few duplicated 
statements.
   
   Now that we have two boolean flags (again) and a new branch in the internal `close` method, I'd be much more inclined to inline it. But we can do this in a follow-on PR, if you prefer.
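
   For illustration, a rough sketch of the inlined shape I have in mind (the helper names below are placeholders, not the actual `StandbyTask` internals):

```java
// Sketch only: flushAndCheckpoint() and closeStateManager() stand in for
// whatever the real close(clean, recycle) branches do today.
@Override
public void closeClean(final Map<TopicPartition, Long> checkpoint) {
    Objects.requireNonNull(checkpoint);
    flushAndCheckpoint();        // hypothetical helper: the "clean" shutdown path
    closeStateManager();         // hypothetical helper: release the state stores
    log.info("Closed clean");
}

@Override
public void closeDirty() {
    closeStateManager();         // skip flushing/checkpointing entirely
    log.info("Closed dirty");
}

@Override
public void closeAndRecycleState() {
    prepareClose(true);
    flushAndCheckpoint();        // same clean path as closeClean...
    // ...but deliberately keep the state manager open so the new task can reuse it
    log.info("Closed and recycled state");
}
```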

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -811,17 +812,41 @@ private void prepareChangelogs(final Set<ChangelogMetadata> newPartitionsToResto
         }
     }
 
+    private RuntimeException invokeOnRestoreEnd(final TopicPartition partition,
+                                                final ChangelogMetadata changelogMetadata) {
+        // only trigger the store's specific listener to make sure we disable bulk loading before transition to standby
+        final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
+        final StateRestoreCallback restoreCallback = storeMetadata.restoreCallback();
+        final String storeName = storeMetadata.store().name();
+        if (restoreCallback instanceof StateRestoreListener) {
+            try {
+                ((StateRestoreListener) restoreCallback).onRestoreEnd(partition, storeName, changelogMetadata.totalRestored);
+            } catch (final RuntimeException e) {
+                return e;
+            }
+        }
+        return null;
+    }
+
     @Override
-    public void remove(final Collection<TopicPartition> revokedChangelogs) {
-        // Only changelogs that are initialized that been added to the restore consumer's assignment
+    public void unregister(final Collection<TopicPartition> revokedChangelogs,
+                           final boolean triggerOnRestoreEnd) {
+        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
+
+        // Only changelogs that are initialized have been added to the restore consumer's assignment
         final List<TopicPartition> revokedInitializedChangelogs = new ArrayList<>();
 
         for (final TopicPartition partition : revokedChangelogs) {
             final ChangelogMetadata changelogMetadata = changelogs.remove(partition);
             if (changelogMetadata != null) {
-                if (changelogMetadata.state() != ChangelogState.REGISTERED) {
+                if (triggerOnRestoreEnd && changelogMetadata.state().equals(ChangelogState.RESTORING)) {

Review comment:
       I think this makes sense. The restore listener / bulk loading 
interaction is a bit wacky, but it seems reasonable to just work around it for 
now.
   
   Just to play devil's advocate briefly, though, is it not true for _all_ 
listeners that the restore has ended, for exactly the reason you cited above?
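
   To make that concrete, the variant I'm gesturing at would look roughly like the sketch below. It assumes the reader's user-facing `stateRestoreListener` field, and it's a question rather than a request to change this PR:

```java
// Sketch only, not a proposal: notify the user-level listener too, on the theory
// that the restore really has ended for every listener once we transition to standby.
private RuntimeException invokeOnRestoreEnd(final TopicPartition partition,
                                            final ChangelogMetadata changelogMetadata) {
    final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
    final StateRestoreCallback restoreCallback = storeMetadata.restoreCallback();
    final String storeName = storeMetadata.store().name();
    try {
        // store-specific listener, e.g. so the store can disable bulk loading
        if (restoreCallback instanceof StateRestoreListener) {
            ((StateRestoreListener) restoreCallback)
                .onRestoreEnd(partition, storeName, changelogMetadata.totalRestored);
        }
        // user-registered listener (assumed field) -- the part the question above is about
        stateRestoreListener.onRestoreEnd(partition, storeName, changelogMetadata.totalRestored);
    } catch (final RuntimeException e) {
        return e;
    }
    return null;
}
```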

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -272,6 +274,30 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             }
         }
 
+        if (taskCloseExceptions.isEmpty()) {
+            for (final Task oldTask : tasksToRecycle) {
+                final Task newTask;
+                try {
+                    if (oldTask.isActive()) {
+                        final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id());
+                        newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions);
+                    } else {
+                        final Set<TopicPartition> partitions = activeTasksToCreate.remove(oldTask.id());
+                        newTask = activeTaskCreator.createActiveTaskFromStandby((StandbyTask) oldTask, partitions, mainConsumer);
+                    }
+                    tasks.remove(oldTask.id());
+                    addNewTask(newTask);
+                } catch (final RuntimeException e) {
+                    final String uncleanMessage = String.format("Failed to recycle task %s cleanly. Attempting to close remaining tasks before re-throwing:", oldTask.id());
+                    log.error(uncleanMessage, e);
+                    taskCloseExceptions.put(oldTask.id(), e);
+                    dirtyTasks.add(oldTask);
+                }
+            }
+        } else {
+            dirtyTasks.addAll(tasksToRecycle);
+        }

Review comment:
       This seems like an odd case. Am I right in reading this as, "something 
went wrong, and we don't know what it was, so we're just going to assume the 
worst and dump all the tasks that we were hoping to recycle"?
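
   Spelling out my reading of it (paraphrase only; `recycle` below stands in for the per-task recycle loop above):

```java
// Paraphrase of my reading of the branch in question, not a suggested change:
if (taskCloseExceptions.isEmpty()) {
    // nothing has failed so far, so try to convert each candidate in place
    recycle(tasksToRecycle);
} else {
    // something already went wrong while closing other tasks; we don't know how
    // bad it is, so give up on recycling and close every candidate dirty instead
    dirtyTasks.addAll(tasksToRecycle);
}
```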




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

