m1a2st commented on code in PR #19319: URL: https://github.com/apache/kafka/pull/19319#discussion_r2030063959
########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerCompressionTest.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer; + + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.test.TestUtils; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static kafka.utils.TestUtils.consumeRecords; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.fail; + + +@ClusterTestDefaults( + types = {Type.KRAFT} +) +class ProducerCompressionTest { + + private final String topicName = "topic"; + private final int numRecords = 2000; + + /** + * testCompression + * <p> + * Compressed messages should be able to sent and consumed correctly + */ + @ClusterTest + void testCompression(ClusterInstance cluster) { + Set<String> compressionSet = Set.of("none", "gzip", "snappy", "lz4", "zstd"); Review Comment: We could use `CompressionType` enum to get all compression types. ``` List<String> compressionSet = Arrays.stream(CompressionType.values()).map(c -> c.name).toList(); ``` ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java: ########## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer; + +import kafka.server.KafkaBroker; +import kafka.utils.TestUtils; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.ProducerState; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.InvalidPidMappingException; +import org.apache.kafka.common.errors.TransactionalIdNotFoundException; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; + +import org.opentest4j.AssertionFailedError; + +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static kafka.utils.TestUtils.consumeRecords; +import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG; +import static org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG; +import static org.apache.kafka.server.config.ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG; +import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; +import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; +import static org.apache.kafka.test.TestUtils.assertFutureThrows; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + +@ClusterTestDefaults( + brokers = 3, + serverProperties = { + @ClusterConfigProperty(key = AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"), + // Set a smaller value for the number of partitions for the __consumer_offsets topic + // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long. + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "3"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "2"), + @ClusterConfigProperty(key = CONTROLLED_SHUTDOWN_ENABLE_CONFIG, value = "true"), + // ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG is not a constant + @ClusterConfigProperty(key = "unclean.leader.election.enable", value = "false"), + @ClusterConfigProperty(key = AUTO_LEADER_REBALANCE_ENABLE_CONFIG, value = "false"), + @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0"), + @ClusterConfigProperty(key = TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = "200"), + @ClusterConfigProperty(key = TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, value = "5000"), + @ClusterConfigProperty(key = TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, value = "500"), + @ClusterConfigProperty(key = PRODUCER_ID_EXPIRATION_MS_CONFIG, value = "10000"), + @ClusterConfigProperty(key = PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, value = "500") + } +) +public class ProducerIdExpirationTest { + private final String topic1 = "topic1"; + private final int numPartitions = 1; + private final short replicationFactor = 3; + private final TopicPartition tp0 = new TopicPartition(topic1, 0); + private final String transactionalId = "transactionalProducer"; + private final ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, ""); + Review Comment: We can move `cluster.createTopic(topic1, numPartitions, replicationFactor);` into `@BeforeEach` ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerCompressionTest.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer; + + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.test.TestUtils; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static kafka.utils.TestUtils.consumeRecords; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.fail; + + +@ClusterTestDefaults( + types = {Type.KRAFT} +) +class ProducerCompressionTest { + + private final String topicName = "topic"; + private final int numRecords = 2000; + + /** + * testCompression + * <p> + * Compressed messages should be able to sent and consumed correctly + */ + @ClusterTest + void testCompression(ClusterInstance cluster) { + Set<String> compressionSet = Set.of("none", "gzip", "snappy", "lz4", "zstd"); + compressionSet.forEach(compression -> { + try { + processCompressionTest(cluster, compression); + } catch (InterruptedException | ExecutionException e) { + fail(e); + } + }); + } + + + void processCompressionTest(ClusterInstance cluster, String compression) throws InterruptedException, + ExecutionException { + String compressionTopic = topicName + "_" + compression; + cluster.createTopic(compressionTopic, 1, (short) 1); + Map<String, Object> producerProps = new HashMap<>(); + producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression); + producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000"); + producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "200"); + Producer<byte[], byte[]> producer = cluster.producer(producerProps); + Consumer<byte[], byte[]> classicConsumer = cluster.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic")); + Consumer<byte[], byte[]> consumer = cluster.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer")); + try (producer) { + int partition = 0; + // prepare the messages + List<String> messages = IntStream.range(0, numRecords).mapToObj(this::messageValue).toList(); + Header[] headerArr = new Header[]{new RecordHeader("key", "value".getBytes())}; + RecordHeaders headers = new RecordHeaders(headerArr); + + // make sure the returned messages are correct + long now = System.currentTimeMillis(); + List<Future<RecordMetadata>> responses = new ArrayList<>(); + messages.forEach(message -> { + // 1. send message without key and header + responses.add(producer.send(new ProducerRecord<>(compressionTopic, null, now, null, + message.getBytes()))); + // 2. send message with key, without header + responses.add(producer.send(new ProducerRecord<>(compressionTopic, null, now, + String.valueOf(message.length()).getBytes(), message.getBytes()))); + // 3. send message with key and header + responses.add(producer.send(new ProducerRecord<>(compressionTopic, null, now, + String.valueOf(message.length()).getBytes(), message.getBytes(), headers))); + }); + for (int offset = 0; offset < responses.size(); offset++) { + assertEquals(offset, responses.get(offset).get().offset(), compression); + } + verifyConsumerRecords(consumer, messages, now, headerArr, partition, compressionTopic, compression); + verifyConsumerRecords(classicConsumer, messages, now, headerArr, partition, compressionTopic, compression); + } finally { + // This consumer close very slowly, which may cause the entire test to time out, and we can't wait for + // it to auto close + consumer.close(Duration.ofSeconds(1)); + classicConsumer.close(Duration.ofSeconds(1)); + } + } + + private void verifyConsumerRecords(Consumer<byte[], byte[]> consumer, List<String> messages, long now, + Header[] headerArr, int partition, String topic, String compression) { + TopicPartition tp = new TopicPartition(topic, partition); + consumer.assign(Collections.singleton(tp)); Review Comment: `List.of` can instead of `Collections.singleton` . ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerCompressionTest.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer; + + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.test.TestUtils; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static kafka.utils.TestUtils.consumeRecords; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.fail; + + +@ClusterTestDefaults( + types = {Type.KRAFT} Review Comment: Nit: Please follow the indentation style used in other tests — 4 spaces. ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerCompressionTest.java: ########## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer; + + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.test.TestUtils; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static kafka.utils.TestUtils.consumeRecords; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.fail; + + +@ClusterTestDefaults( + types = {Type.KRAFT} +) +class ProducerCompressionTest { + + private final String topicName = "topic"; + private final int numRecords = 2000; + + /** + * testCompression + * <p> + * Compressed messages should be able to sent and consumed correctly + */ + @ClusterTest + void testCompression(ClusterInstance cluster) { + Set<String> compressionSet = Set.of("none", "gzip", "snappy", "lz4", "zstd"); + compressionSet.forEach(compression -> { + try { + processCompressionTest(cluster, compression); + } catch (InterruptedException | ExecutionException e) { + fail(e); + } + }); + } + + + void processCompressionTest(ClusterInstance cluster, String compression) throws InterruptedException, + ExecutionException { + String compressionTopic = topicName + "_" + compression; + cluster.createTopic(compressionTopic, 1, (short) 1); + Map<String, Object> producerProps = new HashMap<>(); + producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression); + producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000"); + producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "200"); + Producer<byte[], byte[]> producer = cluster.producer(producerProps); + Consumer<byte[], byte[]> classicConsumer = cluster.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic")); + Consumer<byte[], byte[]> consumer = cluster.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer")); + try (producer) { Review Comment: We can move create producer into try-with-resource ``` try (Producer<byte[], byte[]> producer = cluster.producer(producerProps)) { // ... } ``` ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java: ########## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer; + +import kafka.server.KafkaBroker; +import kafka.utils.TestUtils; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.ProducerState; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.InvalidPidMappingException; +import org.apache.kafka.common.errors.TransactionalIdNotFoundException; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; + +import org.opentest4j.AssertionFailedError; + +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static kafka.utils.TestUtils.consumeRecords; +import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG; +import static org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG; +import static org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG; +import static org.apache.kafka.server.config.ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG; +import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; +import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; +import static org.apache.kafka.test.TestUtils.assertFutureThrows; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + +@ClusterTestDefaults( + brokers = 3, + serverProperties = { + @ClusterConfigProperty(key = AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"), + // Set a smaller value for the number of partitions for the __consumer_offsets topic + // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long. + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "3"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"), + @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "2"), + @ClusterConfigProperty(key = CONTROLLED_SHUTDOWN_ENABLE_CONFIG, value = "true"), + // ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG is not a constant + @ClusterConfigProperty(key = "unclean.leader.election.enable", value = "false"), + @ClusterConfigProperty(key = AUTO_LEADER_REBALANCE_ENABLE_CONFIG, value = "false"), + @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0"), + @ClusterConfigProperty(key = TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = "200"), + @ClusterConfigProperty(key = TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, value = "5000"), + @ClusterConfigProperty(key = TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, value = "500"), + @ClusterConfigProperty(key = PRODUCER_ID_EXPIRATION_MS_CONFIG, value = "10000"), + @ClusterConfigProperty(key = PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, value = "500") + } +) Review Comment: Nit: Please follow the indentation style used in other tests — 4 spaces. ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerFailureHandlingTest.java: ########## @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer; + +import kafka.server.KafkaBroker; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException; +import org.apache.kafka.common.errors.NotEnoughReplicasException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.record.DefaultRecord; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG; +import static org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG; +import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_FETCH_RESPONSE_MAX_BYTES_DOC; +import static org.apache.kafka.server.config.ServerConfigs.MESSAGE_MAX_BYTES_CONFIG; +import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +@ClusterTestDefaults( + types = {Type.KRAFT}, + brokers = 2, + serverProperties = { + @ClusterConfigProperty(key = AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"), + // 15000 is filed serverMessageMaxBytes + @ClusterConfigProperty(key = MESSAGE_MAX_BYTES_CONFIG, value = "15000"), + // 15200 is filed replicaFetchMaxBytes + @ClusterConfigProperty(key = REPLICA_FETCH_MAX_BYTES_CONFIG, value = "15200"), + // 15400 is filed replicaFetchMaxResponseBytes + @ClusterConfigProperty(key = REPLICA_FETCH_RESPONSE_MAX_BYTES_DOC, value = "15400"), + // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic) + // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + } +) Review Comment: Nit: Please follow the indentation style used in other tests — 4 spaces. ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerFailureHandlingTest.java: ########## @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.producer; + +import kafka.server.KafkaBroker; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException; +import org.apache.kafka.common.errors.NotEnoughReplicasException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.record.DefaultRecord; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG; +import static org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG; +import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_FETCH_RESPONSE_MAX_BYTES_DOC; +import static org.apache.kafka.server.config.ServerConfigs.MESSAGE_MAX_BYTES_CONFIG; +import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +@ClusterTestDefaults( + types = {Type.KRAFT}, + brokers = 2, + serverProperties = { + @ClusterConfigProperty(key = AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"), + // 15000 is filed serverMessageMaxBytes + @ClusterConfigProperty(key = MESSAGE_MAX_BYTES_CONFIG, value = "15000"), + // 15200 is filed replicaFetchMaxBytes + @ClusterConfigProperty(key = REPLICA_FETCH_MAX_BYTES_CONFIG, value = "15200"), + // 15400 is filed replicaFetchMaxResponseBytes + @ClusterConfigProperty(key = REPLICA_FETCH_RESPONSE_MAX_BYTES_DOC, value = "15400"), + // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic) + // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + } +) +public class ProducerFailureHandlingTest { + + private final int producerBufferSize = 30000; + private final int serverMessageMaxBytes = producerBufferSize / 2; + private final int replicaFetchMaxPartitionBytes = serverMessageMaxBytes + 200; + private final int replicaFetchMaxResponseBytes = replicaFetchMaxPartitionBytes + 200; + private final String topic1 = "topic-1"; + private final String topic2 = "topic-2"; + + + /** + * With ack == 0 the future metadata will have no exceptions with offset -1 + */ + @ClusterTest + void testTooLargeRecordWithAckZero(ClusterInstance clusterInstance) throws InterruptedException, + ExecutionException { + clusterInstance.createTopic(topic1, 1, (short) clusterInstance.brokers().size()); + try (Producer<byte[], byte[]> producer = clusterInstance.producer(producerConfig(0))) { + // send a too-large record + ProducerRecord<byte[], byte[]> record = + new ProducerRecord<>(topic1, null, "key".getBytes(), new byte[serverMessageMaxBytes + 1]); + + RecordMetadata recordMetadata = producer.send(record).get(); + assertNotNull(recordMetadata); + assertFalse(recordMetadata.hasOffset()); + assertEquals(-1L, recordMetadata.offset()); + } + } + + /** + * With ack == 1 the future metadata will throw ExecutionException caused by RecordTooLargeException + */ + @ClusterTest + void testTooLargeRecordWithAckOne(ClusterInstance clusterInstance) throws InterruptedException { + clusterInstance.createTopic(topic1, 1, (short) clusterInstance.brokers().size()); + + try (Producer<byte[], byte[]> producer = clusterInstance.producer(producerConfig(1))) { + // send a too-large record + ProducerRecord<byte[], byte[]> record = + new ProducerRecord<>(topic1, null, "key".getBytes(), new byte[serverMessageMaxBytes + 1]); + assertThrows(ExecutionException.class, () -> producer.send(record).get()); + } + } + + + /** + * This should succeed as the replica fetcher thread can handle oversized messages since KIP-74 + */ + @ClusterTest + void testPartitionTooLargeForReplicationWithAckAll(ClusterInstance clusterInstance) throws InterruptedException, + ExecutionException { + checkTooLargeRecordForReplicationWithAckAll(clusterInstance, replicaFetchMaxPartitionBytes); + } + + /** + * This should succeed as the replica fetcher thread can handle oversized messages since KIP-74 + */ + @ClusterTest + void testResponseTooLargeForReplicationWithAckAll(ClusterInstance clusterInstance) throws InterruptedException, + ExecutionException { + checkTooLargeRecordForReplicationWithAckAll(clusterInstance, replicaFetchMaxResponseBytes); + } + + + /** + * With non-exist-topic the future metadata should return ExecutionException caused by TimeoutException + */ + @ClusterTest + void testNonExistentTopic(ClusterInstance clusterInstance) { + // send a record with non-exist topic + ProducerRecord<byte[], byte[]> record = + new ProducerRecord<>(topic2, null, "key".getBytes(), "value".getBytes()); + try (Producer<byte[], byte[]> producer = clusterInstance.producer(producerConfig(0))) { + assertThrows(ExecutionException.class, () -> producer.send(record).get()); + } + } + + + /** + * With incorrect broker-list the future metadata should return ExecutionException caused by TimeoutException + * <p> + * TODO: other exceptions that can be thrown in ExecutionException: + * UnknownTopicOrPartitionException + * NotLeaderOrFollowerException + * LeaderNotAvailableException + * CorruptRecordException + * TimeoutException + */ + @ClusterTest + void testWrongBrokerList(ClusterInstance clusterInstance) throws InterruptedException { + clusterInstance.createTopic(topic1, 1, (short) 1); + // producer with incorrect broker list + Map<String, Object> producerConfig = new HashMap<>(producerConfig(1)); + producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8686,localhost:4242"); + try (Producer<byte[], byte[]> producer = clusterInstance.producer(producerConfig)) { + // send a record with incorrect broker list + ProducerRecord<byte[], byte[]> record = + new ProducerRecord<>(topic1, null, "key".getBytes(), "value".getBytes()); + assertThrows(ExecutionException.class, () -> producer.send(record).get()); + } + } + + /** + * Send with invalid partition id should return ExecutionException caused by TimeoutException + * when partition is higher than the upper bound of partitions. + */ + @ClusterTest + void testInvalidPartition(ClusterInstance clusterInstance) throws InterruptedException { + // create topic with a single partition + clusterInstance.createTopic(topic1, 1, (short) clusterInstance.brokers().size()); + + // create a record with incorrect partition id (higher than the number of partitions), send should fail + try (Producer<byte[], byte[]> producer = clusterInstance.producer(producerConfig(0))) { + ProducerRecord<byte[], byte[]> higherRecord = + new ProducerRecord<>(topic1, 1, "key".getBytes(), "value".getBytes()); + Exception e = assertThrows(ExecutionException.class, () -> producer.send(higherRecord).get()); + assertEquals(TimeoutException.class, e.getCause().getClass()); + } + } + + + /** + * The send call after producer closed should throw IllegalStateException + */ + @ClusterTest + void testSendAfterClosed(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException { + // create topic + clusterInstance.createTopic(topic1, 1, (short) clusterInstance.brokers().size()); + + Producer<byte[], byte[]> producer1 = clusterInstance.producer(producerConfig(0)); + Producer<byte[], byte[]> producer2 = clusterInstance.producer(producerConfig(1)); + Producer<byte[], byte[]> producer3 = clusterInstance.producer(producerConfig(-1)); + + ProducerRecord<byte[], byte[]> record = + new ProducerRecord<>(topic1, null, "key".getBytes(), "value".getBytes()); + // first send a message to make sure the metadata is refreshed + producer1.send(record).get(); + producer2.send(record).get(); + producer3.send(record).get(); + + producer1.close(); + assertThrows(IllegalStateException.class, () -> producer1.send(record)); + producer2.close(); + assertThrows(IllegalStateException.class, () -> producer2.send(record)); + producer3.close(); + assertThrows(IllegalStateException.class, () -> producer3.send(record)); + } + + @ClusterTest + void testCannotSendToInternalTopic(ClusterInstance clusterInstance) throws InterruptedException { + try (Admin admin = clusterInstance.admin()) { + Map<String, String> topicConfig = new HashMap<>(); + clusterInstance.brokers().get(0) + .groupCoordinator() + .groupMetadataTopicConfigs() + .forEach((k, v) -> topicConfig.put(k.toString(), v.toString())); + admin.createTopics(List.of(new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, 1, (short) 1).configs(topicConfig))); + clusterInstance.waitForTopic(Topic.GROUP_METADATA_TOPIC_NAME, 0); + } + + try (Producer<byte[], byte[]> producer = clusterInstance.producer(producerConfig(1))) { + Exception thrown = assertThrows(ExecutionException.class, + () -> producer.send(new ProducerRecord<>(Topic.GROUP_METADATA_TOPIC_NAME, "test".getBytes(), + "test".getBytes())).get()); + assertInstanceOf(InvalidTopicException.class, thrown.getCause(), + () -> "Unexpected exception while sending to an invalid topic " + thrown.getCause()); + } + } + + @ClusterTest + void testNotEnoughReplicasAfterBrokerShutdown(ClusterInstance clusterInstance) throws InterruptedException, + ExecutionException { + String topicName = "minisrtest2"; + int brokerNum = clusterInstance.brokers().size(); + Map<String, String> topicConfig = Map.of(MIN_IN_SYNC_REPLICAS_CONFIG, String.valueOf(brokerNum)); + try (Admin admin = clusterInstance.admin()) { + admin.createTopics(List.of(new NewTopic(topicName, 1, (short) brokerNum).configs(topicConfig))); + } + + ProducerRecord<byte[], byte[]> record = + new ProducerRecord<>(topicName, null, "key".getBytes(), "value".getBytes()); + + try (Producer<byte[], byte[]> producer = clusterInstance.producer(producerConfig(-1))) { + // this should work with all brokers up and running + producer.send(record).get(); + // shut down one broker + KafkaBroker oneBroker = clusterInstance.brokers().get(0); + oneBroker.shutdown(); + oneBroker.awaitShutdown(); + + Exception e = assertThrows(ExecutionException.class, () -> producer.send(record).get()); + assertTrue(e.getCause() instanceof NotEnoughReplicasException || + e.getCause() instanceof NotEnoughReplicasAfterAppendException || + e.getCause() instanceof TimeoutException); + + // restart the server + oneBroker.startup(); + } + } + + private void checkTooLargeRecordForReplicationWithAckAll(ClusterInstance clusterInstance, int maxFetchSize) throws InterruptedException, ExecutionException { + int maxMessageSize = maxFetchSize + 100; + int brokerSize = clusterInstance.brokers().size(); + Map<String, String> topicConfig = new HashMap<>(); + topicConfig.put(MIN_IN_SYNC_REPLICAS_CONFIG, String.valueOf(brokerSize)); + topicConfig.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(maxMessageSize)); + + // create topic + String topic10 = "topic10"; + try (Admin admin = clusterInstance.admin()) { + admin.createTopics(List.of(new NewTopic(topic10, brokerSize, (short) brokerSize).configs(topicConfig))); + clusterInstance.waitTopicDeletion("topic10"); + } + + // send a record that is too large for replication, but within the broker max message limit + byte[] value = + new byte[maxMessageSize - DefaultRecordBatch.RECORD_BATCH_OVERHEAD - DefaultRecord.MAX_RECORD_OVERHEAD]; + Producer<byte[], byte[]> producer = clusterInstance.producer(producerConfig(-1)); + try (producer) { Review Comment: ditto: We can move create producer into try-with-resource -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org