FrankYang0529 commented on code in PR #19389: URL: https://github.com/apache/kafka/pull/19389#discussion_r2030193847
########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java: ########## @@ -0,0 +1,680 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.TestUtils; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.test.MockConsumerInterceptor; + +import org.junit.jupiter.api.BeforeEach; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG; +import static org.apache.kafka.clients.consumer.PlaintextConsumerCommitTest.BROKER_COUNT; +import static org.apache.kafka.clients.consumer.PlaintextConsumerCommitTest.OFFSETS_TOPIC_PARTITIONS; +import static org.apache.kafka.clients.consumer.PlaintextConsumerCommitTest.OFFSETS_TOPIC_REPLICATION; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ClusterTestDefaults( + types = {Type.KRAFT}, + brokers = BROKER_COUNT, + serverProperties = { + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = OFFSETS_TOPIC_PARTITIONS), + @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = OFFSETS_TOPIC_REPLICATION), + @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"), + } +) +public class PlaintextConsumerCommitTest { + + public static final int BROKER_COUNT = 3; + public static final String OFFSETS_TOPIC_PARTITIONS = "1"; + public static final String OFFSETS_TOPIC_REPLICATION = "3"; + private final ClusterInstance cluster; + private final String topic = "topic"; + private final TopicPartition tp = new TopicPartition(topic, 0); + private final TopicPartition tp1 = new TopicPartition(topic, 1); + + public PlaintextConsumerCommitTest(ClusterInstance clusterInstance) { + this.cluster = clusterInstance; + } + + @BeforeEach + public void setup() throws InterruptedException { + cluster.createTopic(topic, 2, (short) BROKER_COUNT); + } + + @ClusterTest + public void testClassicConsumerAutoCommitOnClose() throws InterruptedException { + testAutoCommitOnClose(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerAutoCommitOnClose() throws InterruptedException { + testAutoCommitOnClose(GroupProtocol.CONSUMER); + } + + private void testAutoCommitOnClose(GroupProtocol groupProtocol) throws InterruptedException { + try (var consumer = createConsumer(groupProtocol, true)) { + sendRecords(10000); + + consumer.subscribe(List.of(topic)); + awaitAssignment(consumer, Set.of(tp, tp1)); + // should auto-commit sought positions before closing + consumer.seek(tp, 300); + consumer.seek(tp1, 500); + } + + // now we should see the committed positions from another consumer + try (var anotherConsumer = createConsumer(groupProtocol, true)) { + assertEquals(300, anotherConsumer.committed(Set.of(tp)).get(tp).offset()); + assertEquals(500, anotherConsumer.committed(Set.of(tp1)).get(tp1).offset()); + } + } + + @ClusterTest + public void testClassicConsumerAutoCommitOnCloseAfterWakeup() throws InterruptedException { + testAutoCommitOnCloseAfterWakeup(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerAutoCommitOnCloseAfterWakeup() throws InterruptedException { + testAutoCommitOnCloseAfterWakeup(GroupProtocol.CONSUMER); + } + + private void testAutoCommitOnCloseAfterWakeup(GroupProtocol groupProtocol) throws InterruptedException { + try (var consumer = createConsumer(groupProtocol, true)) { + sendRecords(10000); + + consumer.subscribe(List.of(topic)); + awaitAssignment(consumer, Set.of(tp, tp1)); + + // should auto-commit sought positions before closing + consumer.seek(tp, 300); + consumer.seek(tp1, 500); + + // wakeup the consumer before closing to simulate trying to break a poll + // loop from another thread + consumer.wakeup(); + } + + // now we should see the committed positions from another consumer + try (var anotherConsumer = createConsumer(groupProtocol, true)) { + assertEquals(300, anotherConsumer.committed(Set.of(tp)).get(tp).offset()); + assertEquals(500, anotherConsumer.committed(Set.of(tp1)).get(tp1).offset()); + } + } + + @ClusterTest + public void testClassicConsumerCommitMetadata() throws InterruptedException { + testCommitMetadata(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerCommitMetadata() throws InterruptedException { + testCommitMetadata(GroupProtocol.CONSUMER); + } + + private void testCommitMetadata(GroupProtocol groupProtocol) throws InterruptedException { + try (var consumer = createConsumer(groupProtocol, true)) { + consumer.assign(List.of(tp)); + // sync commit + var syncMetadata = new OffsetAndMetadata(5, Optional.of(15), "foo"); + consumer.commitSync(Map.of(tp, syncMetadata)); + assertEquals(syncMetadata, consumer.committed(Set.of(tp)).get(tp)); + + // async commit + var asyncMetadata = new OffsetAndMetadata(10, "bar"); + sendAndAwaitAsyncCommit(consumer, Optional.of(Map.of(tp, asyncMetadata))); + assertEquals(asyncMetadata, consumer.committed(Set.of(tp)).get(tp)); + + // handle null metadata + var nullMetadata = new OffsetAndMetadata(5, null); + consumer.commitSync(Map.of(tp, nullMetadata)); + assertEquals(nullMetadata, consumer.committed(Set.of(tp)).get(tp)); + } + } + + @ClusterTest + public void testClassicConsumerAsyncCommit() throws InterruptedException { + testAsyncCommit(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerAsyncCommit() throws InterruptedException { + testAsyncCommit(GroupProtocol.CONSUMER); + } + + private void testAsyncCommit(GroupProtocol groupProtocol) throws InterruptedException { + cluster.createTopic( + Topic.GROUP_METADATA_TOPIC_NAME, + Integer.parseInt(OFFSETS_TOPIC_PARTITIONS), + Short.parseShort(OFFSETS_TOPIC_REPLICATION) + ); Review Comment: Could you add some comments about why we need to manually create internal topic here? It looks like we don't set `AUTO_CREATE_TOPICS_ENABLE_CONFIG` as `false`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org