chia7712 commented on code in PR #19651:
URL: https://github.com/apache/kafka/pull/19651#discussion_r2114139043


##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java:
##########
@@ -226,4 +249,205 @@ var record = new ProducerRecord<>(
         producer.send(record);
         return record;
     }
+
+    public static void sendAndAwaitAsyncCommit(
+        Consumer<byte[], byte[]> consumer,
+        Optional<Map<TopicPartition, OffsetAndMetadata>> offsetsOpt
+    ) throws InterruptedException {
+
+        var commitCallback = new RetryCommitCallback(consumer, offsetsOpt);
+        sendAsyncCommit(consumer, commitCallback, offsetsOpt);
+
+        TestUtils.waitForCondition(() -> {
+            consumer.poll(Duration.ofMillis(100));
+            return commitCallback.isComplete;
+        }, "Failed to observe commit callback before timeout");
+
+        assertEquals(Optional.empty(), commitCallback.error);
+    }
+
+    public static void awaitRebalance(
+        Consumer<byte[], byte[]> consumer,
+        TestConsumerReassignmentListener rebalanceListener
+    ) throws InterruptedException {
+        var numReassignments = rebalanceListener.callsToAssigned;
+        TestUtils.waitForCondition(() -> {
+            consumer.poll(Duration.ofMillis(100));
+            return rebalanceListener.callsToAssigned > numReassignments;
+        }, "Timed out before expected rebalance completed");
+    }
+
+    public static void ensureNoRebalance(
+        Consumer<byte[], byte[]> consumer,
+        TestConsumerReassignmentListener rebalanceListener
+    ) throws InterruptedException {
+        // The best way to verify that the current membership is still active is to commit offsets.
+        // This would fail if the group had rebalanced.
+        var initialRevokeCalls = rebalanceListener.callsToRevoked;
+        sendAndAwaitAsyncCommit(consumer, Optional.empty());
+        assertEquals(initialRevokeCalls, rebalanceListener.callsToRevoked);
+    }
+
+    /**
+     * This class is intended to replace the test cases in BaseConsumerTest.scala.
+     * When converting tests that extend from BaseConsumerTest.scala to Java,
+     * we should use the test cases provided in this class.
+     */
+    public static final class BaseConsumerTestcase {
+
+        public static final int BROKER_COUNT = 3;
+        public static final String TOPIC = "topic";
+        public static final TopicPartition TP = new TopicPartition(TOPIC, 0);
+
+        private BaseConsumerTestcase() {
+        }
+
+        public static void testSimpleConsumption(
+            ClusterInstance cluster,
+            Map<String, Object> config
+        ) throws InterruptedException {
+            var numRecords = 10000;
+            var startingTimestamp = System.currentTimeMillis();
+            sendRecords(cluster, TP, numRecords, startingTimestamp);
+            try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
+                assertEquals(0, consumer.assignment().size());
+                consumer.assign(List.of(TP));
+                assertEquals(1, consumer.assignment().size());
+                consumer.seek(TP, 0);
+                consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp);
+                // check async commit callbacks
+                sendAndAwaitAsyncCommit(consumer, Optional.empty());
+            }
+        }
+
+        public static void testClusterResourceListener(
+            ClusterInstance cluster,
+            Map<String, Object> consumerConfig
+        ) throws InterruptedException {
+            var numRecords = 100;
+            Map<String, Object> producerConfig = Map.of(
+                KEY_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class,
+                VALUE_SERIALIZER_CLASS_CONFIG, TestClusterResourceListenerSerializer.class
+            );
+            Map<String, Object> consumerConfigOverrides = new HashMap<>(consumerConfig);
+            consumerConfigOverrides.put(KEY_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class);
+            consumerConfigOverrides.put(VALUE_DESERIALIZER_CLASS_CONFIG, TestClusterResourceListenerDeserializer.class);
+            try (Producer<byte[], byte[]> producer = cluster.producer(producerConfig);
+                 Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfigOverrides)
+            ) {
+                var startingTimestamp = System.currentTimeMillis();
+                sendRecords(producer, TP, numRecords, startingTimestamp, -1);
+
+                consumer.subscribe(List.of(TP.topic()));
+                consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp);
+                assertNotEquals(0, UPDATE_PRODUCER_COUNT.get());

Review Comment:
   Should we reset those counters before each test so they don't get corrupted by other tests?
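   
   For illustration only, a minimal sketch of one way to do that, assuming `UPDATE_PRODUCER_COUNT` is a static `AtomicInteger` in `ClientsTestUtils` (the JUnit hook and class name below are hypothetical, not part of this PR):
   
   ```java
   import java.util.concurrent.atomic.AtomicInteger;
   
   import org.junit.jupiter.api.BeforeEach;
   
   class ClusterResourceListenerCountsResetSketch {
   
       // Stand-in for the shared counter bumped by the test serializer/deserializer;
       // in the PR this would be ClientsTestUtils.UPDATE_PRODUCER_COUNT.
       static final AtomicInteger UPDATE_PRODUCER_COUNT = new AtomicInteger();
   
       // Reset before every test so counts left over from earlier tests cannot
       // leak into the assertNotEquals(0, UPDATE_PRODUCER_COUNT.get()) check.
       @BeforeEach
       void resetListenerCounts() {
           UPDATE_PRODUCER_COUNT.set(0);
       }
   }
   ```
   
   If there is a matching consumer-side counter, it could be reset the same way.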


