cadonna commented on code in PR #19164:
URL: https://github.com/apache/kafka/pull/19164#discussion_r1986072577


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1163,10 +1166,12 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
         // as such we just need to skip those dirty tasks in the checkpoint
         final Set<Task> dirtyTasks = new HashSet<>();
         try {
-            // in handleRevocation we must call commitOffsetsOrTransaction() directly rather than
-            // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to make sure we don't skip the
-            // offset commit because we are in a rebalance
-            taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+            if (revokedTasksNeedCommit) {

Review Comment:
   If we have this, do we want to also adapt the following condition in `TaskExecutor#commitOffsetsOrTransaction()`
   ```java
   if (!offsetsPerTask.isEmpty() || taskManager.streamsProducer().transactionInFlight()) {
   ```
   to
   ```java
   if (!offsetsPerTask.isEmpty()) {
   ```
   (and maybe move it to the outermost context as it was before the PR that introduced the bug)?
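
The difference between the two conditions quoted above can be illustrated with a minimal, standalone sketch. It is not the Kafka Streams implementation: the class `CommitGuardSketch`, the methods `currentGuard` and `proposedGuard`, and the `Map<String, Long>` stand-in for the per-task offsets are all hypothetical names chosen for illustration. Only the boolean expressions mirror the snippets in the comment.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Kafka Streams.
public class CommitGuardSketch {

    // Mirrors the condition quoted first: commit if there are offsets
    // OR a transaction is currently in flight.
    static boolean currentGuard(final Map<String, Long> offsetsPerTask,
                                final boolean transactionInFlight) {
        return !offsetsPerTask.isEmpty() || transactionInFlight;
    }

    // Mirrors the proposed condition: commit only if there are offsets.
    static boolean proposedGuard(final Map<String, Long> offsetsPerTask) {
        return !offsetsPerTask.isEmpty();
    }

    public static void main(final String[] args) {
        final Map<String, Long> noOffsets = Collections.emptyMap();
        final Map<String, Long> someOffsets = Collections.singletonMap("0_0", 42L);

        // Empty offsets while a transaction is open: the guards disagree.
        System.out.println(currentGuard(noOffsets, true));   // prints "true"
        System.out.println(proposedGuard(noOffsets));        // prints "false"

        // Non-empty offsets: both guards allow the commit.
        System.out.println(currentGuard(someOffsets, false)); // prints "true"
        System.out.println(proposedGuard(someOffsets));       // prints "true"
    }
}
```

The only case in which the two guards differ is an empty offsets map combined with an in-flight transaction, which appears to be the case the question is about.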



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -922,6 +932,175 @@ public void onRestoreEnd(final TopicPartition topicPartition,
         );
     }
 
+
+    private final AtomicReference<String> transactionalProducerId = new AtomicReference<>();
+
+    private class TestClientSupplier extends DefaultKafkaClientSupplier {
+        @Override
+        public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
+            transactionalProducerId.compareAndSet(null, (String) config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
+
+            return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
+        }
+    }
+
+    final static AtomicReference<TaskId> taskWithData = new AtomicReference<>();
+    final static AtomicBoolean didRevokeIdleTask = new AtomicBoolean(false);
+
+    @Test
+    public void shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress() throws Exception {
+        shouldNotProduceDuplicates(false);
+    }
+
+    @Test
+    public void shouldCommitAllTasksIfRevokedTaskTriggerPunctuation() throws Exception {
+        shouldNotProduceDuplicates(true);
+    }
+
+    private void shouldNotProduceDuplicates(final boolean usePunctuation) throws Exception {

Review Comment:
   nit:
   
   ```suggestion
       // Alternative: @ParameterizedTest(name = "shouldCommitAllTasks with punctuation: {0}")
       // together with @ValueSource(booleans = {true, false})
       @ParameterizedTest(name = "{argumentsWithNames}")
       @FieldSource("namedArguments")
       public void shouldNotProduceDuplicates(final boolean usePunctuation) throws Exception {
       ...
       }
       
       private static List<Arguments> namedArguments = Arrays.asList(
           arguments(named("Should not commit active tasks with pending input if revoked task did not make progress", false)),
           arguments(named("Should commit all tasks if revoked task triggers punctuation", true))
       );
   ```


