ShivsundarR commented on code in PR #19417:
URL: https://github.com/apache/kafka/pull/19417#discussion_r2044503952
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -748,28 +713,33 @@ public void testExplicitAcknowledgementCommitAsyncPartialBatch() {
             // Acknowledging 2 out of the 3 records received via commitAsync.
             ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
             ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
+            ConsumerRecord<byte[], byte[]> thirdRecord = iterator.next();
             assertEquals(0L, firstRecord.offset());
             assertEquals(1L, secondRecord.offset());
             shareConsumer1.acknowledge(firstRecord);
             shareConsumer1.acknowledge(secondRecord);
             shareConsumer1.commitAsync();
-            // The 3rd record should be re-presented to the consumer when it polls again.
-            records = shareConsumer1.poll(Duration.ofMillis(5000));
-            assertEquals(1, records.count());
-            iterator = records.iterator();
-            firstRecord = iterator.next();
-            assertEquals(2L, firstRecord.offset());
+            producer.send(record4);
+            producer.flush();
+
+            // The next poll() should throw an IllegalStateException as there is still 1 unacknowledged record.
+            // In EXPLICIT acknowledgement mode, we are not allowed to have unacknowledged records from a batch.
+            assertThrows(IllegalStateException.class, () -> shareConsumer1.poll(Duration.ofMillis(5000)));
+
+            // Acknowledging the 3rd record

Review Comment:
Update on this: I experimented with a mock consumer application, and overall the current setup looks good. When the user gets the exception, they can choose how to proceed:
- Close the consumer, in which case the unacked records are released.
- Re-attempt to acknowledge the remaining records.

We will continue to get the exception until all the records are acknowledged. After that, any further consumption will continue as expected.
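The two recovery paths described in the review comment above can be sketched with a small stand-in. This is only a toy model under stated assumptions: `ToyShareConsumer` and `ExplicitAckDemo` are hypothetical names, not Kafka classes. The model tracks bare offsets, skips the commit step, and reproduces only the rule under discussion, namely that poll() throws IllegalStateException while any delivered record is still unacknowledged.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy model of a share consumer in EXPLICIT acknowledgement mode.
// It is NOT Kafka's API; it only models the poll()/acknowledge()/close() rule above.
class ToyShareConsumer {
    private final Deque<Long> available = new ArrayDeque<>(); // offsets waiting for delivery
    private final Set<Long> unacked = new LinkedHashSet<>();  // delivered, not yet acknowledged
    private boolean closed = false;

    void produce(long offset) {
        available.addLast(offset);
    }

    // Refuses to deliver more records while any delivered record is unacknowledged.
    List<Long> poll() {
        if (closed) {
            throw new IllegalStateException("consumer is closed");
        }
        if (!unacked.isEmpty()) {
            throw new IllegalStateException("unacknowledged records: " + unacked);
        }
        List<Long> batch = new ArrayList<>(available);
        available.clear();
        unacked.addAll(batch);
        return batch;
    }

    void acknowledge(long offset) {
        unacked.remove(offset);
    }

    // Closing releases the unacknowledged records; the released offsets are returned.
    List<Long> close() {
        closed = true;
        List<Long> released = new ArrayList<>(unacked);
        unacked.clear();
        return released;
    }
}

class ExplicitAckDemo {
    // Builds a consumer that received offsets 0..2 and acknowledged only 0 and 1.
    private static ToyShareConsumer partiallyAcknowledged() {
        ToyShareConsumer consumer = new ToyShareConsumer();
        for (long offset = 0; offset < 3; offset++) {
            consumer.produce(offset);
        }
        List<Long> batch = consumer.poll();
        consumer.acknowledge(batch.get(0));
        consumer.acknowledge(batch.get(1));
        return consumer;
    }

    // Option 1: close the consumer, which releases the unacknowledged record.
    static List<Long> closeAfterPartialAck() {
        return partiallyAcknowledged().close();
    }

    // Option 2: acknowledge the remaining record after the exception, then poll again.
    static List<Long> ackRemainingThenPoll() {
        ToyShareConsumer consumer = partiallyAcknowledged();
        consumer.produce(3L); // a 4th record arrives in the meantime
        try {
            consumer.poll();
            throw new AssertionError("expected IllegalStateException");
        } catch (IllegalStateException expected) {
            consumer.acknowledge(2L); // acknowledge the 3rd record
        }
        return consumer.poll(); // consumption resumes with the 4th record
    }

    public static void main(String[] args) {
        System.out.println("released on close: " + closeAfterPartialAck());
        System.out.println("next batch after ack: " + ackRemainingThenPoll());
    }
}
```

In a real application the same shape would apply to the actual share consumer: catch the IllegalStateException from poll(), then either close the consumer or acknowledge the outstanding records before polling again.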