cadonna commented on code in PR #12270:
URL: https://github.com/apache/kafka/pull/12270#discussion_r893822468


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -230,23 +233,35 @@ private List<TaskAndAction> getTasksAndActions() {
 
         private void addTask(final Task task) {
             if (isStateless(task)) {
-                log.debug("Stateless active task " + task.id() + " was added 
to the state updater");
                 addTaskToRestoredTasks((StreamTask) task);
+                log.debug("Stateless active task " + task.id() + " was added 
to the restored tasks of the state updater");
             } else {
+                updatingTasks.put(task.id(), task);
                 if (task.isActive()) {
-                    updatingTasks.put(task.id(), task);
-                    log.debug("Stateful active task " + task.id() + " was 
added to the state updater");
+                    log.debug("Stateful active task " + task.id() + " was 
added to the updating tasks of the state updater");
                     changelogReader.enforceRestoreActive();
                 } else {
-                    updatingTasks.put(task.id(), task);
-                    log.debug("Standby task " + task.id() + " was added to the 
state updater");
+                    log.debug("Standby task " + task.id() + " was added to the 
updating tasks of the state updater");
                     if (updatingTasks.size() == 1) {
                         changelogReader.transitToUpdateStandby();
                     }
                 }
             }
         }
 
+        private void removeTask(final TaskId taskId) {
+            final Task task = updatingTasks.remove(taskId);

Review Comment:
   Yeah, I was also thinking about checkpointing, but was not clear about all 
the details. Here my thoughts about checkpointing:
   We only checkpoint if we are not in EOS mode, because otherwise we would 
have a checkpoint file when we close dirty. On the other hand, also in EOS the 
offsets in that checkpoint file should be safe since it was written during 
restoration and not during a commit.
   Is this correct or do I miss something?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to