ShivsundarR commented on code in PR #19417: URL: https://github.com/apache/kafka/pull/19417#discussion_r2041809483
########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java: ########## @@ -748,28 +713,33 @@ public void testExplicitAcknowledgementCommitAsyncPartialBatch() { // Acknowledging 2 out of the 3 records received via commitAsync. ConsumerRecord<byte[], byte[]> firstRecord = iterator.next(); ConsumerRecord<byte[], byte[]> secondRecord = iterator.next(); + ConsumerRecord<byte[], byte[]> thirdRecord = iterator.next(); assertEquals(0L, firstRecord.offset()); assertEquals(1L, secondRecord.offset()); shareConsumer1.acknowledge(firstRecord); shareConsumer1.acknowledge(secondRecord); shareConsumer1.commitAsync(); - // The 3rd record should be re-presented to the consumer when it polls again. - records = shareConsumer1.poll(Duration.ofMillis(5000)); - assertEquals(1, records.count()); - iterator = records.iterator(); - firstRecord = iterator.next(); - assertEquals(2L, firstRecord.offset()); + producer.send(record4); + producer.flush(); + + // The next poll() should throw an IllegalStateException as there is still 1 unacknowledged record. + // In EXPLICIT acknowledgement mode, we are not allowed to have unacknowledged records from a batch. + assertThrows(IllegalStateException.class, () -> shareConsumer1.poll(Duration.ofMillis(5000))); + + // Acknowledging the 3rd record Review Comment: Yes, we could still use it as of now. I'll write an application locally and experiment a bit to see whether it is useful to allow this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org