mjsax commented on code in PR #19507:
URL: https://github.com/apache/kafka/pull/19507#discussion_r2049471025


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -553,7 +553,7 @@ private void handleTasksWithStateUpdater(final Map<TaskId, 
Set<TopicPartition>>
     private void handleTasksPendingInitialization() {
         // All tasks pending initialization are not part of the usual 
bookkeeping
         for (final Task task : tasks.drainPendingTasksToInit()) {
-            closeTaskClean(task, Collections.emptySet(), 
Collections.emptyMap());
+            closeTaskClean(task, new HashSet<>(), new HashMap<>());

Review Comment:
   I am frankly not sure, why https://github.com/apache/kafka/pull/16730 would 
actually address any resource leak, and the old and new code would just call
   ```
               task.suspend();
               task.closeClean();
   ```
   
   The difference is just, that the new code would catch and swallow some 
exception.
   
   So maybe if we throw some exception, other cleanup code did not run leaking 
the producer? But with the new code, we would swallow the exception ensuring 
that other cleanup code does run?
   
   Would be good to understand better. Mabye @FrankYang0529 can shed some light?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to