frankvicky commented on code in PR #19298:
URL: https://github.com/apache/kafka/pull/19298#discussion_r2015873494


##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/callback/PlaintextConsumerCallbackTest.java:
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.callback;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.common.test.junit.ClusterTestExtensions;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = 3
+)
+@ExtendWith(ClusterTestExtensions.class)
+public class PlaintextConsumerCallbackTest {
+
+    private final ClusterInstance cluster;
+    private final String topic = "topic";
+    private final TopicPartition tp = new TopicPartition(topic, 0);
+
+    public PlaintextConsumerCallbackTest(ClusterInstance clusterInstance) {
+        this.cluster = clusterInstance;
+    }
+
+    @ClusterTest
+    public void testConsumerRebalanceListenerAssignOnPartitionsAssigned() 
throws InterruptedException {
+        try (var consumer = createConsumer(CLASSIC)) {
+            triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+                var e = assertThrows(IllegalStateException.class, () -> 
executeConsumer.assign(List.of(tp)));
+                assertEquals("Subscription to topics, partitions and pattern 
are mutually exclusive", e.getMessage());
+            });
+        }
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerRebalanceListenerAssignOnPartitionsAssigned() 
throws InterruptedException {
+        try (var consumer = createConsumer(CONSUMER)) {
+            triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+                var e = assertThrows(IllegalStateException.class, () -> 
executeConsumer.assign(List.of(tp)));
+                assertEquals("Subscription to topics, partitions and pattern 
are mutually exclusive", e.getMessage());
+            });
+        }
+    }
+
+    @ClusterTest
+    public void testConsumerRebalanceListenerAssignmentOnPartitionsAssigned() 
throws InterruptedException {

Review Comment:
   You have `testAsyncConsumer...` above, so I suggest naming this one 
`testClassicConsumer...` for consistency.



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/callback/PlaintextConsumerCallbackTest.java:
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.callback;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.common.test.junit.ClusterTestExtensions;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = 3
+)
+@ExtendWith(ClusterTestExtensions.class)
+public class PlaintextConsumerCallbackTest {
+
+    private final ClusterInstance cluster;
+    private final String topic = "topic";
+    private final TopicPartition tp = new TopicPartition(topic, 0);
+
+    public PlaintextConsumerCallbackTest(ClusterInstance clusterInstance) {
+        this.cluster = clusterInstance;
+    }
+
+    @ClusterTest
+    public void testConsumerRebalanceListenerAssignOnPartitionsAssigned() 
throws InterruptedException {
+        try (var consumer = createConsumer(CLASSIC)) {
+            triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+                var e = assertThrows(IllegalStateException.class, () -> 
executeConsumer.assign(List.of(tp)));
+                assertEquals("Subscription to topics, partitions and pattern 
are mutually exclusive", e.getMessage());
+            });
+        }
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerRebalanceListenerAssignOnPartitionsAssigned() 
throws InterruptedException {
+        try (var consumer = createConsumer(CONSUMER)) {
+            triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+                var e = assertThrows(IllegalStateException.class, () -> 
executeConsumer.assign(List.of(tp)));
+                assertEquals("Subscription to topics, partitions and pattern 
are mutually exclusive", e.getMessage());
+            });
+        }
+    }
+
+    @ClusterTest
+    public void testConsumerRebalanceListenerAssignmentOnPartitionsAssigned() 
throws InterruptedException {
+        try (var consumer = createConsumer(CLASSIC)) {
+            triggerOnPartitionsAssigned(tp, consumer,
+                (executeConsumer, partitions) -> 
assertTrue(executeConsumer.assignment().contains(tp))
+            );
+        }
+    }
+    @ClusterTest
+    public void 
testAsyncConsumerRebalanceListenerAssignmentOnPartitionsAssigned() throws 
InterruptedException {
+        try (var consumer = createConsumer(CONSUMER)) {
+            triggerOnPartitionsAssigned(tp, consumer,
+                (executeConsumer, partitions) -> 
assertTrue(executeConsumer.assignment().contains(tp))
+            );
+        }
+    }
+
+    @ClusterTest
+    public void 
testConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned() throws 
InterruptedException {
+        try (var consumer = createConsumer(CLASSIC)) {
+            triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+                var map = executeConsumer.beginningOffsets(List.of(tp));
+                assertTrue(map.containsKey(tp));
+                assertEquals(0L, map.get(tp));
+            });
+        }
+    }
+
+    @ClusterTest
+    public void 
testAsyncConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned() throws 
InterruptedException {
+        try (var consumer = createConsumer(CONSUMER)) {
+            triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+                var map = executeConsumer.beginningOffsets(List.of(tp));
+                assertTrue(map.containsKey(tp));
+                assertEquals(0L, map.get(tp));
+            });
+        }
+    }
+
+    @ClusterTest
+    public void testConsumerRebalanceListenerAssignOnPartitionsRevoked() 
throws InterruptedException {
+        triggerOnPartitionsRevoked(tp, CLASSIC, (consumer, partitions) -> {
+            var e = assertThrows(IllegalStateException.class, () -> 
consumer.assign(List.of(tp)));
+            assertEquals("Subscription to topics, partitions and pattern are 
mutually exclusive", e.getMessage());
+        });
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerRebalanceListenerAssignOnPartitionsRevoked() 
throws InterruptedException {
+        triggerOnPartitionsRevoked(tp, CONSUMER, (consumer, partitions) -> {
+            var e = assertThrows(IllegalStateException.class, () -> 
consumer.assign(List.of(tp)));
+            assertEquals("Subscription to topics, partitions and pattern are 
mutually exclusive", e.getMessage());
+        });
+    }
+
+    @ClusterTest
+    public void testConsumerRebalanceListenerAssignmentOnPartitionsRevoked() 
throws InterruptedException {
+        triggerOnPartitionsRevoked(tp, CLASSIC,
+            (consumer, partitions) -> 
assertTrue(consumer.assignment().contains(tp))
+        );
+    }
+
+    @ClusterTest
+    public void 
testAsyncConsumerRebalanceListenerAssignmentOnPartitionsRevoked() throws 
InterruptedException {
+        triggerOnPartitionsRevoked(tp, CONSUMER,
+            (consumer, partitions) -> 
assertTrue(consumer.assignment().contains(tp))
+        );
+    }
+
+    @ClusterTest
+    public void 
testConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked() throws 
InterruptedException {
+        triggerOnPartitionsRevoked(tp, CLASSIC, (consumer, partitions) -> {
+            var map = consumer.beginningOffsets(List.of(tp));
+            assertTrue(map.containsKey(tp));
+            assertEquals(0L, map.get(tp));
+        });
+    }
+
+    @ClusterTest
+    public void 
testAsyncConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked() throws 
InterruptedException {
+        triggerOnPartitionsRevoked(tp, CONSUMER, (consumer, partitions) -> {
+            var map = consumer.beginningOffsets(List.of(tp));
+            assertTrue(map.containsKey(tp));
+            assertEquals(0L, map.get(tp));
+        });
+    }
+
+    @ClusterTest
+    public void 
testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback() throws 
InterruptedException {
+        try (var consumer = createConsumer(CLASSIC)) {
+            triggerOnPartitionsAssigned(tp, consumer,
+                (executeConsumer, partitions) -> assertDoesNotThrow(() -> 
executeConsumer.position(tp))
+            );
+        }
+    }
+
+    @ClusterTest
+    public void 
testAsyncConsumerGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback()
 throws InterruptedException {
+        try (var consumer = createConsumer(CONSUMER)) {
+            triggerOnPartitionsAssigned(tp, consumer,
+                (executeConsumer, partitions) -> assertDoesNotThrow(() -> 
executeConsumer.position(tp))
+            );
+        }
+    }
+
+    @ClusterTest
+    public void 
testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback() 
throws InterruptedException {
+        try (var consumer = createConsumer(CLASSIC)) {
+            var startingOffset = 100L;
+            var totalRecords = 120;
+            var startingTimestamp = 0L;
+
+            sendRecords(totalRecords, startingTimestamp);
+
+            triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+                executeConsumer.seek(tp, startingOffset);
+                executeConsumer.pause(Collections.singletonList(tp));
+            });
+
+            assertTrue(consumer.paused().contains(tp));
+            consumer.resume(Collections.singletonList(tp));
+            consumeAndVerifyRecords(
+                consumer,
+                (int) (totalRecords - startingOffset),
+                (int) startingOffset,
+                (int) startingOffset,
+                startingOffset
+            );
+        }
+    }
+
+    @ClusterTest
+    public void 
testAsyncConsumerSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback()
 throws InterruptedException {
+        try (var consumer = createConsumer(CONSUMER)) {
+            var startingOffset = 100L;
+            var totalRecords = 120;
+            var startingTimestamp = 0L;
+
+            sendRecords(totalRecords, startingTimestamp);
+
+            triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+                executeConsumer.seek(tp, startingOffset);
+                executeConsumer.pause(Collections.singletonList(tp));
+            });
+
+            assertTrue(consumer.paused().contains(tp));
+            consumer.resume(Collections.singletonList(tp));
+            consumeAndVerifyRecords(
+                consumer,
+                (int) (totalRecords - startingOffset),
+                (int) startingOffset,
+                (int) startingOffset,
+                startingOffset
+            );
+        }
+    }
+
+    private void triggerOnPartitionsAssigned(
+        TopicPartition tp,
+        Consumer<byte[], byte[]> consumer,
+        BiConsumer<Consumer<byte[], byte[]>, Collection<TopicPartition>> 
execute
+    ) throws InterruptedException {
+        var partitionsAssigned = new AtomicBoolean(false);
+        consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() {
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+                // Make sure the partition used in the test is actually 
assigned before continuing.
+                if (partitions.contains(tp)) {
+                    execute.accept(consumer, partitions);
+                    partitionsAssigned.set(true);
+                }
+            }
+
+            @Override
+            public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
+                // noop
+            }
+        });
+        TestUtils.waitForCondition(
+            () -> {
+                consumer.poll(Duration.ofMillis(100));
+                return partitionsAssigned.get();
+            },
+            "Timed out before expected rebalance completed"
+        );
+    }
+
+    private void triggerOnPartitionsRevoked(
+        TopicPartition tp,
+        GroupProtocol protocol,
+        BiConsumer<Consumer<byte[], byte[]>, Collection<TopicPartition>> 
execute
+    ) throws InterruptedException {
+        var partitionsAssigned = new AtomicBoolean(false);
+        var partitionsRevoked = new AtomicBoolean(false);
+        try (var consumer = createConsumer(protocol)) {
+            consumer.subscribe(Collections.singletonList(topic), new 
ConsumerRebalanceListener() {
+                @Override
+                public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+                    // Make sure the partition used in the test is actually 
assigned before continuing.
+                    if (partitions.contains(tp)) {
+                        partitionsAssigned.set(true);
+                    }
+                }
+
+                @Override
+                public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
+                    // Make sure the partition used in the test is actually 
revoked before continuing.
+                    if (partitions.contains(tp)) {
+                        execute.accept(consumer, partitions);
+                        partitionsRevoked.set(true);
+                    }
+                }
+            });
+            TestUtils.waitForCondition(
+                () -> {
+                    consumer.poll(Duration.ofMillis(100));
+                    return partitionsAssigned.get();
+                },
+                "Timed out before expected rebalance completed"
+            );
+        }
+        assertTrue(partitionsRevoked.get());
+    }
+
+    private Consumer<byte[], byte[]> createConsumer(GroupProtocol protocol) {
+        return cluster.consumer(Map.of(
+            GROUP_PROTOCOL_CONFIG, protocol.name().toLowerCase(Locale.ROOT),
+            ENABLE_AUTO_COMMIT_CONFIG, "false"
+        ));
+    }
+
+    private void sendRecords(int numRecords, long startingTimestamp) {
+        try (Producer<byte[], byte[]> producer = cluster.producer()) {
+            for (int i = 0; i < numRecords; i++) {
+                long timestamp = startingTimestamp + i;
+                var record = new ProducerRecord<>(
+                    tp.topic(),
+                    tp.partition(),
+                    timestamp,
+                    ("key " + i).getBytes(),
+                    ("value " + i).getBytes()
+                );
+                producer.send(record);
+            }
+            producer.flush();
+        }
+    }
+
+    protected void consumeAndVerifyRecords(
+        Consumer<byte[], byte[]> consumer,
+        int numRecords,
+        int startingOffset,
+        int startingKeyAndValueIndex,
+        long startingTimestamp
+    ) throws InterruptedException {
+        var records = consumeRecords(consumer, numRecords);
+        for (var i = 0; i < numRecords; i++) {
+            var record = records.get(i);
+            int offset = startingOffset + i;

Review Comment:
   I suggest keeping the style consistent: use `var` here instead of `int`.



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/callback/PlaintextConsumerCallbackTest.java:
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.callback;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.common.test.junit.ClusterTestExtensions;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = 3
+)
+@ExtendWith(ClusterTestExtensions.class)
+public class PlaintextConsumerCallbackTest {
+
+    private final ClusterInstance cluster;
+    private final String topic = "topic";
+    private final TopicPartition tp = new TopicPartition(topic, 0);
+
+    public PlaintextConsumerCallbackTest(ClusterInstance clusterInstance) {
+        this.cluster = clusterInstance;
+    }
+
+    @ClusterTest
+    public void testConsumerRebalanceListenerAssignOnPartitionsAssigned() 
throws InterruptedException {
+        try (var consumer = createConsumer(CLASSIC)) {
+            triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+                var e = assertThrows(IllegalStateException.class, () -> 
executeConsumer.assign(List.of(tp)));
+                assertEquals("Subscription to topics, partitions and pattern 
are mutually exclusive", e.getMessage());
+            });
+        }
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerRebalanceListenerAssignOnPartitionsAssigned() 
throws InterruptedException {
+        try (var consumer = createConsumer(CONSUMER)) {
+            triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+                var e = assertThrows(IllegalStateException.class, () -> 
executeConsumer.assign(List.of(tp)));
+                assertEquals("Subscription to topics, partitions and pattern 
are mutually exclusive", e.getMessage());
+            });
+        }
+    }
+
+    @ClusterTest
+    public void testConsumerRebalanceListenerAssignmentOnPartitionsAssigned() 
throws InterruptedException {
+        try (var consumer = createConsumer(CLASSIC)) {
+            triggerOnPartitionsAssigned(tp, consumer,
+                (executeConsumer, partitions) -> 
assertTrue(executeConsumer.assignment().contains(tp))
+            );
+        }
+    }
+    @ClusterTest
+    public void 
testAsyncConsumerRebalanceListenerAssignmentOnPartitionsAssigned() throws 
InterruptedException {
+        try (var consumer = createConsumer(CONSUMER)) {
+            triggerOnPartitionsAssigned(tp, consumer,
+                (executeConsumer, partitions) -> 
assertTrue(executeConsumer.assignment().contains(tp))
+            );
+        }
+    }
+
+    @ClusterTest
+    public void 
testConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned() throws 
InterruptedException {
+        try (var consumer = createConsumer(CLASSIC)) {
+            triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+                var map = executeConsumer.beginningOffsets(List.of(tp));
+                assertTrue(map.containsKey(tp));
+                assertEquals(0L, map.get(tp));
+            });
+        }
+    }
+
+    @ClusterTest
+    public void 
testAsyncConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned() throws 
InterruptedException {
+        try (var consumer = createConsumer(CONSUMER)) {
+            triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+                var map = executeConsumer.beginningOffsets(List.of(tp));
+                assertTrue(map.containsKey(tp));
+                assertEquals(0L, map.get(tp));
+            });
+        }
+    }
+
+    @ClusterTest
+    public void testConsumerRebalanceListenerAssignOnPartitionsRevoked() 
throws InterruptedException {
+        triggerOnPartitionsRevoked(tp, CLASSIC, (consumer, partitions) -> {
+            var e = assertThrows(IllegalStateException.class, () -> 
consumer.assign(List.of(tp)));
+            assertEquals("Subscription to topics, partitions and pattern are 
mutually exclusive", e.getMessage());
+        });
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerRebalanceListenerAssignOnPartitionsRevoked() 
throws InterruptedException {
+        triggerOnPartitionsRevoked(tp, CONSUMER, (consumer, partitions) -> {
+            var e = assertThrows(IllegalStateException.class, () -> 
consumer.assign(List.of(tp)));
+            assertEquals("Subscription to topics, partitions and pattern are 
mutually exclusive", e.getMessage());
+        });
+    }
+
+    @ClusterTest
+    public void testConsumerRebalanceListenerAssignmentOnPartitionsRevoked() 
throws InterruptedException {
+        triggerOnPartitionsRevoked(tp, CLASSIC,
+            (consumer, partitions) -> 
assertTrue(consumer.assignment().contains(tp))
+        );
+    }
+
+    @ClusterTest
+    public void 
testAsyncConsumerRebalanceListenerAssignmentOnPartitionsRevoked() throws 
InterruptedException {
+        triggerOnPartitionsRevoked(tp, CONSUMER,
+            (consumer, partitions) -> 
assertTrue(consumer.assignment().contains(tp))
+        );
+    }
+
+    @ClusterTest
+    public void 
testConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked() throws 
InterruptedException {
+        triggerOnPartitionsRevoked(tp, CLASSIC, (consumer, partitions) -> {
+            var map = consumer.beginningOffsets(List.of(tp));
+            assertTrue(map.containsKey(tp));
+            assertEquals(0L, map.get(tp));
+        });
+    }
+
+    @ClusterTest
+    public void 
testAsyncConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked() throws 
InterruptedException {
+        triggerOnPartitionsRevoked(tp, CONSUMER, (consumer, partitions) -> {
+            var map = consumer.beginningOffsets(List.of(tp));
+            assertTrue(map.containsKey(tp));
+            assertEquals(0L, map.get(tp));
+        });
+    }
+
+    @ClusterTest
+    public void 
testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback() throws 
InterruptedException {
+        try (var consumer = createConsumer(CLASSIC)) {
+            triggerOnPartitionsAssigned(tp, consumer,
+                (executeConsumer, partitions) -> assertDoesNotThrow(() -> 
executeConsumer.position(tp))
+            );
+        }
+    }
+
+    @ClusterTest
+    public void 
testAsyncConsumerGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback()
 throws InterruptedException {
+        try (var consumer = createConsumer(CONSUMER)) {
+            triggerOnPartitionsAssigned(tp, consumer,
+                (executeConsumer, partitions) -> assertDoesNotThrow(() -> 
executeConsumer.position(tp))
+            );
+        }
+    }
+
+    @ClusterTest
+    public void 
testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback() 
throws InterruptedException {
+        try (var consumer = createConsumer(CLASSIC)) {
+            var startingOffset = 100L;
+            var totalRecords = 120;
+            var startingTimestamp = 0L;
+
+            sendRecords(totalRecords, startingTimestamp);
+
+            triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+                executeConsumer.seek(tp, startingOffset);
+                executeConsumer.pause(Collections.singletonList(tp));
+            });
+
+            assertTrue(consumer.paused().contains(tp));
+            consumer.resume(Collections.singletonList(tp));
+            consumeAndVerifyRecords(
+                consumer,
+                (int) (totalRecords - startingOffset),
+                (int) startingOffset,
+                (int) startingOffset,
+                startingOffset
+            );
+        }
+    }
+
+    @ClusterTest
+    public void 
testAsyncConsumerSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback()
 throws InterruptedException {
+        try (var consumer = createConsumer(CONSUMER)) {
+            var startingOffset = 100L;
+            var totalRecords = 120;
+            var startingTimestamp = 0L;
+
+            sendRecords(totalRecords, startingTimestamp);
+
+            triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+                executeConsumer.seek(tp, startingOffset);
+                executeConsumer.pause(Collections.singletonList(tp));
+            });
+
+            assertTrue(consumer.paused().contains(tp));
+            consumer.resume(Collections.singletonList(tp));
+            consumeAndVerifyRecords(
+                consumer,
+                (int) (totalRecords - startingOffset),
+                (int) startingOffset,
+                (int) startingOffset,
+                startingOffset
+            );
+        }
+    }
+
+    private void triggerOnPartitionsAssigned(
+        TopicPartition tp,
+        Consumer<byte[], byte[]> consumer,
+        BiConsumer<Consumer<byte[], byte[]>, Collection<TopicPartition>> 
execute
+    ) throws InterruptedException {
+        var partitionsAssigned = new AtomicBoolean(false);
+        consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() {
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+                // Make sure the partition used in the test is actually 
assigned before continuing.
+                if (partitions.contains(tp)) {
+                    execute.accept(consumer, partitions);
+                    partitionsAssigned.set(true);
+                }
+            }
+
+            @Override
+            public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
+                // noop
+            }
+        });
+        TestUtils.waitForCondition(
+            () -> {
+                consumer.poll(Duration.ofMillis(100));
+                return partitionsAssigned.get();
+            },
+            "Timed out before expected rebalance completed"
+        );
+    }
+
+    private void triggerOnPartitionsRevoked(
+        TopicPartition tp,
+        GroupProtocol protocol,
+        BiConsumer<Consumer<byte[], byte[]>, Collection<TopicPartition>> 
execute
+    ) throws InterruptedException {
+        var partitionsAssigned = new AtomicBoolean(false);
+        var partitionsRevoked = new AtomicBoolean(false);
+        try (var consumer = createConsumer(protocol)) {
+            consumer.subscribe(Collections.singletonList(topic), new 
ConsumerRebalanceListener() {
+                @Override
+                public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+                    // Make sure the partition used in the test is actually 
assigned before continuing.
+                    if (partitions.contains(tp)) {
+                        partitionsAssigned.set(true);
+                    }
+                }
+
+                @Override
+                public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
+                    // Make sure the partition used in the test is actually 
revoked before continuing.
+                    if (partitions.contains(tp)) {
+                        execute.accept(consumer, partitions);
+                        partitionsRevoked.set(true);
+                    }
+                }
+            });
+            TestUtils.waitForCondition(
+                () -> {
+                    consumer.poll(Duration.ofMillis(100));
+                    return partitionsAssigned.get();
+                },
+                "Timed out before expected rebalance completed"
+            );
+        }
+        assertTrue(partitionsRevoked.get());
+    }
+
+    private Consumer<byte[], byte[]> createConsumer(GroupProtocol protocol) {
+        return cluster.consumer(Map.of(
+            GROUP_PROTOCOL_CONFIG, protocol.name().toLowerCase(Locale.ROOT),
+            ENABLE_AUTO_COMMIT_CONFIG, "false"
+        ));
+    }
+
+    private void sendRecords(int numRecords, long startingTimestamp) {
+        try (Producer<byte[], byte[]> producer = cluster.producer()) {
+            for (int i = 0; i < numRecords; i++) {
+                long timestamp = startingTimestamp + i;
+                var record = new ProducerRecord<>(
+                    tp.topic(),
+                    tp.partition(),
+                    timestamp,
+                    ("key " + i).getBytes(),
+                    ("value " + i).getBytes()
+                );
+                producer.send(record);
+            }
+            producer.flush();
+        }
+    }
+
+    protected void consumeAndVerifyRecords(
+        Consumer<byte[], byte[]> consumer,
+        int numRecords,
+        int startingOffset,
+        int startingKeyAndValueIndex,
+        long startingTimestamp
+    ) throws InterruptedException {
+        var records = consumeRecords(consumer, numRecords);
+        for (var i = 0; i < numRecords; i++) {
+            var record = records.get(i);
+            int offset = startingOffset + i;
+
+            assertEquals(tp.topic(), record.topic());
+            assertEquals(tp.partition(), record.partition());
+
+            assertEquals(TimestampType.CREATE_TIME, record.timestampType());
+            var timestamp = startingTimestamp + i;
+            assertEquals(timestamp, record.timestamp());
+
+            assertEquals(offset, record.offset());
+            var keyAndValueIndex = startingKeyAndValueIndex + i;
+            assertEquals("key " + keyAndValueIndex, new String(record.key()));
+            assertEquals("value " + keyAndValueIndex, new 
String(record.value()));
+            // this is true only because K and V are byte arrays
+            assertEquals(("key " + keyAndValueIndex).length(), 
record.serializedKeySize());
+            assertEquals(("value " + keyAndValueIndex).length(), 
record.serializedValueSize());
+        }
+    }
+
+    protected <K, V> List<ConsumerRecord<K, V>> consumeRecords(
+        Consumer<K, V> consumer,
+        int numRecords
+    ) throws InterruptedException {
+        List<ConsumerRecord<K, V>> records = new ArrayList<>();
+        TestUtils.waitForCondition(() -> {
+            var polledRecords = consumer.poll(Duration.ofMillis(100));
+            for (var record : polledRecords) {
+                records.add(record);
+            }

Review Comment:
   nit: the explicit `for` loop above can be replaced with `forEach`, e.g. `polledRecords.forEach(records::add);`



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/callback/PlaintextConsumerCallbackTest.java:
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.callback;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.common.test.junit.ClusterTestExtensions;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+    types = {Type.KRAFT},
+    brokers = 3
+)
+@ExtendWith(ClusterTestExtensions.class)
+public class PlaintextConsumerCallbackTest {
+
+    private final ClusterInstance cluster;
+    private final String topic = "topic";
+    private final TopicPartition tp = new TopicPartition(topic, 0);
+
+    public PlaintextConsumerCallbackTest(ClusterInstance clusterInstance) {
+        this.cluster = clusterInstance;
+    }
+
+    @ClusterTest
+    public void testConsumerRebalanceListenerAssignOnPartitionsAssigned() 
throws InterruptedException {
+        try (var consumer = createConsumer(CLASSIC)) {
+            triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+                var e = assertThrows(IllegalStateException.class, () -> 
executeConsumer.assign(List.of(tp)));
+                assertEquals("Subscription to topics, partitions and pattern 
are mutually exclusive", e.getMessage());
+            });
+        }
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerRebalanceListenerAssignOnPartitionsAssigned() 
throws InterruptedException {
+        try (var consumer = createConsumer(CONSUMER)) {
+            triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+                var e = assertThrows(IllegalStateException.class, () -> 
executeConsumer.assign(List.of(tp)));
+                assertEquals("Subscription to topics, partitions and pattern 
are mutually exclusive", e.getMessage());
+            });
+        }
+    }
+
+    @ClusterTest
+    public void testConsumerRebalanceListenerAssignmentOnPartitionsAssigned() 
throws InterruptedException {
+        try (var consumer = createConsumer(CLASSIC)) {
+            triggerOnPartitionsAssigned(tp, consumer,
+                (executeConsumer, partitions) -> 
assertTrue(executeConsumer.assignment().contains(tp))
+            );
+        }
+    }
+    @ClusterTest
+    public void 
testAsyncConsumerRebalanceListenerAssignmentOnPartitionsAssigned() throws 
InterruptedException {
+        try (var consumer = createConsumer(CONSUMER)) {
+            triggerOnPartitionsAssigned(tp, consumer,
+                (executeConsumer, partitions) -> 
assertTrue(executeConsumer.assignment().contains(tp))
+            );
+        }
+    }
+
+    @ClusterTest
+    public void 
testConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned() throws 
InterruptedException {
+        try (var consumer = createConsumer(CLASSIC)) {
+            triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+                var map = executeConsumer.beginningOffsets(List.of(tp));
+                assertTrue(map.containsKey(tp));
+                assertEquals(0L, map.get(tp));
+            });
+        }
+    }
+
+    @ClusterTest
+    public void 
testAsyncConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned() throws 
InterruptedException {
+        try (var consumer = createConsumer(CONSUMER)) {
+            triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+                var map = executeConsumer.beginningOffsets(List.of(tp));
+                assertTrue(map.containsKey(tp));
+                assertEquals(0L, map.get(tp));
+            });
+        }
+    }
+
+    @ClusterTest
+    public void testConsumerRebalanceListenerAssignOnPartitionsRevoked() 
throws InterruptedException {
+        triggerOnPartitionsRevoked(tp, CLASSIC, (consumer, partitions) -> {
+            var e = assertThrows(IllegalStateException.class, () -> 
consumer.assign(List.of(tp)));
+            assertEquals("Subscription to topics, partitions and pattern are 
mutually exclusive", e.getMessage());
+        });
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerRebalanceListenerAssignOnPartitionsRevoked() 
throws InterruptedException {
+        triggerOnPartitionsRevoked(tp, CONSUMER, (consumer, partitions) -> {
+            var e = assertThrows(IllegalStateException.class, () -> 
consumer.assign(List.of(tp)));
+            assertEquals("Subscription to topics, partitions and pattern are 
mutually exclusive", e.getMessage());
+        });
+    }
+
+    @ClusterTest
+    public void testConsumerRebalanceListenerAssignmentOnPartitionsRevoked() 
throws InterruptedException {
+        triggerOnPartitionsRevoked(tp, CLASSIC,
+            (consumer, partitions) -> 
assertTrue(consumer.assignment().contains(tp))
+        );
+    }
+
+    @ClusterTest
+    public void 
testAsyncConsumerRebalanceListenerAssignmentOnPartitionsRevoked() throws 
InterruptedException {
+        triggerOnPartitionsRevoked(tp, CONSUMER,
+            (consumer, partitions) -> 
assertTrue(consumer.assignment().contains(tp))
+        );
+    }
+
+    @ClusterTest
+    public void 
testConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked() throws 
InterruptedException {
+        triggerOnPartitionsRevoked(tp, CLASSIC, (consumer, partitions) -> {
+            var map = consumer.beginningOffsets(List.of(tp));
+            assertTrue(map.containsKey(tp));
+            assertEquals(0L, map.get(tp));
+        });
+    }
+
+    @ClusterTest
+    public void 
testAsyncConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked() throws 
InterruptedException {
+        triggerOnPartitionsRevoked(tp, CONSUMER, (consumer, partitions) -> {
+            var map = consumer.beginningOffsets(List.of(tp));
+            assertTrue(map.containsKey(tp));
+            assertEquals(0L, map.get(tp));
+        });
+    }
+
+    @ClusterTest
+    public void 
testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback() throws 
InterruptedException {
+        try (var consumer = createConsumer(CLASSIC)) {
+            triggerOnPartitionsAssigned(tp, consumer,
+                (executeConsumer, partitions) -> assertDoesNotThrow(() -> 
executeConsumer.position(tp))
+            );
+        }
+    }
+
+    @ClusterTest
+    public void 
testAsyncConsumerGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback()
 throws InterruptedException {
+        try (var consumer = createConsumer(CONSUMER)) {
+            triggerOnPartitionsAssigned(tp, consumer,
+                (executeConsumer, partitions) -> assertDoesNotThrow(() -> 
executeConsumer.position(tp))
+            );
+        }
+    }
+
+    @ClusterTest
+    public void 
testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback() 
throws InterruptedException {
+        try (var consumer = createConsumer(CLASSIC)) {
+            var startingOffset = 100L;
+            var totalRecords = 120;
+            var startingTimestamp = 0L;
+
+            sendRecords(totalRecords, startingTimestamp);
+
+            triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+                executeConsumer.seek(tp, startingOffset);
+                executeConsumer.pause(Collections.singletonList(tp));
+            });
+
+            assertTrue(consumer.paused().contains(tp));
+            consumer.resume(Collections.singletonList(tp));

Review Comment:
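   Prefer `List.of(tp)` over `Collections.singletonList(tp)` here (and in the other `pause`/`resume` calls in this test).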



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to