guozhangwang commented on code in PR #12270:
URL: https://github.com/apache/kafka/pull/12270#discussion_r893827703
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -230,23 +233,35 @@ private List<TaskAndAction> getTasksAndActions() {
private void addTask(final Task task) {
if (isStateless(task)) {
- log.debug("Stateless active task " + task.id() + " was added
to the state updater");
addTaskToRestoredTasks((StreamTask) task);
+ log.debug("Stateless active task " + task.id() + " was added
to the restored tasks of the state updater");
} else {
+ updatingTasks.put(task.id(), task);
if (task.isActive()) {
- updatingTasks.put(task.id(), task);
- log.debug("Stateful active task " + task.id() + " was
added to the state updater");
+ log.debug("Stateful active task " + task.id() + " was
added to the updating tasks of the state updater");
changelogReader.enforceRestoreActive();
} else {
- updatingTasks.put(task.id(), task);
- log.debug("Standby task " + task.id() + " was added to the
state updater");
+ log.debug("Standby task " + task.id() + " was added to the
updating tasks of the state updater");
if (updatingTasks.size() == 1) {
changelogReader.transitToUpdateStandby();
}
}
}
}
+ private void removeTask(final TaskId taskId) {
+ final Task task = updatingTasks.remove(taskId);
Review Comment:
During processing: yes today we should not write checkpoint file when we
commit.
During restoring: we can always write checkpoint file regardless of EOS or
ALOS, since if there's a failure we would just over-restore them upon recovery
so no EOS violations happened.
Also during restoring: when we complete restore or remove task, we should
enforce a checkpoint as well (for failing cases though, we should not write a
new one).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]