frankvicky commented on code in PR #19298: URL: https://github.com/apache/kafka/pull/19298#discussion_r2015873494
########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/callback/PlaintextConsumerCallbackTest.java: ########## @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.callback; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.TestUtils; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.common.test.junit.ClusterTestExtensions; + +import org.junit.jupiter.api.extension.ExtendWith; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; 
+import java.util.Locale; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC; +import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ClusterTestDefaults( + types = {Type.KRAFT}, + brokers = 3 +) +@ExtendWith(ClusterTestExtensions.class) +public class PlaintextConsumerCallbackTest { + + private final ClusterInstance cluster; + private final String topic = "topic"; + private final TopicPartition tp = new TopicPartition(topic, 0); + + public PlaintextConsumerCallbackTest(ClusterInstance clusterInstance) { + this.cluster = clusterInstance; + } + + @ClusterTest + public void testConsumerRebalanceListenerAssignOnPartitionsAssigned() throws InterruptedException { + try (var consumer = createConsumer(CLASSIC)) { + triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> { + var e = assertThrows(IllegalStateException.class, () -> executeConsumer.assign(List.of(tp))); + assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", e.getMessage()); + }); + } + } + + @ClusterTest + public void testAsyncConsumerRebalanceListenerAssignOnPartitionsAssigned() throws InterruptedException { + try (var consumer = createConsumer(CONSUMER)) { + triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> { + var e = assertThrows(IllegalStateException.class, () -> executeConsumer.assign(List.of(tp))); + assertEquals("Subscription to topics, partitions and 
pattern are mutually exclusive", e.getMessage()); + }); + } + } + + @ClusterTest + public void testConsumerRebalanceListenerAssignmentOnPartitionsAssigned() throws InterruptedException { Review Comment: You have `testAsyncConsumer...` above, so I suggest you also name it `testClassicConsumer...`. ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/callback/PlaintextConsumerCallbackTest.java: ########## @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.callback; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.TestUtils; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.common.test.junit.ClusterTestExtensions; + +import org.junit.jupiter.api.extension.ExtendWith; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC; +import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ClusterTestDefaults( + types = {Type.KRAFT}, + brokers = 3 +) +@ExtendWith(ClusterTestExtensions.class) +public class PlaintextConsumerCallbackTest { + + private final ClusterInstance cluster; + private final String topic = "topic"; + private final 
TopicPartition tp = new TopicPartition(topic, 0); + + public PlaintextConsumerCallbackTest(ClusterInstance clusterInstance) { + this.cluster = clusterInstance; + } + + @ClusterTest + public void testConsumerRebalanceListenerAssignOnPartitionsAssigned() throws InterruptedException { + try (var consumer = createConsumer(CLASSIC)) { + triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> { + var e = assertThrows(IllegalStateException.class, () -> executeConsumer.assign(List.of(tp))); + assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", e.getMessage()); + }); + } + } + + @ClusterTest + public void testAsyncConsumerRebalanceListenerAssignOnPartitionsAssigned() throws InterruptedException { + try (var consumer = createConsumer(CONSUMER)) { + triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> { + var e = assertThrows(IllegalStateException.class, () -> executeConsumer.assign(List.of(tp))); + assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", e.getMessage()); + }); + } + } + + @ClusterTest + public void testConsumerRebalanceListenerAssignmentOnPartitionsAssigned() throws InterruptedException { + try (var consumer = createConsumer(CLASSIC)) { + triggerOnPartitionsAssigned(tp, consumer, + (executeConsumer, partitions) -> assertTrue(executeConsumer.assignment().contains(tp)) + ); + } + } + @ClusterTest + public void testAsyncConsumerRebalanceListenerAssignmentOnPartitionsAssigned() throws InterruptedException { + try (var consumer = createConsumer(CONSUMER)) { + triggerOnPartitionsAssigned(tp, consumer, + (executeConsumer, partitions) -> assertTrue(executeConsumer.assignment().contains(tp)) + ); + } + } + + @ClusterTest + public void testConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned() throws InterruptedException { + try (var consumer = createConsumer(CLASSIC)) { + triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> { + var 
map = executeConsumer.beginningOffsets(List.of(tp)); + assertTrue(map.containsKey(tp)); + assertEquals(0L, map.get(tp)); + }); + } + } + + @ClusterTest + public void testAsyncConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned() throws InterruptedException { + try (var consumer = createConsumer(CONSUMER)) { + triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> { + var map = executeConsumer.beginningOffsets(List.of(tp)); + assertTrue(map.containsKey(tp)); + assertEquals(0L, map.get(tp)); + }); + } + } + + @ClusterTest + public void testConsumerRebalanceListenerAssignOnPartitionsRevoked() throws InterruptedException { + triggerOnPartitionsRevoked(tp, CLASSIC, (consumer, partitions) -> { + var e = assertThrows(IllegalStateException.class, () -> consumer.assign(List.of(tp))); + assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", e.getMessage()); + }); + } + + @ClusterTest + public void testAsyncConsumerRebalanceListenerAssignOnPartitionsRevoked() throws InterruptedException { + triggerOnPartitionsRevoked(tp, CONSUMER, (consumer, partitions) -> { + var e = assertThrows(IllegalStateException.class, () -> consumer.assign(List.of(tp))); + assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", e.getMessage()); + }); + } + + @ClusterTest + public void testConsumerRebalanceListenerAssignmentOnPartitionsRevoked() throws InterruptedException { + triggerOnPartitionsRevoked(tp, CLASSIC, + (consumer, partitions) -> assertTrue(consumer.assignment().contains(tp)) + ); + } + + @ClusterTest + public void testAsyncConsumerRebalanceListenerAssignmentOnPartitionsRevoked() throws InterruptedException { + triggerOnPartitionsRevoked(tp, CONSUMER, + (consumer, partitions) -> assertTrue(consumer.assignment().contains(tp)) + ); + } + + @ClusterTest + public void testConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked() throws InterruptedException { + triggerOnPartitionsRevoked(tp, 
CLASSIC, (consumer, partitions) -> { + var map = consumer.beginningOffsets(List.of(tp)); + assertTrue(map.containsKey(tp)); + assertEquals(0L, map.get(tp)); + }); + } + + @ClusterTest + public void testAsyncConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked() throws InterruptedException { + triggerOnPartitionsRevoked(tp, CONSUMER, (consumer, partitions) -> { + var map = consumer.beginningOffsets(List.of(tp)); + assertTrue(map.containsKey(tp)); + assertEquals(0L, map.get(tp)); + }); + } + + @ClusterTest + public void testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback() throws InterruptedException { + try (var consumer = createConsumer(CLASSIC)) { + triggerOnPartitionsAssigned(tp, consumer, + (executeConsumer, partitions) -> assertDoesNotThrow(() -> executeConsumer.position(tp)) + ); + } + } + + @ClusterTest + public void testAsyncConsumerGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback() throws InterruptedException { + try (var consumer = createConsumer(CONSUMER)) { + triggerOnPartitionsAssigned(tp, consumer, + (executeConsumer, partitions) -> assertDoesNotThrow(() -> executeConsumer.position(tp)) + ); + } + } + + @ClusterTest + public void testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback() throws InterruptedException { + try (var consumer = createConsumer(CLASSIC)) { + var startingOffset = 100L; + var totalRecords = 120; + var startingTimestamp = 0L; + + sendRecords(totalRecords, startingTimestamp); + + triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> { + executeConsumer.seek(tp, startingOffset); + executeConsumer.pause(Collections.singletonList(tp)); + }); + + assertTrue(consumer.paused().contains(tp)); + consumer.resume(Collections.singletonList(tp)); + consumeAndVerifyRecords( + consumer, + (int) (totalRecords - startingOffset), + (int) startingOffset, + (int) startingOffset, + startingOffset + ); + } + } + + @ClusterTest + public void 
testAsyncConsumerSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback() throws InterruptedException { + try (var consumer = createConsumer(CONSUMER)) { + var startingOffset = 100L; + var totalRecords = 120; + var startingTimestamp = 0L; + + sendRecords(totalRecords, startingTimestamp); + + triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> { + executeConsumer.seek(tp, startingOffset); + executeConsumer.pause(Collections.singletonList(tp)); + }); + + assertTrue(consumer.paused().contains(tp)); + consumer.resume(Collections.singletonList(tp)); + consumeAndVerifyRecords( + consumer, + (int) (totalRecords - startingOffset), + (int) startingOffset, + (int) startingOffset, + startingOffset + ); + } + } + + private void triggerOnPartitionsAssigned( + TopicPartition tp, + Consumer<byte[], byte[]> consumer, + BiConsumer<Consumer<byte[], byte[]>, Collection<TopicPartition>> execute + ) throws InterruptedException { + var partitionsAssigned = new AtomicBoolean(false); + consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() { + @Override + public void onPartitionsAssigned(Collection<TopicPartition> partitions) { + // Make sure the partition used in the test is actually assigned before continuing. 
+ if (partitions.contains(tp)) { + execute.accept(consumer, partitions); + partitionsAssigned.set(true); + } + } + + @Override + public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + // noop + } + }); + TestUtils.waitForCondition( + () -> { + consumer.poll(Duration.ofMillis(100)); + return partitionsAssigned.get(); + }, + "Timed out before expected rebalance completed" + ); + } + + private void triggerOnPartitionsRevoked( + TopicPartition tp, + GroupProtocol protocol, + BiConsumer<Consumer<byte[], byte[]>, Collection<TopicPartition>> execute + ) throws InterruptedException { + var partitionsAssigned = new AtomicBoolean(false); + var partitionsRevoked = new AtomicBoolean(false); + try (var consumer = createConsumer(protocol)) { + consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() { + @Override + public void onPartitionsAssigned(Collection<TopicPartition> partitions) { + // Make sure the partition used in the test is actually assigned before continuing. + if (partitions.contains(tp)) { + partitionsAssigned.set(true); + } + } + + @Override + public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + // Make sure the partition used in the test is actually revoked before continuing. 
+ if (partitions.contains(tp)) { + execute.accept(consumer, partitions); + partitionsRevoked.set(true); + } + } + }); + TestUtils.waitForCondition( + () -> { + consumer.poll(Duration.ofMillis(100)); + return partitionsAssigned.get(); + }, + "Timed out before expected rebalance completed" + ); + } + assertTrue(partitionsRevoked.get()); + } + + private Consumer<byte[], byte[]> createConsumer(GroupProtocol protocol) { + return cluster.consumer(Map.of( + GROUP_PROTOCOL_CONFIG, protocol.name().toLowerCase(Locale.ROOT), + ENABLE_AUTO_COMMIT_CONFIG, "false" + )); + } + + private void sendRecords(int numRecords, long startingTimestamp) { + try (Producer<byte[], byte[]> producer = cluster.producer()) { + for (int i = 0; i < numRecords; i++) { + long timestamp = startingTimestamp + i; + var record = new ProducerRecord<>( + tp.topic(), + tp.partition(), + timestamp, + ("key " + i).getBytes(), + ("value " + i).getBytes() + ); + producer.send(record); + } + producer.flush(); + } + } + + protected void consumeAndVerifyRecords( + Consumer<byte[], byte[]> consumer, + int numRecords, + int startingOffset, + int startingKeyAndValueIndex, + long startingTimestamp + ) throws InterruptedException { + var records = consumeRecords(consumer, numRecords); + for (var i = 0; i < numRecords; i++) { + var record = records.get(i); + int offset = startingOffset + i; Review Comment: I suggest keeping the style consistent: declare `offset` with `var`, like the surrounding locals. ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/callback/PlaintextConsumerCallbackTest.java: ########## @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.callback; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.TestUtils; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.common.test.junit.ClusterTestExtensions; + +import org.junit.jupiter.api.extension.ExtendWith; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC; +import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static 
org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ClusterTestDefaults( + types = {Type.KRAFT}, + brokers = 3 +) +@ExtendWith(ClusterTestExtensions.class) +public class PlaintextConsumerCallbackTest { + + private final ClusterInstance cluster; + private final String topic = "topic"; + private final TopicPartition tp = new TopicPartition(topic, 0); + + public PlaintextConsumerCallbackTest(ClusterInstance clusterInstance) { + this.cluster = clusterInstance; + } + + @ClusterTest + public void testConsumerRebalanceListenerAssignOnPartitionsAssigned() throws InterruptedException { + try (var consumer = createConsumer(CLASSIC)) { + triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> { + var e = assertThrows(IllegalStateException.class, () -> executeConsumer.assign(List.of(tp))); + assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", e.getMessage()); + }); + } + } + + @ClusterTest + public void testAsyncConsumerRebalanceListenerAssignOnPartitionsAssigned() throws InterruptedException { + try (var consumer = createConsumer(CONSUMER)) { + triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> { + var e = assertThrows(IllegalStateException.class, () -> executeConsumer.assign(List.of(tp))); + assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", e.getMessage()); + }); + } + } + + @ClusterTest + public void testConsumerRebalanceListenerAssignmentOnPartitionsAssigned() throws InterruptedException { + try (var consumer = createConsumer(CLASSIC)) { + triggerOnPartitionsAssigned(tp, consumer, + (executeConsumer, partitions) -> assertTrue(executeConsumer.assignment().contains(tp)) + ); + } + } + @ClusterTest + public void testAsyncConsumerRebalanceListenerAssignmentOnPartitionsAssigned() throws InterruptedException { + try (var consumer = 
createConsumer(CONSUMER)) { + triggerOnPartitionsAssigned(tp, consumer, + (executeConsumer, partitions) -> assertTrue(executeConsumer.assignment().contains(tp)) + ); + } + } + + @ClusterTest + public void testConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned() throws InterruptedException { + try (var consumer = createConsumer(CLASSIC)) { + triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> { + var map = executeConsumer.beginningOffsets(List.of(tp)); + assertTrue(map.containsKey(tp)); + assertEquals(0L, map.get(tp)); + }); + } + } + + @ClusterTest + public void testAsyncConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned() throws InterruptedException { + try (var consumer = createConsumer(CONSUMER)) { + triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> { + var map = executeConsumer.beginningOffsets(List.of(tp)); + assertTrue(map.containsKey(tp)); + assertEquals(0L, map.get(tp)); + }); + } + } + + @ClusterTest + public void testConsumerRebalanceListenerAssignOnPartitionsRevoked() throws InterruptedException { + triggerOnPartitionsRevoked(tp, CLASSIC, (consumer, partitions) -> { + var e = assertThrows(IllegalStateException.class, () -> consumer.assign(List.of(tp))); + assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", e.getMessage()); + }); + } + + @ClusterTest + public void testAsyncConsumerRebalanceListenerAssignOnPartitionsRevoked() throws InterruptedException { + triggerOnPartitionsRevoked(tp, CONSUMER, (consumer, partitions) -> { + var e = assertThrows(IllegalStateException.class, () -> consumer.assign(List.of(tp))); + assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", e.getMessage()); + }); + } + + @ClusterTest + public void testConsumerRebalanceListenerAssignmentOnPartitionsRevoked() throws InterruptedException { + triggerOnPartitionsRevoked(tp, CLASSIC, + (consumer, partitions) -> 
assertTrue(consumer.assignment().contains(tp)) + ); + } + + @ClusterTest + public void testAsyncConsumerRebalanceListenerAssignmentOnPartitionsRevoked() throws InterruptedException { + triggerOnPartitionsRevoked(tp, CONSUMER, + (consumer, partitions) -> assertTrue(consumer.assignment().contains(tp)) + ); + } + + @ClusterTest + public void testConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked() throws InterruptedException { + triggerOnPartitionsRevoked(tp, CLASSIC, (consumer, partitions) -> { + var map = consumer.beginningOffsets(List.of(tp)); + assertTrue(map.containsKey(tp)); + assertEquals(0L, map.get(tp)); + }); + } + + @ClusterTest + public void testAsyncConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked() throws InterruptedException { + triggerOnPartitionsRevoked(tp, CONSUMER, (consumer, partitions) -> { + var map = consumer.beginningOffsets(List.of(tp)); + assertTrue(map.containsKey(tp)); + assertEquals(0L, map.get(tp)); + }); + } + + @ClusterTest + public void testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback() throws InterruptedException { + try (var consumer = createConsumer(CLASSIC)) { + triggerOnPartitionsAssigned(tp, consumer, + (executeConsumer, partitions) -> assertDoesNotThrow(() -> executeConsumer.position(tp)) + ); + } + } + + @ClusterTest + public void testAsyncConsumerGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback() throws InterruptedException { + try (var consumer = createConsumer(CONSUMER)) { + triggerOnPartitionsAssigned(tp, consumer, + (executeConsumer, partitions) -> assertDoesNotThrow(() -> executeConsumer.position(tp)) + ); + } + } + + @ClusterTest + public void testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback() throws InterruptedException { + try (var consumer = createConsumer(CLASSIC)) { + var startingOffset = 100L; + var totalRecords = 120; + var startingTimestamp = 0L; + + sendRecords(totalRecords, startingTimestamp); + + triggerOnPartitionsAssigned(tp, 
consumer, (executeConsumer, partitions) -> { + executeConsumer.seek(tp, startingOffset); + executeConsumer.pause(Collections.singletonList(tp)); + }); + + assertTrue(consumer.paused().contains(tp)); + consumer.resume(Collections.singletonList(tp)); + consumeAndVerifyRecords( + consumer, + (int) (totalRecords - startingOffset), + (int) startingOffset, + (int) startingOffset, + startingOffset + ); + } + } + + @ClusterTest + public void testAsyncConsumerSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback() throws InterruptedException { + try (var consumer = createConsumer(CONSUMER)) { + var startingOffset = 100L; + var totalRecords = 120; + var startingTimestamp = 0L; + + sendRecords(totalRecords, startingTimestamp); + + triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> { + executeConsumer.seek(tp, startingOffset); + executeConsumer.pause(Collections.singletonList(tp)); + }); + + assertTrue(consumer.paused().contains(tp)); + consumer.resume(Collections.singletonList(tp)); + consumeAndVerifyRecords( + consumer, + (int) (totalRecords - startingOffset), + (int) startingOffset, + (int) startingOffset, + startingOffset + ); + } + } + + private void triggerOnPartitionsAssigned( + TopicPartition tp, + Consumer<byte[], byte[]> consumer, + BiConsumer<Consumer<byte[], byte[]>, Collection<TopicPartition>> execute + ) throws InterruptedException { + var partitionsAssigned = new AtomicBoolean(false); + consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() { + @Override + public void onPartitionsAssigned(Collection<TopicPartition> partitions) { + // Make sure the partition used in the test is actually assigned before continuing. 
+ if (partitions.contains(tp)) { + execute.accept(consumer, partitions); + partitionsAssigned.set(true); + } + } + + @Override + public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + // noop + } + }); + TestUtils.waitForCondition( + () -> { + consumer.poll(Duration.ofMillis(100)); + return partitionsAssigned.get(); + }, + "Timed out before expected rebalance completed" + ); + } + + private void triggerOnPartitionsRevoked( + TopicPartition tp, + GroupProtocol protocol, + BiConsumer<Consumer<byte[], byte[]>, Collection<TopicPartition>> execute + ) throws InterruptedException { + var partitionsAssigned = new AtomicBoolean(false); + var partitionsRevoked = new AtomicBoolean(false); + try (var consumer = createConsumer(protocol)) { + consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() { + @Override + public void onPartitionsAssigned(Collection<TopicPartition> partitions) { + // Make sure the partition used in the test is actually assigned before continuing. + if (partitions.contains(tp)) { + partitionsAssigned.set(true); + } + } + + @Override + public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + // Make sure the partition used in the test is actually revoked before continuing. 
+ if (partitions.contains(tp)) { + execute.accept(consumer, partitions); + partitionsRevoked.set(true); + } + } + }); + TestUtils.waitForCondition( + () -> { + consumer.poll(Duration.ofMillis(100)); + return partitionsAssigned.get(); + }, + "Timed out before expected rebalance completed" + ); + } + assertTrue(partitionsRevoked.get()); + } + + private Consumer<byte[], byte[]> createConsumer(GroupProtocol protocol) { + return cluster.consumer(Map.of( + GROUP_PROTOCOL_CONFIG, protocol.name().toLowerCase(Locale.ROOT), + ENABLE_AUTO_COMMIT_CONFIG, "false" + )); + } + + private void sendRecords(int numRecords, long startingTimestamp) { + try (Producer<byte[], byte[]> producer = cluster.producer()) { + for (int i = 0; i < numRecords; i++) { + long timestamp = startingTimestamp + i; + var record = new ProducerRecord<>( + tp.topic(), + tp.partition(), + timestamp, + ("key " + i).getBytes(), + ("value " + i).getBytes() + ); + producer.send(record); + } + producer.flush(); + } + } + + protected void consumeAndVerifyRecords( + Consumer<byte[], byte[]> consumer, + int numRecords, + int startingOffset, + int startingKeyAndValueIndex, + long startingTimestamp + ) throws InterruptedException { + var records = consumeRecords(consumer, numRecords); + for (var i = 0; i < numRecords; i++) { + var record = records.get(i); + int offset = startingOffset + i; + + assertEquals(tp.topic(), record.topic()); + assertEquals(tp.partition(), record.partition()); + + assertEquals(TimestampType.CREATE_TIME, record.timestampType()); + var timestamp = startingTimestamp + i; + assertEquals(timestamp, record.timestamp()); + + assertEquals(offset, record.offset()); + var keyAndValueIndex = startingKeyAndValueIndex + i; + assertEquals("key " + keyAndValueIndex, new String(record.key())); + assertEquals("value " + keyAndValueIndex, new String(record.value())); + // this is true only because K and V are byte arrays + assertEquals(("key " + keyAndValueIndex).length(), record.serializedKeySize()); + 
assertEquals(("value " + keyAndValueIndex).length(), record.serializedValueSize()); + } + } + + protected <K, V> List<ConsumerRecord<K, V>> consumeRecords( + Consumer<K, V> consumer, + int numRecords + ) throws InterruptedException { + List<ConsumerRecord<K, V>> records = new ArrayList<>(); + TestUtils.waitForCondition(() -> { + var polledRecords = consumer.poll(Duration.ofMillis(100)); + for (var record : polledRecords) { + records.add(record); + } Review Comment: nit: this loop can be replaced with `polledRecords.forEach(records::add)`. ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/callback/PlaintextConsumerCallbackTest.java: ########## @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.callback; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.TestUtils; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.common.test.junit.ClusterTestExtensions; + +import org.junit.jupiter.api.extension.ExtendWith; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC; +import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ClusterTestDefaults( + types = {Type.KRAFT}, + brokers = 3 +) +@ExtendWith(ClusterTestExtensions.class) +public class PlaintextConsumerCallbackTest { + + private final ClusterInstance cluster; + private final String topic = "topic"; + private final 
TopicPartition tp = new TopicPartition(topic, 0); + + public PlaintextConsumerCallbackTest(ClusterInstance clusterInstance) { + this.cluster = clusterInstance; + } + + @ClusterTest + public void testConsumerRebalanceListenerAssignOnPartitionsAssigned() throws InterruptedException { + try (var consumer = createConsumer(CLASSIC)) { + triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> { + var e = assertThrows(IllegalStateException.class, () -> executeConsumer.assign(List.of(tp))); + assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", e.getMessage()); + }); + } + } + + @ClusterTest + public void testAsyncConsumerRebalanceListenerAssignOnPartitionsAssigned() throws InterruptedException { + try (var consumer = createConsumer(CONSUMER)) { + triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> { + var e = assertThrows(IllegalStateException.class, () -> executeConsumer.assign(List.of(tp))); + assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", e.getMessage()); + }); + } + } + + @ClusterTest + public void testConsumerRebalanceListenerAssignmentOnPartitionsAssigned() throws InterruptedException { + try (var consumer = createConsumer(CLASSIC)) { + triggerOnPartitionsAssigned(tp, consumer, + (executeConsumer, partitions) -> assertTrue(executeConsumer.assignment().contains(tp)) + ); + } + } + @ClusterTest + public void testAsyncConsumerRebalanceListenerAssignmentOnPartitionsAssigned() throws InterruptedException { + try (var consumer = createConsumer(CONSUMER)) { + triggerOnPartitionsAssigned(tp, consumer, + (executeConsumer, partitions) -> assertTrue(executeConsumer.assignment().contains(tp)) + ); + } + } + + @ClusterTest + public void testConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned() throws InterruptedException { + try (var consumer = createConsumer(CLASSIC)) { + triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> { + var 
map = executeConsumer.beginningOffsets(List.of(tp)); + assertTrue(map.containsKey(tp)); + assertEquals(0L, map.get(tp)); + }); + } + } + + @ClusterTest + public void testAsyncConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned() throws InterruptedException { + try (var consumer = createConsumer(CONSUMER)) { + triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> { + var map = executeConsumer.beginningOffsets(List.of(tp)); + assertTrue(map.containsKey(tp)); + assertEquals(0L, map.get(tp)); + }); + } + } + + @ClusterTest + public void testConsumerRebalanceListenerAssignOnPartitionsRevoked() throws InterruptedException { + triggerOnPartitionsRevoked(tp, CLASSIC, (consumer, partitions) -> { + var e = assertThrows(IllegalStateException.class, () -> consumer.assign(List.of(tp))); + assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", e.getMessage()); + }); + } + + @ClusterTest + public void testAsyncConsumerRebalanceListenerAssignOnPartitionsRevoked() throws InterruptedException { + triggerOnPartitionsRevoked(tp, CONSUMER, (consumer, partitions) -> { + var e = assertThrows(IllegalStateException.class, () -> consumer.assign(List.of(tp))); + assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", e.getMessage()); + }); + } + + @ClusterTest + public void testConsumerRebalanceListenerAssignmentOnPartitionsRevoked() throws InterruptedException { + triggerOnPartitionsRevoked(tp, CLASSIC, + (consumer, partitions) -> assertTrue(consumer.assignment().contains(tp)) + ); + } + + @ClusterTest + public void testAsyncConsumerRebalanceListenerAssignmentOnPartitionsRevoked() throws InterruptedException { + triggerOnPartitionsRevoked(tp, CONSUMER, + (consumer, partitions) -> assertTrue(consumer.assignment().contains(tp)) + ); + } + + @ClusterTest + public void testConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked() throws InterruptedException { + triggerOnPartitionsRevoked(tp, 
CLASSIC, (consumer, partitions) -> { + var map = consumer.beginningOffsets(List.of(tp)); + assertTrue(map.containsKey(tp)); + assertEquals(0L, map.get(tp)); + }); + } + + @ClusterTest + public void testAsyncConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked() throws InterruptedException { + triggerOnPartitionsRevoked(tp, CONSUMER, (consumer, partitions) -> { + var map = consumer.beginningOffsets(List.of(tp)); + assertTrue(map.containsKey(tp)); + assertEquals(0L, map.get(tp)); + }); + } + + @ClusterTest + public void testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback() throws InterruptedException { + try (var consumer = createConsumer(CLASSIC)) { + triggerOnPartitionsAssigned(tp, consumer, + (executeConsumer, partitions) -> assertDoesNotThrow(() -> executeConsumer.position(tp)) + ); + } + } + + @ClusterTest + public void testAsyncConsumerGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback() throws InterruptedException { + try (var consumer = createConsumer(CONSUMER)) { + triggerOnPartitionsAssigned(tp, consumer, + (executeConsumer, partitions) -> assertDoesNotThrow(() -> executeConsumer.position(tp)) + ); + } + } + + @ClusterTest + public void testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback() throws InterruptedException { + try (var consumer = createConsumer(CLASSIC)) { + var startingOffset = 100L; + var totalRecords = 120; + var startingTimestamp = 0L; + + sendRecords(totalRecords, startingTimestamp); + + triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> { + executeConsumer.seek(tp, startingOffset); + executeConsumer.pause(Collections.singletonList(tp)); + }); + + assertTrue(consumer.paused().contains(tp)); + consumer.resume(Collections.singletonList(tp)); Review Comment: Use `List.of(tp)` instead of `Collections.singletonList(tp)` here (and in the `pause` call just above), matching the `List.of(tp)` usage elsewhere in this test class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org