FrankYang0529 commented on code in PR #19389: URL: https://github.com/apache/kafka/pull/19389#discussion_r2030191468
########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java: ########## @@ -0,0 +1,680 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.TestUtils; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.test.MockConsumerInterceptor; + +import org.junit.jupiter.api.BeforeEach; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import 
java.util.Set; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG; +import static org.apache.kafka.clients.consumer.PlaintextConsumerCommitTest.BROKER_COUNT; +import static org.apache.kafka.clients.consumer.PlaintextConsumerCommitTest.OFFSETS_TOPIC_PARTITIONS; +import static org.apache.kafka.clients.consumer.PlaintextConsumerCommitTest.OFFSETS_TOPIC_REPLICATION; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ClusterTestDefaults( + types = {Type.KRAFT}, + brokers = BROKER_COUNT, + serverProperties = { + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = OFFSETS_TOPIC_PARTITIONS), + @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = OFFSETS_TOPIC_REPLICATION), + @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"), + } +) +public class PlaintextConsumerCommitTest { + + public static final int BROKER_COUNT = 3; + public static final String OFFSETS_TOPIC_PARTITIONS = "1"; + public static final String OFFSETS_TOPIC_REPLICATION = "3"; + private final ClusterInstance cluster; + private final String topic = "topic"; + private final TopicPartition tp = new TopicPartition(topic, 0); + 
private final TopicPartition tp1 = new TopicPartition(topic, 1); + + public PlaintextConsumerCommitTest(ClusterInstance clusterInstance) { + this.cluster = clusterInstance; + } + + @BeforeEach + public void setup() throws InterruptedException { + cluster.createTopic(topic, 2, (short) BROKER_COUNT); + } + + @ClusterTest + public void testClassicConsumerAutoCommitOnClose() throws InterruptedException { + testAutoCommitOnClose(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerAutoCommitOnClose() throws InterruptedException { + testAutoCommitOnClose(GroupProtocol.CONSUMER); + } + + private void testAutoCommitOnClose(GroupProtocol groupProtocol) throws InterruptedException { + try (var consumer = createConsumer(groupProtocol, true)) { + sendRecords(10000); + + consumer.subscribe(List.of(topic)); + awaitAssignment(consumer, Set.of(tp, tp1)); + // should auto-commit sought positions before closing + consumer.seek(tp, 300); + consumer.seek(tp1, 500); + } + + // now we should see the committed positions from another consumer + try (var anotherConsumer = createConsumer(groupProtocol, true)) { + assertEquals(300, anotherConsumer.committed(Set.of(tp)).get(tp).offset()); + assertEquals(500, anotherConsumer.committed(Set.of(tp1)).get(tp1).offset()); + } + } + + @ClusterTest + public void testClassicConsumerAutoCommitOnCloseAfterWakeup() throws InterruptedException { + testAutoCommitOnCloseAfterWakeup(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerAutoCommitOnCloseAfterWakeup() throws InterruptedException { + testAutoCommitOnCloseAfterWakeup(GroupProtocol.CONSUMER); + } + + private void testAutoCommitOnCloseAfterWakeup(GroupProtocol groupProtocol) throws InterruptedException { + try (var consumer = createConsumer(groupProtocol, true)) { + sendRecords(10000); + + consumer.subscribe(List.of(topic)); + awaitAssignment(consumer, Set.of(tp, tp1)); + + // should auto-commit sought positions before closing + consumer.seek(tp, 300); 
+ consumer.seek(tp1, 500); + + // wakeup the consumer before closing to simulate trying to break a poll + // loop from another thread + consumer.wakeup(); + } + + // now we should see the committed positions from another consumer + try (var anotherConsumer = createConsumer(groupProtocol, true)) { + assertEquals(300, anotherConsumer.committed(Set.of(tp)).get(tp).offset()); + assertEquals(500, anotherConsumer.committed(Set.of(tp1)).get(tp1).offset()); + } + } + + @ClusterTest + public void testClassicConsumerCommitMetadata() throws InterruptedException { + testCommitMetadata(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerCommitMetadata() throws InterruptedException { + testCommitMetadata(GroupProtocol.CONSUMER); + } + + private void testCommitMetadata(GroupProtocol groupProtocol) throws InterruptedException { + try (var consumer = createConsumer(groupProtocol, true)) { + consumer.assign(List.of(tp)); + // sync commit + var syncMetadata = new OffsetAndMetadata(5, Optional.of(15), "foo"); + consumer.commitSync(Map.of(tp, syncMetadata)); + assertEquals(syncMetadata, consumer.committed(Set.of(tp)).get(tp)); + + // async commit + var asyncMetadata = new OffsetAndMetadata(10, "bar"); + sendAndAwaitAsyncCommit(consumer, Optional.of(Map.of(tp, asyncMetadata))); + assertEquals(asyncMetadata, consumer.committed(Set.of(tp)).get(tp)); + + // handle null metadata + var nullMetadata = new OffsetAndMetadata(5, null); + consumer.commitSync(Map.of(tp, nullMetadata)); + assertEquals(nullMetadata, consumer.committed(Set.of(tp)).get(tp)); + } + } + + @ClusterTest + public void testClassicConsumerAsyncCommit() throws InterruptedException { + testAsyncCommit(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerAsyncCommit() throws InterruptedException { + testAsyncCommit(GroupProtocol.CONSUMER); + } + + private void testAsyncCommit(GroupProtocol groupProtocol) throws InterruptedException { + cluster.createTopic( + 
Topic.GROUP_METADATA_TOPIC_NAME, + Integer.parseInt(OFFSETS_TOPIC_PARTITIONS), + Short.parseShort(OFFSETS_TOPIC_REPLICATION) + ); + try (var consumer = createConsumer(groupProtocol, false)) { + consumer.assign(List.of(tp)); + + var callback = new CountConsumerCommitCallback(); + var count = 5; + for (var i = 1; i <= count; i++) + consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(i)), callback); + + TestUtils.waitForCondition(() -> { + consumer.poll(Duration.ofMillis(100)); + return callback.successCount >= count || callback.lastError.isPresent(); + }, "Failed to observe commit callback before timeout"); + + assertEquals(Optional.empty(), callback.lastError); + assertEquals(count, callback.successCount); + assertEquals(new OffsetAndMetadata(count), consumer.committed(Set.of(tp)).get(tp)); + } + } + + @ClusterTest + public void testClassicConsumerAutoCommitIntercept() throws InterruptedException { + testAutoCommitIntercept(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerAutoCommitIntercept() throws InterruptedException { + testAutoCommitIntercept(GroupProtocol.CONSUMER); + } + + private void testAutoCommitIntercept(GroupProtocol groupProtocol) throws InterruptedException { + var topic2 = "topic2"; + cluster.createTopic(topic2, 2, (short) 3); + var numRecords = 100; + try (var producer = cluster.producer(); + // create consumer with interceptor + Consumer<byte[], byte[]> consumer = cluster.consumer(Map.of( + GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT), + ENABLE_AUTO_COMMIT_CONFIG, "true", + INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor" + )) + ) { + // produce records + for (var i = 0; i < numRecords; i++) { + producer.send(new ProducerRecord<>(tp.topic(), tp.partition(), ("key " + i).getBytes(), ("value " + i).getBytes())); + } + + var rebalanceListener = new ConsumerRebalanceListener() { + @Override + public void onPartitionsAssigned(Collection<TopicPartition> partitions) { + // 
keep partitions paused in this test so that we can verify the commits based on specific seeks + consumer.pause(partitions); + } + @Override + public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + // No-op + } + }; + + changeConsumerSubscriptionAndValidateAssignment( + consumer, + List.of(topic), + Set.of(tp, tp1), + rebalanceListener + ); + consumer.seek(tp, 10); + consumer.seek(tp1, 20); + + // change subscription to trigger rebalance + var commitCountBeforeRebalance = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue(); + var expectedAssignment = Set.of(tp, tp1, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1)); + changeConsumerSubscriptionAndValidateAssignment( + consumer, + List.of(topic, topic2), + expectedAssignment, + rebalanceListener + ); + + // after rebalancing, we should have reset to the committed positions + var committed1 = consumer.committed(Set.of(tp)); + assertEquals(10, committed1.get(tp).offset()); + var committed2 = consumer.committed(Set.of(tp1)); + assertEquals(20, committed2.get(tp1).offset()); + assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > commitCountBeforeRebalance); + + // In both CLASSIC and CONSUMER protocols, interceptors are executed in poll and close. + // However, in the CONSUMER protocol, the assignment may be changed outside a poll, so + // we need to poll once to ensure the interceptor is called. 
+ if (groupProtocol == GroupProtocol.CONSUMER) { + consumer.poll(Duration.ZERO); + } + + // verify commits are intercepted on close + var commitCountBeforeClose = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue(); + consumer.close(); + assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > commitCountBeforeClose); + producer.close(); + // cleanup + MockConsumerInterceptor.resetCounters(); + } + } + + @ClusterTest + public void testClassicConsumerCommitSpecifiedOffsets() throws InterruptedException { + testCommitSpecifiedOffsets(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerCommitSpecifiedOffsets() throws InterruptedException { + testCommitSpecifiedOffsets(GroupProtocol.CONSUMER); + } + + private void testCommitSpecifiedOffsets(GroupProtocol groupProtocol) throws InterruptedException { + try (Producer<byte[], byte[]> producer = cluster.producer(); + var consumer = createConsumer(groupProtocol, false) + ) { + sendRecords(producer, 5, tp); + sendRecords(producer, 7, tp1); + + consumer.assign(List.of(tp, tp1)); + + var pos1 = consumer.position(tp); + var pos2 = consumer.position(tp1); + + consumer.commitSync(Map.of(tp, new OffsetAndMetadata(3L))); + + assertEquals(3, consumer.committed(Set.of(tp)).get(tp).offset()); + assertNull(consumer.committed(Collections.singleton(tp1)).get(tp1)); + + // Positions should not change + assertEquals(pos1, consumer.position(tp)); + assertEquals(pos2, consumer.position(tp1)); + + consumer.commitSync(Map.of(tp1, new OffsetAndMetadata(5L))); + + assertEquals(3, consumer.committed(Set.of(tp)).get(tp).offset()); + assertEquals(5, consumer.committed(Set.of(tp1)).get(tp1).offset()); + + // Using async should pick up the committed changes after commit completes + sendAndAwaitAsyncCommit(consumer, Optional.of(Map.of(tp1, new OffsetAndMetadata(7L)))); + assertEquals(7, consumer.committed(Collections.singleton(tp1)).get(tp1).offset()); + } + } + + @ClusterTest + public void 
testClassicConsumerAutoCommitOnRebalance() throws InterruptedException { + testAutoCommitOnRebalance(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerAutoCommitOnRebalance() throws InterruptedException { + testAutoCommitOnRebalance(GroupProtocol.CONSUMER); + } + + private void testAutoCommitOnRebalance(GroupProtocol groupProtocol) throws InterruptedException { + var topic2 = "topic2"; + cluster.createTopic(topic2, 2, (short) BROKER_COUNT); + try (var consumer = createConsumer(groupProtocol, true)) { + sendRecords(10000); + + var rebalanceListener = new ConsumerRebalanceListener() { + @Override + public void onPartitionsAssigned(Collection<TopicPartition> partitions) { + // keep partitions paused in this test so that we can verify the commits based on specific seeks + consumer.pause(partitions); + } + + @Override + public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + + } + }; + + consumer.subscribe(List.of(topic), rebalanceListener); + awaitAssignment(consumer, Set.of(tp, tp1)); + + consumer.seek(tp, 300); + consumer.seek(tp1, 500); + // change subscription to trigger rebalance + consumer.subscribe(List.of(topic, topic2), rebalanceListener); + + var newAssignment = Set.of(tp, tp1, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1)); + awaitAssignment(consumer, newAssignment); + + // after rebalancing, we should have reset to the committed positions + assertEquals(300, consumer.committed(Set.of(tp)).get(tp).offset()); + assertEquals(500, consumer.committed(Set.of(tp1)).get(tp1).offset()); + } + } + + @ClusterTest + public void testClassicConsumerSubscribeAndCommitSync() throws InterruptedException { + testSubscribeAndCommitSync(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerSubscribeAndCommitSync() throws InterruptedException { + testSubscribeAndCommitSync(GroupProtocol.CONSUMER); + } + + private void testSubscribeAndCommitSync(GroupProtocol groupProtocol) throws 
InterruptedException { + // This test ensure that the member ID is propagated from the group coordinator when the + // assignment is received into a subsequent offset commit + try (var consumer = createConsumer(groupProtocol, false)) { + assertEquals(0, consumer.assignment().size()); + consumer.subscribe(List.of(topic)); + awaitAssignment(consumer, Set.of(tp, tp1)); + + consumer.seek(tp, 0); + consumer.commitSync(); + } + } + + @ClusterTest + public void testClassicConsumerPositionAndCommit() throws InterruptedException { + testPositionAndCommit(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerPositionAndCommit() throws InterruptedException { + testPositionAndCommit(GroupProtocol.CONSUMER); + } + + private void testPositionAndCommit(GroupProtocol groupProtocol) throws InterruptedException { + try (Producer<byte[], byte[]> producer = cluster.producer(); + var consumer = createConsumer(groupProtocol, false); + var otherConsumer = createConsumer(groupProtocol, false) + ) { + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, 5, tp, startingTimestamp); + + var topicPartition = new TopicPartition(topic, 15); + assertNull(consumer.committed(Collections.singleton(topicPartition)).get(topicPartition)); + + // position() on a partition that we aren't subscribed to throws an exception + assertThrows(IllegalStateException.class, () -> consumer.position(topicPartition)); + + consumer.assign(List.of(tp)); + + assertEquals(0L, consumer.position(tp), "position() on a partition that we are subscribed to should reset the offset"); + consumer.commitSync(); + assertEquals(0L, consumer.committed(Set.of(tp)).get(tp).offset()); + consumeAndVerifyRecords(consumer, 5, 0, startingTimestamp); + assertEquals(5L, consumer.position(tp), "After consuming 5 records, position should be 5"); + consumer.commitSync(); + assertEquals(5L, consumer.committed(Set.of(tp)).get(tp).offset(), "Committed offset should be returned"); + + startingTimestamp = 
System.currentTimeMillis(); + sendRecords(producer, 1, tp, startingTimestamp); + + // another consumer in the same group should get the same position + otherConsumer.assign(List.of(tp)); + consumeAndVerifyRecords(otherConsumer, 1, 5, startingTimestamp); + } + } + + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ClusterTest + public void testCommitAsyncCompletedBeforeConsumerCloses() { + // This is testing the contract that asynchronous offset commit are completed before the consumer + // is closed, even when no commit sync is performed as part of the close (due to auto-commit + // disabled, or simply because there are no consumed offsets). + try (Producer<byte[], byte[]> producer = cluster.producer(Map.of(ProducerConfig.ACKS_CONFIG, "all")); + var consumer = createConsumer(GroupProtocol.CONSUMER, false) + ) { + sendRecords(producer, 3, tp); + sendRecords(producer, 3, tp1); + consumer.assign(List.of(tp, tp1)); + + // Try without looking up the coordinator first + var cb = new CountConsumerCommitCallback(); + consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(1L)), cb); + consumer.commitAsync(Map.of(tp1, new OffsetAndMetadata(1L)), cb); + + consumer.close(); + assertEquals(2, cb.successCount); + } + } + + // TODO: This only works in the new consumer, but should be fixed for the old consumer as well + @ClusterTest + public void testCommitAsyncCompletedBeforeCommitSyncReturns() { + // This is testing the contract that asynchronous offset commits sent previously with the + // `commitAsync` are guaranteed to have their callbacks invoked prior to completion of + // `commitSync` (given that it does not time out). 
+ try (Producer<byte[], byte[]> producer = cluster.producer(); + var consumer = createConsumer(GroupProtocol.CONSUMER, false) + ) { + sendRecords(producer, 3, tp); + sendRecords(producer, 3, tp1); + + consumer.assign(List.of(tp, tp1)); + + // Try without looking up the coordinator first + var cb = new CountConsumerCommitCallback(); + consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(1L)), cb); + consumer.commitSync(Map.of()); + + assertEquals(1, consumer.committed(Set.of(tp)).get(tp).offset()); + assertEquals(1, cb.successCount); + + // Try with coordinator known + consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(2L)), cb); + consumer.commitSync(Map.of(tp1, new OffsetAndMetadata(2L))); + + assertEquals(2, consumer.committed(Set.of(tp)).get(tp).offset()); + assertEquals(2, consumer.committed(Set.of(tp1)).get(tp1).offset()); + assertEquals(2, cb.successCount); + + // Try with empty sync commit + consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(3L)), cb); + consumer.commitSync(Map.of()); + + assertEquals(3, consumer.committed(Set.of(tp)).get(tp).offset()); + assertEquals(2, consumer.committed(Set.of(tp1)).get(tp1).offset()); + assertEquals(3, cb.successCount); + } + } + + private Consumer<byte[], byte[]> createConsumer(GroupProtocol protocol, boolean enableAutoCommit) { + return cluster.consumer(Map.of( + GROUP_ID_CONFIG, "test-group", + GROUP_PROTOCOL_CONFIG, protocol.name().toLowerCase(Locale.ROOT), + ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit + )); + } + + private void sendRecords(int numRecords) { + try (Producer<byte[], byte[]> producer = cluster.producer()) { + sendRecords(producer, numRecords, tp, System.currentTimeMillis()); + } + } + + private void sendRecords( + Producer<byte[], byte[]> producer, + int numRecords, + TopicPartition topicPartition + ) { + sendRecords(producer, numRecords, topicPartition, System.currentTimeMillis()); + } + + private void sendRecords( + Producer<byte[], byte[]> producer, + int numRecords, + TopicPartition 
topicPartition, + long startingTimestamp + ) { + for (var i = 0; i < numRecords; i++) { + var timestamp = startingTimestamp + i; + var record = new ProducerRecord<>( + topicPartition.topic(), + topicPartition.partition(), + timestamp, + ("key " + i).getBytes(), + ("value " + i).getBytes() + ); + producer.send(record); + } + producer.flush(); + } + + private void awaitAssignment( + Consumer<byte[], byte[]> consumer, + Set<TopicPartition> expectedAssignment + ) throws InterruptedException { + TestUtils.waitForCondition(() -> { + consumer.poll(Duration.ofMillis(100)); + return consumer.assignment().equals(expectedAssignment); + }, "Timed out while awaiting expected assignment " + expectedAssignment + ". " + + "The current assignment is " + consumer.assignment() + ); + } + + private void sendAndAwaitAsyncCommit( + Consumer<byte[], byte[]> consumer, + Optional<Map<TopicPartition, OffsetAndMetadata>> offsetsOpt Review Comment: It looks like every `offsetsOpt` argument passed in is non-empty. Do you think we can remove the `Optional<>` wrapper from `sendAndAwaitAsyncCommit` and `RetryCommitCallback`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org