ShivsundarR commented on code in PR #19417:
URL: https://github.com/apache/kafka/pull/19417#discussion_r2044503952


##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -748,28 +713,33 @@ public void testExplicitAcknowledgementCommitAsyncPartialBatch() {
             // Acknowledging 2 out of the 3 records received via commitAsync.
             ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
             ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
+            ConsumerRecord<byte[], byte[]> thirdRecord = iterator.next();
             assertEquals(0L, firstRecord.offset());
             assertEquals(1L, secondRecord.offset());
 
             shareConsumer1.acknowledge(firstRecord);
             shareConsumer1.acknowledge(secondRecord);
             shareConsumer1.commitAsync();
 
-            // The 3rd record should be re-presented to the consumer when it polls again.
-            records = shareConsumer1.poll(Duration.ofMillis(5000));
-            assertEquals(1, records.count());
-            iterator = records.iterator();
-            firstRecord = iterator.next();
-            assertEquals(2L, firstRecord.offset());
+            producer.send(record4);
+            producer.flush();
+
+            // The next poll() should throw an IllegalStateException as there is still 1 unacknowledged record.
+            // In EXPLICIT acknowledgement mode, we are not allowed to have unacknowledged records from a batch.
+            assertThrows(IllegalStateException.class, () -> shareConsumer1.poll(Duration.ofMillis(5000)));
+
+            // Acknowledging the 3rd record

Review Comment:
   Update on this: I experimented with a mock consumer application, and overall the current setup looks good. When the user gets the exception, they can do what they want:
   
   - Close the consumer, in which case the unacked records are released.
   
   - Re-attempt to acknowledge the remaining records. `poll()` will continue to throw the exception until all the records are acknowledged. After that, any further consumption continues as expected.
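
   The two recovery paths above can be sketched with a small toy model. This is not the Kafka client API: `ToyShareConsumer`, its `produce()`, `poll()`, `acknowledge()` and `close()` methods, and the `ExplicitAckRecovery` class are hypothetical names invented for illustration, and the model assumes only the behaviour described in this comment (poll throws while records from the previous batch are unacknowledged; close releases them). It leaves out `commitAsync()` and all broker interaction.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical toy model of explicit acknowledgement; NOT the Kafka API.
class ExplicitAckRecovery {

    static final class ToyShareConsumer {
        private final Deque<Long> available = new ArrayDeque<>(); // offsets not yet delivered
        private final Set<Long> inFlight = new TreeSet<>();       // delivered but unacknowledged
        private long nextOffset = 0;

        // Stands in for a producer appending one record to the log.
        void produce() {
            available.add(nextOffset++);
        }

        // Assumed rule: polling is rejected while any delivered record is unacknowledged.
        List<Long> poll() {
            if (!inFlight.isEmpty()) {
                throw new IllegalStateException("unacknowledged records: " + inFlight);
            }
            List<Long> batch = new ArrayList<>(available);
            available.clear();
            inFlight.addAll(batch);
            return batch;
        }

        void acknowledge(long offset) {
            inFlight.remove(offset);
        }

        // Closing releases whatever is still unacknowledged; returns the released offsets.
        List<Long> close() {
            List<Long> released = new ArrayList<>(inFlight);
            inFlight.clear();
            return released;
        }
    }

    // Delivers offsets 0..2 and acknowledges only the first two, as in the test.
    private static ToyShareConsumer consumerWithPartialAck() {
        ToyShareConsumer consumer = new ToyShareConsumer();
        for (int i = 0; i < 3; i++) {
            consumer.produce();
        }
        List<Long> batch = consumer.poll();
        consumer.acknowledge(batch.get(0));
        consumer.acknowledge(batch.get(1));
        return consumer;
    }

    // Option 2: acknowledge the remaining record, then keep consuming.
    static List<Long> recoverByAcknowledging() {
        ToyShareConsumer consumer = consumerWithPartialAck();
        consumer.produce(); // a 4th record arrives, like record4 in the test
        try {
            consumer.poll();
            throw new AssertionError("poll should have been rejected");
        } catch (IllegalStateException expected) {
            consumer.acknowledge(2L); // acknowledge the 3rd record
        }
        return consumer.poll(); // succeeds now and delivers the 4th record
    }

    // Option 1: give up and close; the unacknowledged record is released.
    static List<Long> recoverByClosing() {
        ToyShareConsumer consumer = consumerWithPartialAck();
        try {
            consumer.poll();
            throw new AssertionError("poll should have been rejected");
        } catch (IllegalStateException expected) {
            return consumer.close();
        }
    }

    public static void main(String[] args) {
        System.out.println("after acknowledging: " + recoverByAcknowledging());
        System.out.println("released on close:   " + recoverByClosing());
    }
}
```

   In this model the application decides what to do inside the `catch` block, which mirrors the point above: the exception leaves the choice between closing and acknowledging to the user.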



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
