adixitconfluent commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2031767657


##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -2090,6 +2091,307 @@ public void testComplexShareConsumer() throws Exception {
         verifyShareGroupStateTopicRecordsProduced();
     }
 
+    @ClusterTest
+    public void testReadCommittedIsolationLevel() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        alterShareIsolationLevel("group1", "read_committed");
+        try (Producer<byte[], byte[]> transactionalProducer = createProducer("T1");
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
+            produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 8);
+            // The transactions containing the 5th and 10th messages were aborted, so those messages are not included in the fetched records.
+            assertEquals(8, records.count());
+            int messageCounter = 1;
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                assertEquals(tp.topic(), record.topic());
+                assertEquals(tp.partition(), record.partition());
+                if (messageCounter % 5 == 0)
+                    messageCounter++;
+                assertEquals("Message " + messageCounter, new String(record.value()));
+                messageCounter++;
+            }
+        }
+        verifyShareGroupStateTopicRecordsProduced();
+    }
+
+    @ClusterTest
+    public void testReadUncommittedIsolationLevel() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        alterShareIsolationLevel("group1", "read_uncommitted");
+        try (Producer<byte[], byte[]> transactionalProducer = createProducer("T1");
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
+            produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 10);
+            // Even though the transactions containing the 5th and 10th messages were aborted, those messages are included in the fetched records because the isolation level is READ_UNCOMMITTED.
+            assertEquals(10, records.count());
+            int messageCounter = 1;
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                assertEquals(tp.topic(), record.topic());
+                assertEquals(tp.partition(), record.partition());
+                assertEquals("Message " + messageCounter, new String(record.value()));
+                messageCounter++;
+            }
+        }
+        verifyShareGroupStateTopicRecordsProduced();
+    }
+
+    @ClusterTest
+    public void testAlterReadUncommittedToReadCommittedIsolationLevel() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        alterShareIsolationLevel("group1", "read_uncommitted");
+        try (Producer<byte[], byte[]> transactionalProducer = createProducer("T1");
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            transactionalProducer.initTransactions();
+            try {
+                // First transaction is committed.
+                produceCommittedTransaction(transactionalProducer, "Message 1");
+
+                ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);

Review Comment:
   I don't think this will force the test to wait for 2.5 seconds. That is only the maximum time to block: the method returns immediately if records are available; otherwise it waits up to the passed timeout.
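
   To make that concrete, here is a minimal, hypothetical sketch of a bounded-wait helper in the spirit of `waitedPoll` (the real helper in ShareConsumerTest may be implemented differently; only `ShareConsumer`, `ConsumerRecord`, and the 2500 ms timeout are taken from the test above). Each `poll(Duration)` call returns as soon as records are available, so the timeout is an upper bound rather than a fixed wait:

   ```java
   import java.time.Duration;
   import java.util.ArrayList;
   import java.util.List;

   import org.apache.kafka.clients.consumer.ConsumerRecord;
   import org.apache.kafka.clients.consumer.ShareConsumer;

   final class BoundedPollSketch {

       // Hypothetical helper, not the real waitedPoll: keeps polling until at least
       // expectedCount records have been seen or maxWaitMs has elapsed. Because
       // poll() returns as soon as records are available, the full timeout is only
       // spent when no records arrive at all.
       static List<ConsumerRecord<byte[], byte[]>> pollAtLeast(ShareConsumer<byte[], byte[]> consumer,
                                                               long maxWaitMs,
                                                               int expectedCount) {
           List<ConsumerRecord<byte[], byte[]>> collected = new ArrayList<>();
           long deadline = System.currentTimeMillis() + maxWaitMs;
           while (collected.size() < expectedCount && System.currentTimeMillis() < deadline) {
               long remaining = Math.max(0L, deadline - System.currentTimeMillis());
               consumer.poll(Duration.ofMillis(remaining)).forEach(collected::add);
           }
           return collected;
       }
   }
   ```

   For example, a call like `pollAtLeast(shareConsumer, 2500L, 8)` would return well before 2.5 seconds once all 8 records are fetchable; the full timeout is spent only if the records never show up.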


