FrankYang0529 commented on code in PR #19389:
URL: https://github.com/apache/kafka/pull/19389#discussion_r2030191468


##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java:
##########
@@ -0,0 +1,680 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.test.MockConsumerInterceptor;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG;
+import static org.apache.kafka.clients.consumer.PlaintextConsumerCommitTest.BROKER_COUNT;
+import static org.apache.kafka.clients.consumer.PlaintextConsumerCommitTest.OFFSETS_TOPIC_PARTITIONS;
+import static org.apache.kafka.clients.consumer.PlaintextConsumerCommitTest.OFFSETS_TOPIC_REPLICATION;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = BROKER_COUNT, 
+    serverProperties = {
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = OFFSETS_TOPIC_PARTITIONS),
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = OFFSETS_TOPIC_REPLICATION),
+        @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"),
+    }
+)
+public class PlaintextConsumerCommitTest {
+
+    public static final int BROKER_COUNT = 3;
+    public static final String OFFSETS_TOPIC_PARTITIONS = "1";
+    public static final String OFFSETS_TOPIC_REPLICATION = "3";
+    private final ClusterInstance cluster;
+    private final String topic = "topic";
+    private final TopicPartition tp = new TopicPartition(topic, 0);
+    private final TopicPartition tp1 = new TopicPartition(topic, 1);
+
+    public PlaintextConsumerCommitTest(ClusterInstance clusterInstance) {
+        this.cluster = clusterInstance;
+    }
+
+    @BeforeEach
+    public void setup() throws InterruptedException {
+        cluster.createTopic(topic, 2, (short) BROKER_COUNT);
+    }
+    
+    @ClusterTest
+    public void testClassicConsumerAutoCommitOnClose() throws InterruptedException {
+        testAutoCommitOnClose(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerAutoCommitOnClose() throws InterruptedException {
+        testAutoCommitOnClose(GroupProtocol.CONSUMER);
+    }
+
+    private void testAutoCommitOnClose(GroupProtocol groupProtocol) throws InterruptedException {
+        try (var consumer = createConsumer(groupProtocol, true)) {
+            sendRecords(10000);
+
+            consumer.subscribe(List.of(topic));
+            awaitAssignment(consumer, Set.of(tp, tp1));
+            // should auto-commit sought positions before closing
+            consumer.seek(tp, 300);
+            consumer.seek(tp1, 500);
+        }
+
+        // now we should see the committed positions from another consumer
+        try (var anotherConsumer = createConsumer(groupProtocol, true)) {
+            assertEquals(300, anotherConsumer.committed(Set.of(tp)).get(tp).offset());
+            assertEquals(500, anotherConsumer.committed(Set.of(tp1)).get(tp1).offset());
+        }
+    }
+    
+    @ClusterTest
+    public void testClassicConsumerAutoCommitOnCloseAfterWakeup() throws InterruptedException {
+        testAutoCommitOnCloseAfterWakeup(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerAutoCommitOnCloseAfterWakeup() throws InterruptedException {
+        testAutoCommitOnCloseAfterWakeup(GroupProtocol.CONSUMER);
+    }
+
+    private void testAutoCommitOnCloseAfterWakeup(GroupProtocol groupProtocol) throws InterruptedException {
+        try (var consumer = createConsumer(groupProtocol, true)) {
+            sendRecords(10000);
+
+            consumer.subscribe(List.of(topic));
+            awaitAssignment(consumer, Set.of(tp, tp1));
+
+            // should auto-commit sought positions before closing
+            consumer.seek(tp, 300);
+            consumer.seek(tp1, 500);
+
+            // wakeup the consumer before closing to simulate trying to break a poll
+            // loop from another thread
+            consumer.wakeup();
+        }
+
+        // now we should see the committed positions from another consumer
+        try (var anotherConsumer = createConsumer(groupProtocol, true)) {
+            assertEquals(300, anotherConsumer.committed(Set.of(tp)).get(tp).offset());
+            assertEquals(500, anotherConsumer.committed(Set.of(tp1)).get(tp1).offset());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerCommitMetadata() throws InterruptedException {
+        testCommitMetadata(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerCommitMetadata() throws InterruptedException {
+        testCommitMetadata(GroupProtocol.CONSUMER);
+    }
+
+    private void testCommitMetadata(GroupProtocol groupProtocol) throws InterruptedException {
+        try (var consumer = createConsumer(groupProtocol, true)) {
+            consumer.assign(List.of(tp));
+            // sync commit
+            var syncMetadata = new OffsetAndMetadata(5, Optional.of(15), "foo");
+            consumer.commitSync(Map.of(tp, syncMetadata));
+            assertEquals(syncMetadata, consumer.committed(Set.of(tp)).get(tp));
+
+            // async commit
+            var asyncMetadata = new OffsetAndMetadata(10, "bar");
+            sendAndAwaitAsyncCommit(consumer, Optional.of(Map.of(tp, asyncMetadata)));
+            assertEquals(asyncMetadata, consumer.committed(Set.of(tp)).get(tp));
+
+            // handle null metadata
+            var nullMetadata = new OffsetAndMetadata(5, null);
+            consumer.commitSync(Map.of(tp, nullMetadata));
+            assertEquals(nullMetadata, consumer.committed(Set.of(tp)).get(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerAsyncCommit() throws InterruptedException {
+        testAsyncCommit(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerAsyncCommit() throws InterruptedException {
+        testAsyncCommit(GroupProtocol.CONSUMER);
+    }
+
+    private void testAsyncCommit(GroupProtocol groupProtocol) throws InterruptedException {
+        cluster.createTopic(
+            Topic.GROUP_METADATA_TOPIC_NAME, 
+            Integer.parseInt(OFFSETS_TOPIC_PARTITIONS), 
+            Short.parseShort(OFFSETS_TOPIC_REPLICATION)
+        );
+        try (var consumer = createConsumer(groupProtocol, false)) {
+            consumer.assign(List.of(tp));
+
+            var callback = new CountConsumerCommitCallback();
+            var count = 5;
+            for (var i = 1; i <= count; i++) 
+                consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(i)), callback);
+
+            TestUtils.waitForCondition(() -> {
+                consumer.poll(Duration.ofMillis(100));
+                return callback.successCount >= count || callback.lastError.isPresent();
+            }, "Failed to observe commit callback before timeout");
+
+            assertEquals(Optional.empty(), callback.lastError);
+            assertEquals(count, callback.successCount);
+            assertEquals(new OffsetAndMetadata(count), consumer.committed(Set.of(tp)).get(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerAutoCommitIntercept() throws InterruptedException {
+        testAutoCommitIntercept(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerAutoCommitIntercept() throws InterruptedException {
+        testAutoCommitIntercept(GroupProtocol.CONSUMER);
+    }
+
+    private void testAutoCommitIntercept(GroupProtocol groupProtocol) throws InterruptedException {
+        var topic2 = "topic2";
+        cluster.createTopic(topic2, 2, (short) 3);
+        var numRecords = 100;
+        try (var producer = cluster.producer();
+             // create consumer with interceptor
+             Consumer<byte[], byte[]> consumer = cluster.consumer(Map.of(
+                 GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT),
+                 ENABLE_AUTO_COMMIT_CONFIG, "true",
+                 INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor"
+             ))
+        ) {
+            // produce records
+            for (var i = 0; i < numRecords; i++) {
+                producer.send(new ProducerRecord<>(tp.topic(), tp.partition(), ("key " + i).getBytes(), ("value " + i).getBytes()));
+            }
+
+            var rebalanceListener = new ConsumerRebalanceListener() {
+                @Override
+                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+                    // keep partitions paused in this test so that we can verify the commits based on specific seeks
+                    consumer.pause(partitions);
+                }
+                @Override
+                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+                    // No-op
+                }
+            };
+
+            changeConsumerSubscriptionAndValidateAssignment(
+                consumer, 
+                List.of(topic), 
+                Set.of(tp, tp1),
+                rebalanceListener
+            );
+            consumer.seek(tp, 10);
+            consumer.seek(tp1, 20);
+
+            // change subscription to trigger rebalance
+            var commitCountBeforeRebalance = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue();
+            var expectedAssignment = Set.of(tp, tp1, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1));
+            changeConsumerSubscriptionAndValidateAssignment(
+                consumer, 
+                List.of(topic, topic2), 
+                expectedAssignment, 
+                rebalanceListener
+            );
+
+            // after rebalancing, we should have reset to the committed positions
+            var committed1 = consumer.committed(Set.of(tp));
+            assertEquals(10, committed1.get(tp).offset());
+            var committed2 = consumer.committed(Set.of(tp1));
+            assertEquals(20, committed2.get(tp1).offset());
+            assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > commitCountBeforeRebalance);
+
+            // In both CLASSIC and CONSUMER protocols, interceptors are executed in poll and close.
+            // However, in the CONSUMER protocol, the assignment may be changed outside a poll, so
+            // we need to poll once to ensure the interceptor is called.
+            if (groupProtocol == GroupProtocol.CONSUMER) {
+                consumer.poll(Duration.ZERO);
+            }
+            
+            // verify commits are intercepted on close
+            var commitCountBeforeClose = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue();
+            consumer.close();
+            assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > commitCountBeforeClose);
+            producer.close();
+            // cleanup
+            MockConsumerInterceptor.resetCounters();
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerCommitSpecifiedOffsets() throws InterruptedException {
+        testCommitSpecifiedOffsets(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerCommitSpecifiedOffsets() throws InterruptedException {
+        testCommitSpecifiedOffsets(GroupProtocol.CONSUMER);
+    }
+
+    private void testCommitSpecifiedOffsets(GroupProtocol groupProtocol) throws InterruptedException {
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             var consumer = createConsumer(groupProtocol, false)
+        ) {
+            sendRecords(producer, 5, tp);
+            sendRecords(producer, 7, tp1);
+
+            consumer.assign(List.of(tp, tp1));
+
+            var pos1 = consumer.position(tp);
+            var pos2 = consumer.position(tp1);
+            
+            consumer.commitSync(Map.of(tp, new OffsetAndMetadata(3L)));
+
+            assertEquals(3, consumer.committed(Set.of(tp)).get(tp).offset());
+            assertNull(consumer.committed(Collections.singleton(tp1)).get(tp1));
+
+            // Positions should not change
+            assertEquals(pos1, consumer.position(tp));
+            assertEquals(pos2, consumer.position(tp1));
+
+            consumer.commitSync(Map.of(tp1, new OffsetAndMetadata(5L)));
+
+            assertEquals(3, consumer.committed(Set.of(tp)).get(tp).offset());
+            assertEquals(5, consumer.committed(Set.of(tp1)).get(tp1).offset());
+
+            // Using async should pick up the committed changes after commit completes
+            sendAndAwaitAsyncCommit(consumer, Optional.of(Map.of(tp1, new OffsetAndMetadata(7L))));
+            assertEquals(7, consumer.committed(Collections.singleton(tp1)).get(tp1).offset());
+        }
+    }
+    
+    @ClusterTest
+    public void testClassicConsumerAutoCommitOnRebalance() throws InterruptedException {
+        testAutoCommitOnRebalance(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerAutoCommitOnRebalance() throws InterruptedException {
+        testAutoCommitOnRebalance(GroupProtocol.CONSUMER);
+    }
+
+    private void testAutoCommitOnRebalance(GroupProtocol groupProtocol) throws InterruptedException {
+        var topic2 = "topic2";
+        cluster.createTopic(topic2, 2, (short) BROKER_COUNT);
+        try (var consumer = createConsumer(groupProtocol, true)) {
+            sendRecords(10000);
+            
+            var rebalanceListener = new ConsumerRebalanceListener() {
+                @Override
+                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+                    // keep partitions paused in this test so that we can verify the commits based on specific seeks
+                    consumer.pause(partitions);
+                }
+
+                @Override
+                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+                    
+                }
+            };
+
+            consumer.subscribe(List.of(topic), rebalanceListener);
+            awaitAssignment(consumer, Set.of(tp, tp1));
+
+            consumer.seek(tp, 300);
+            consumer.seek(tp1, 500);
+            // change subscription to trigger rebalance
+            consumer.subscribe(List.of(topic, topic2), rebalanceListener);
+
+            var newAssignment = Set.of(tp, tp1, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1));
+            awaitAssignment(consumer, newAssignment);
+
+            // after rebalancing, we should have reset to the committed positions
+            assertEquals(300, consumer.committed(Set.of(tp)).get(tp).offset());
+            assertEquals(500, consumer.committed(Set.of(tp1)).get(tp1).offset());
+        }
+    }
+    
+    @ClusterTest
+    public void testClassicConsumerSubscribeAndCommitSync() throws InterruptedException {
+        testSubscribeAndCommitSync(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerSubscribeAndCommitSync() throws InterruptedException {
+        testSubscribeAndCommitSync(GroupProtocol.CONSUMER);
+    }
+
+    private void testSubscribeAndCommitSync(GroupProtocol groupProtocol) throws InterruptedException {
+        // This test ensures that the member ID received from the group coordinator with the
+        // assignment is propagated into a subsequent offset commit
+        try (var consumer = createConsumer(groupProtocol, false)) {
+            assertEquals(0, consumer.assignment().size());
+            consumer.subscribe(List.of(topic));
+            awaitAssignment(consumer, Set.of(tp, tp1));
+
+            consumer.seek(tp, 0);
+            consumer.commitSync();
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerPositionAndCommit() throws InterruptedException {
+        testPositionAndCommit(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerPositionAndCommit() throws InterruptedException {
+        testPositionAndCommit(GroupProtocol.CONSUMER);
+    }
+
+    private void testPositionAndCommit(GroupProtocol groupProtocol) throws InterruptedException {
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             var consumer = createConsumer(groupProtocol, false);
+             var otherConsumer = createConsumer(groupProtocol, false)
+        ) {
+            var startingTimestamp = System.currentTimeMillis();
+            sendRecords(producer, 5, tp, startingTimestamp);
+
+            var topicPartition = new TopicPartition(topic, 15);
+            assertNull(consumer.committed(Collections.singleton(topicPartition)).get(topicPartition));
+
+            // position() on a partition that we aren't subscribed to throws an exception
+            assertThrows(IllegalStateException.class, () -> consumer.position(topicPartition));
+
+            consumer.assign(List.of(tp));
+
+            assertEquals(0L, consumer.position(tp), "position() on a partition that we are subscribed to should reset the offset");
+            consumer.commitSync();
+            assertEquals(0L, consumer.committed(Set.of(tp)).get(tp).offset());
+            consumeAndVerifyRecords(consumer, 5, 0, startingTimestamp);
+            assertEquals(5L, consumer.position(tp), "After consuming 5 records, position should be 5");
+            consumer.commitSync();
+            assertEquals(5L, consumer.committed(Set.of(tp)).get(tp).offset(), "Committed offset should be returned");
+
+            startingTimestamp = System.currentTimeMillis();
+            sendRecords(producer, 1, tp, startingTimestamp);
+
+            // another consumer in the same group should get the same position
+            otherConsumer.assign(List.of(tp));
+            consumeAndVerifyRecords(otherConsumer, 1, 5, startingTimestamp);
+        }
+    }
+
+    // TODO: This only works in the new consumer, but should be fixed for the old consumer as well
+    @ClusterTest
+    public void testCommitAsyncCompletedBeforeConsumerCloses() {
+        // This is testing the contract that asynchronous offset commits are completed before the
+        // consumer is closed, even when no sync commit is performed as part of the close (because
+        // auto-commit is disabled, or simply because there are no consumed offsets).
+        try (Producer<byte[], byte[]> producer = cluster.producer(Map.of(ProducerConfig.ACKS_CONFIG, "all"));
+             var consumer = createConsumer(GroupProtocol.CONSUMER, false)
+        ) {
+            sendRecords(producer, 3, tp);
+            sendRecords(producer, 3, tp1);
+            consumer.assign(List.of(tp, tp1));
+
+            // Try without looking up the coordinator first
+            var cb = new CountConsumerCommitCallback();
+            consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(1L)), cb);
+            consumer.commitAsync(Map.of(tp1, new OffsetAndMetadata(1L)), cb);
+
+            consumer.close();
+            assertEquals(2, cb.successCount);
+        }
+    }
+
+    // TODO: This only works in the new consumer, but should be fixed for the old consumer as well
+    @ClusterTest
+    public void testCommitAsyncCompletedBeforeCommitSyncReturns() {
+        // This is testing the contract that asynchronous offset commits sent previously with
+        // `commitAsync` are guaranteed to have their callbacks invoked prior to the completion of
+        // `commitSync` (given that it does not time out).
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             var consumer = createConsumer(GroupProtocol.CONSUMER, false)
+        ) {
+            sendRecords(producer, 3, tp);
+            sendRecords(producer, 3, tp1);
+
+            consumer.assign(List.of(tp, tp1));
+
+            // Try without looking up the coordinator first
+            var cb = new CountConsumerCommitCallback();
+            consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(1L)), cb);
+            consumer.commitSync(Map.of());
+
+            assertEquals(1, consumer.committed(Set.of(tp)).get(tp).offset());
+            assertEquals(1, cb.successCount);
+
+            // Try with coordinator known
+            consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(2L)), cb);
+            consumer.commitSync(Map.of(tp1, new OffsetAndMetadata(2L)));
+
+            assertEquals(2, consumer.committed(Set.of(tp)).get(tp).offset());
+            assertEquals(2, consumer.committed(Set.of(tp1)).get(tp1).offset());
+            assertEquals(2, cb.successCount);
+
+            // Try with empty sync commit
+            consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(3L)), cb);
+            consumer.commitSync(Map.of());
+
+            assertEquals(3, consumer.committed(Set.of(tp)).get(tp).offset());
+            assertEquals(2, consumer.committed(Set.of(tp1)).get(tp1).offset());
+            assertEquals(3, cb.successCount);
+        }
+    }
+
+    private Consumer<byte[], byte[]> createConsumer(GroupProtocol protocol, boolean enableAutoCommit) {
+        return cluster.consumer(Map.of(
+            GROUP_ID_CONFIG, "test-group",
+            GROUP_PROTOCOL_CONFIG, protocol.name().toLowerCase(Locale.ROOT),
+            ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit
+        ));
+    }
+    
+    private void sendRecords(int numRecords) {
+        try (Producer<byte[], byte[]> producer = cluster.producer()) {
+            sendRecords(producer, numRecords, tp, System.currentTimeMillis());
+        }
+    }
+
+    private void sendRecords(
+        Producer<byte[], byte[]> producer,
+        int numRecords,
+        TopicPartition topicPartition
+    ) {
+        sendRecords(producer, numRecords, topicPartition, System.currentTimeMillis());
+    }
+
+    private void sendRecords(
+        Producer<byte[], byte[]> producer, 
+        int numRecords, 
+        TopicPartition topicPartition,
+        long startingTimestamp
+    ) {
+        for (var i = 0; i < numRecords; i++) {
+            var timestamp = startingTimestamp + i;
+            var record = new ProducerRecord<>(
+                topicPartition.topic(),
+                topicPartition.partition(),
+                timestamp,
+                ("key " + i).getBytes(),
+                ("value " + i).getBytes()
+            );
+            producer.send(record);
+        }
+        producer.flush();
+    }
+
+    private void awaitAssignment(
+        Consumer<byte[], byte[]> consumer,
+        Set<TopicPartition> expectedAssignment
+    ) throws InterruptedException {
+        TestUtils.waitForCondition(() -> {
+            consumer.poll(Duration.ofMillis(100));
+            return consumer.assignment().equals(expectedAssignment);
+        }, "Timed out while awaiting expected assignment " + 
expectedAssignment + ". " +
+            "The current assignment is " + consumer.assignment()
+        );
+    }
+
+    private void sendAndAwaitAsyncCommit(
+        Consumer<byte[], byte[]> consumer,
+        Optional<Map<TopicPartition, OffsetAndMetadata>> offsetsOpt

Review Comment:
   It looks like every `offsetsOpt` input is non-empty. Do you think we could remove the
   `Optional<>` from `sendAndAwaitAsyncCommit` and `RetryCommitCallback`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
