adixitconfluent commented on code in PR #19261: URL: https://github.com/apache/kafka/pull/19261#discussion_r2031767657
########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java: ########## @@ -2090,6 +2091,307 @@ public void testComplexShareConsumer() throws Exception { verifyShareGroupStateTopicRecordsProduced(); } + @ClusterTest + public void testReadCommittedIsolationLevel() { + alterShareAutoOffsetReset("group1", "earliest"); + alterShareIsolationLevel("group1", "read_committed"); + try (Producer<byte[], byte[]> transactionalProducer = createProducer("T1"); + ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) { + produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5); + shareConsumer.subscribe(Set.of(tp.topic())); + ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 8); + // 5th and 10th message transaction was aborted, hence they won't be included in the fetched records. + assertEquals(8, records.count()); + int messageCounter = 1; + for (ConsumerRecord<byte[], byte[]> record : records) { + assertEquals(tp.topic(), record.topic()); + assertEquals(tp.partition(), record.partition()); + if (messageCounter % 5 == 0) + messageCounter++; + assertEquals("Message " + messageCounter, new String(record.value())); + messageCounter++; + } + } + verifyShareGroupStateTopicRecordsProduced(); + } + + @ClusterTest + public void testReadUncommittedIsolationLevel() { + alterShareAutoOffsetReset("group1", "earliest"); + alterShareIsolationLevel("group1", "read_uncommitted"); + try (Producer<byte[], byte[]> transactionalProducer = createProducer("T1"); + ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) { + produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5); + shareConsumer.subscribe(Set.of(tp.topic())); + ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 10); + // Even though 5th and 10th message transaction was aborted, they will be included in the fetched records since IsolationLevel 
is READ_UNCOMMITTED. + assertEquals(10, records.count()); + int messageCounter = 1; + for (ConsumerRecord<byte[], byte[]> record : records) { + assertEquals(tp.topic(), record.topic()); + assertEquals(tp.partition(), record.partition()); + assertEquals("Message " + messageCounter, new String(record.value())); + messageCounter++; + } + } + verifyShareGroupStateTopicRecordsProduced(); + } + + @ClusterTest + public void testAlterReadUncommittedToReadCommittedIsolationLevel() { + alterShareAutoOffsetReset("group1", "earliest"); + alterShareIsolationLevel("group1", "read_uncommitted"); + try (Producer<byte[], byte[]> transactionalProducer = createProducer("T1"); + ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) { + shareConsumer.subscribe(Set.of(tp.topic())); + transactionalProducer.initTransactions(); + try { + // First transaction is committed. + produceCommittedTransaction(transactionalProducer, "Message 1"); + + ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1); Review Comment: I don't think this will force the test to wait for 2.5 seconds. That value is only the maximum time to block: the method returns immediately if records are available; otherwise, it waits until the timeout elapses. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org