mjsax commented on code in PR #19507: URL: https://github.com/apache/kafka/pull/19507#discussion_r2049471025
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ########## @@ -553,7 +553,7 @@ private void handleTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> private void handleTasksPendingInitialization() { // All tasks pending initialization are not part of the usual bookkeeping for (final Task task : tasks.drainPendingTasksToInit()) { - closeTaskClean(task, Collections.emptySet(), Collections.emptyMap()); + closeTaskClean(task, new HashSet<>(), new HashMap<>()); Review Comment: I am frankly not sure, why https://github.com/apache/kafka/pull/16730 would actually address any resource leak, as the old and new code would just call ``` task.suspend(); task.closeClean(); ``` The difference is just, that the new code would catch and swallow potential exceptions. So maybe if we throw some exception, other cleanup code did not run leaking the producer? But with the new code, we would swallow the exception ensuring that other cleanup code does run? Would be good to understand better. Maybe @FrankYang0529 can shed some light? I am not 100% sure right now, if https://github.com/apache/kafka/pull/16730 is the correct fix for the leak or not. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org