cadonna commented on code in PR #19164:
URL: https://github.com/apache/kafka/pull/19164#discussion_r1987079448


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1163,10 +1166,12 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
         // as such we just need to skip those dirty tasks in the checkpoint
         final Set<Task> dirtyTasks = new HashSet<>();
         try {
-            // in handleRevocation we must call commitOffsetsOrTransaction() directly rather than
-            // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to make sure we don't skip the
-            // offset commit because we are in a rebalance
-            taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+            if (revokedTasksNeedCommit) {

Review Comment:
   Thinking about it again, I agree that we still need the TX in-flight check,
   because a punctuator could write to a changelog topic without consuming any
   records from the input topics. In that case, the offset map can be empty
   while a TX is still in flight and needs to be committed.
   
   Of course, if we cannot get rid of the TX in-flight check, it does not make
   sense to move the complete check to the outermost context, for the reasons
   you mentioned. I thought my comment made clear that the move only applies
   if we adapt the condition.
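
   The condition discussed above could be sketched roughly as follows. This is
   only an illustration, not the code in the PR: `RevocationCommitSketch`,
   `needsCommit`, and the simplified `Map<String, Long>` offset map are
   hypothetical stand-ins, and the in-flight state is passed in as a plain
   boolean rather than queried from the real producer.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; none of these names exist in TaskManager.
final class RevocationCommitSketch {

    // A commit is needed if there are consumed offsets to commit OR a
    // transaction is in flight. The second clause covers a punctuator that
    // wrote to a changelog topic without consuming any input records.
    static boolean needsCommit(final Map<String, Long> consumedOffsets,
                               final boolean transactionInFlight) {
        return !consumedOffsets.isEmpty() || transactionInFlight;
    }

    public static void main(final String[] args) {
        final Map<String, Long> noOffsets = Collections.emptyMap();
        final Map<String, Long> someOffsets = new HashMap<>();
        someOffsets.put("input-topic-0", 42L); // assumed partition name and offset

        // Punctuator-only case: empty offset map, but a TX is in flight.
        System.out.println(needsCommit(noOffsets, true));
        // Nothing consumed and nothing in flight: the commit can be skipped.
        System.out.println(needsCommit(noOffsets, false));
        // Regular case: consumed offsets are present.
        System.out.println(needsCommit(someOffsets, false));
    }
}
```

   Checking only whether the offset map is empty would skip the commit in the
   first case, which is why the in-flight clause has to stay.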



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

