cadonna commented on code in PR #12270:
URL: https://github.com/apache/kafka/pull/12270#discussion_r893838592
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -230,23 +233,35 @@ private List<TaskAndAction> getTasksAndActions() {
     private void addTask(final Task task) {
         if (isStateless(task)) {
-            log.debug("Stateless active task " + task.id() + " was added to the state updater");
             addTaskToRestoredTasks((StreamTask) task);
+            log.debug("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
         } else {
+            updatingTasks.put(task.id(), task);
             if (task.isActive()) {
-                updatingTasks.put(task.id(), task);
-                log.debug("Stateful active task " + task.id() + " was added to the state updater");
+                log.debug("Stateful active task " + task.id() + " was added to the updating tasks of the state updater");
                 changelogReader.enforceRestoreActive();
             } else {
-                updatingTasks.put(task.id(), task);
-                log.debug("Standby task " + task.id() + " was added to the state updater");
+                log.debug("Standby task " + task.id() + " was added to the updating tasks of the state updater");
                 if (updatingTasks.size() == 1) {
                     changelogReader.transitToUpdateStandby();
                 }
             }
         }
     }
+    private void removeTask(final TaskId taskId) {
+        final Task task = updatingTasks.remove(taskId);
+        if (task != null) {
+            final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
+            changelogReader.unregister(changelogPartitions);
+            removedTasks.add(task);
+            log.debug((task.isActive() ? "Active" : "Standby")
+                + " task " + task.id() + " was removed from the updating tasks and added to the removed tasks.");
+        } else {
+            log.debug("Task " + taskId + " was not removed since it is not updating.");
Review Comment:
I am not sure that one way is less complex than the other. In either case,
users need to keep track of the tasks they want to remove in one way or
another. However, I am open to changes here, since it is not trivial to give
users feedback about what happened to the removed tasks.
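
To make the bookkeeping concrete, here is a rough sketch of what caller-side tracking could look like. It is not code from this PR: `SketchUpdater`, `drainRemoved`, and `unconfirmed` are hypothetical names, and plain strings stand in for `TaskId`. It only assumes the behavior shown in the diff, namely that a removal request for a task that is not updating is a no-op.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

class RemovalTracking {

    // Hypothetical stand-in for the state updater; not the real DefaultStateUpdater.
    static class SketchUpdater {
        private final Set<String> updating = new HashSet<>();
        private final Queue<String> removed = new ArrayDeque<>();

        void add(final String taskId) {
            updating.add(taskId);
        }

        // Mirrors removeTask in the diff: only a task that is currently
        // updating is moved to the removed tasks; otherwise nothing happens.
        void remove(final String taskId) {
            if (updating.remove(taskId)) {
                removed.add(taskId);
            }
        }

        // Hands all removed tasks to the caller and clears the internal queue.
        Set<String> drainRemoved() {
            final Set<String> drained = new HashSet<>(removed);
            removed.clear();
            return drained;
        }
    }

    // Caller-side bookkeeping: which requested removals have not been
    // confirmed by the drained set of removed tasks?
    static Set<String> unconfirmed(final Set<String> requested, final Set<String> drained) {
        final Set<String> pending = new HashSet<>(requested);
        pending.removeAll(drained);
        return pending;
    }

    public static void main(final String[] args) {
        final SketchUpdater updater = new SketchUpdater();
        updater.add("0_0");

        final Set<String> requested = new HashSet<>(Arrays.asList("0_0", "0_1"));
        for (final String taskId : requested) {
            updater.remove(taskId);
        }

        // "0_1" was never updating, so the caller only learns that from its own records.
        System.out.println("Unconfirmed removals: " + unconfirmed(requested, updater.drainRemoved()));
    }
}
```

In this sketch the caller has to remember what it asked for in order to notice that a removal had no effect, which is the tracking burden described above.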