FrankYang0529 commented on code in PR #19389:
URL: https://github.com/apache/kafka/pull/19389#discussion_r2030193847


##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java:
##########
@@ -0,0 +1,680 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.test.MockConsumerInterceptor;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG;
+import static 
org.apache.kafka.clients.consumer.PlaintextConsumerCommitTest.BROKER_COUNT;
+import static 
org.apache.kafka.clients.consumer.PlaintextConsumerCommitTest.OFFSETS_TOPIC_PARTITIONS;
+import static 
org.apache.kafka.clients.consumer.PlaintextConsumerCommitTest.OFFSETS_TOPIC_REPLICATION;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = BROKER_COUNT, 
+    serverProperties = {
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
OFFSETS_TOPIC_PARTITIONS),
+        @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
value = OFFSETS_TOPIC_REPLICATION),
+        @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
value = "100"),
+    }
+)
+public class PlaintextConsumerCommitTest {
+
+    public static final int BROKER_COUNT = 3;
+    public static final String OFFSETS_TOPIC_PARTITIONS = "1";
+    public static final String OFFSETS_TOPIC_REPLICATION = "3";
+    private final ClusterInstance cluster;
+    private final String topic = "topic";
+    private final TopicPartition tp = new TopicPartition(topic, 0);
+    private final TopicPartition tp1 = new TopicPartition(topic, 1);
+
+    public PlaintextConsumerCommitTest(ClusterInstance clusterInstance) {
+        this.cluster = clusterInstance;
+    }
+
+    @BeforeEach
+    public void setup() throws InterruptedException {
+        cluster.createTopic(topic, 2, (short) BROKER_COUNT);
+    }
+    
+    @ClusterTest
+    public void testClassicConsumerAutoCommitOnClose() throws 
InterruptedException {
+        testAutoCommitOnClose(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerAutoCommitOnClose() throws 
InterruptedException {
+        testAutoCommitOnClose(GroupProtocol.CONSUMER);
+    }
+
+    private void testAutoCommitOnClose(GroupProtocol groupProtocol) throws 
InterruptedException {
+        try (var consumer = createConsumer(groupProtocol, true)) {
+            sendRecords(10000);
+
+            consumer.subscribe(List.of(topic));
+            awaitAssignment(consumer, Set.of(tp, tp1));
+            // should auto-commit sought positions before closing
+            consumer.seek(tp, 300);
+            consumer.seek(tp1, 500);
+        }
+
+        // now we should see the committed positions from another consumer
+        try (var anotherConsumer = createConsumer(groupProtocol, true)) {
+            assertEquals(300, 
anotherConsumer.committed(Set.of(tp)).get(tp).offset());
+            assertEquals(500, 
anotherConsumer.committed(Set.of(tp1)).get(tp1).offset());
+        }
+    }
+    
+    @ClusterTest
+    public void testClassicConsumerAutoCommitOnCloseAfterWakeup() throws 
InterruptedException {
+        testAutoCommitOnCloseAfterWakeup(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerAutoCommitOnCloseAfterWakeup() throws 
InterruptedException {
+        testAutoCommitOnCloseAfterWakeup(GroupProtocol.CONSUMER);
+    }
+
+    private void testAutoCommitOnCloseAfterWakeup(GroupProtocol groupProtocol) 
throws InterruptedException {
+        try (var consumer = createConsumer(groupProtocol, true)) {
+            sendRecords(10000);
+
+            consumer.subscribe(List.of(topic));
+            awaitAssignment(consumer, Set.of(tp, tp1));
+
+            // should auto-commit sought positions before closing
+            consumer.seek(tp, 300);
+            consumer.seek(tp1, 500);
+
+            // wakeup the consumer before closing to simulate trying to break 
a poll
+            // loop from another thread
+            consumer.wakeup();
+        }
+
+        // now we should see the committed positions from another consumer
+        try (var anotherConsumer = createConsumer(groupProtocol, true)) {
+            assertEquals(300, 
anotherConsumer.committed(Set.of(tp)).get(tp).offset());
+            assertEquals(500, 
anotherConsumer.committed(Set.of(tp1)).get(tp1).offset());
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerCommitMetadata() throws 
InterruptedException {
+        testCommitMetadata(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerCommitMetadata() throws InterruptedException {
+        testCommitMetadata(GroupProtocol.CONSUMER);
+    }
+
+    private void testCommitMetadata(GroupProtocol groupProtocol) throws 
InterruptedException {
+        try (var consumer = createConsumer(groupProtocol, true)) {
+            consumer.assign(List.of(tp));
+            // sync commit
+            var syncMetadata = new OffsetAndMetadata(5, Optional.of(15), 
"foo");
+            consumer.commitSync(Map.of(tp, syncMetadata));
+            assertEquals(syncMetadata, consumer.committed(Set.of(tp)).get(tp));
+
+            // async commit
+            var asyncMetadata = new OffsetAndMetadata(10, "bar");
+            sendAndAwaitAsyncCommit(consumer, Optional.of(Map.of(tp, 
asyncMetadata)));
+            assertEquals(asyncMetadata, 
consumer.committed(Set.of(tp)).get(tp));
+
+            // handle null metadata
+            var nullMetadata = new OffsetAndMetadata(5, null);
+            consumer.commitSync(Map.of(tp, nullMetadata));
+            assertEquals(nullMetadata, consumer.committed(Set.of(tp)).get(tp));
+        }
+    }
+
+    @ClusterTest
+    public void testClassicConsumerAsyncCommit() throws InterruptedException {
+        testAsyncCommit(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerAsyncCommit() throws InterruptedException {
+        testAsyncCommit(GroupProtocol.CONSUMER);
+    }
+
+    private void testAsyncCommit(GroupProtocol groupProtocol) throws 
InterruptedException {
+        cluster.createTopic(
+            Topic.GROUP_METADATA_TOPIC_NAME, 
+            Integer.parseInt(OFFSETS_TOPIC_PARTITIONS), 
+            Short.parseShort(OFFSETS_TOPIC_REPLICATION)
+        );

Review Comment:
   Could you add a comment explaining why we need to manually create the internal 
topic here? It looks like we don't set `AUTO_CREATE_TOPICS_ENABLE_CONFIG` to 
`false`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to