m1a2st commented on code in PR #19319:
URL: https://github.com/apache/kafka/pull/19319#discussion_r2030063959


##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerCompressionTest.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.test.TestUtils;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static kafka.utils.TestUtils.consumeRecords;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.fail;
+
+
+@ClusterTestDefaults(
+        types = {Type.KRAFT}
+)
+class ProducerCompressionTest {
+
+    private final String topicName = "topic";
+    private final int numRecords = 2000;
+
+    /**
+     * testCompression
+     * <p>
+     * Compressed messages should be able to be sent and consumed correctly
+     */
+    @ClusterTest
+    void testCompression(ClusterInstance cluster) {
+        Set<String> compressionSet = Set.of("none", "gzip", "snappy", "lz4", 
"zstd");

Review Comment:
   We could use the `CompressionType` enum to get all compression types.
   ```
   List<String> compressionSet = Arrays.stream(CompressionType.values()).map(c -> c.name).toList();
   ```
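   (This suggestion would also need `java.util.Arrays` and `org.apache.kafka.common.record.CompressionType` added to the imports.)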



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java:
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import kafka.server.KafkaBroker;
+import kafka.utils.TestUtils;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.ProducerState;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidPidMappingException;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+
+import org.opentest4j.AssertionFailedError;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static kafka.utils.TestUtils.consumeRecords;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG;
+import static 
org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static 
org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG;
+import static 
org.apache.kafka.server.config.ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG;
+import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@ClusterTestDefaults(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = AUTO_CREATE_TOPICS_ENABLE_CONFIG, 
value = "false"),
+            // Set a smaller value for the number of partitions for the 
__consumer_offsets topic
+            // so that the creation of that topic/partition(s) and subsequent 
leader assignment doesn't take relatively long.
+            @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, 
value = "1"),
+            @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
value = "3"),
+            @ClusterConfigProperty(key = 
TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"),
+            @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 
value = "2"),
+            @ClusterConfigProperty(key = CONTROLLED_SHUTDOWN_ENABLE_CONFIG, 
value = "true"),
+            //  ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG is 
not a constant 
+            @ClusterConfigProperty(key = "unclean.leader.election.enable", 
value = "false"),
+            @ClusterConfigProperty(key = AUTO_LEADER_REBALANCE_ENABLE_CONFIG, 
value = "false"),
+            @ClusterConfigProperty(key = 
GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0"),
+            @ClusterConfigProperty(key = 
TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = 
"200"),
+            @ClusterConfigProperty(key = 
TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, value = "5000"),
+            @ClusterConfigProperty(key = 
TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, value 
= "500"),
+            @ClusterConfigProperty(key = PRODUCER_ID_EXPIRATION_MS_CONFIG, 
value = "10000"),
+            @ClusterConfigProperty(key = 
PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, value = "500")
+        }
+)
+public class ProducerIdExpirationTest {
+    private final String topic1 = "topic1";
+    private final int numPartitions = 1;
+    private final short replicationFactor = 3;
+    private final TopicPartition tp0 = new TopicPartition(topic1, 0);
+    private final String transactionalId = "transactionalProducer";
+    private final ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.BROKER, "");
+

Review Comment:
   We can move `cluster.createTopic(topic1, numPartitions, replicationFactor);` into `@BeforeEach`.
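   A rough sketch of what that could look like, assuming the ClusterTest extension also resolves `ClusterInstance` for lifecycle methods (not verified):
   ```
   // Sketch only: assumes ClusterInstance can be injected into @BeforeEach.
   @BeforeEach
   void setUp(ClusterInstance cluster) throws InterruptedException {
       cluster.createTopic(topic1, numPartitions, replicationFactor);
   }
   ```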



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerCompressionTest.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.test.TestUtils;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static kafka.utils.TestUtils.consumeRecords;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.fail;
+
+
+@ClusterTestDefaults(
+        types = {Type.KRAFT}
+)
+class ProducerCompressionTest {
+
+    private final String topicName = "topic";
+    private final int numRecords = 2000;
+
+    /**
+     * testCompression
+     * <p>
+     * Compressed messages should be able to be sent and consumed correctly
+     */
+    @ClusterTest
+    void testCompression(ClusterInstance cluster) {
+        Set<String> compressionSet = Set.of("none", "gzip", "snappy", "lz4", 
"zstd");
+        compressionSet.forEach(compression -> {
+            try {
+                processCompressionTest(cluster, compression);
+            } catch (InterruptedException | ExecutionException e) {
+                fail(e);
+            }
+        });
+    }
+
+
+    void processCompressionTest(ClusterInstance cluster, String compression) 
throws InterruptedException,
+            ExecutionException {
+        String compressionTopic = topicName + "_" + compression;
+        cluster.createTopic(compressionTopic, 1, (short) 1);
+        Map<String, Object> producerProps = new HashMap<>();
+        producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression);
+        producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000");
+        producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "200");
+        Producer<byte[], byte[]> producer = cluster.producer(producerProps);
+        Consumer<byte[], byte[]> classicConsumer = 
cluster.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic"));
+        Consumer<byte[], byte[]> consumer = 
cluster.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"));
+        try (producer) {
+            int partition = 0;
+            // prepare the messages
+            List<String> messages = IntStream.range(0, 
numRecords).mapToObj(this::messageValue).toList();
+            Header[] headerArr = new Header[]{new RecordHeader("key", 
"value".getBytes())};
+            RecordHeaders headers = new RecordHeaders(headerArr);
+
+            // make sure the returned messages are correct
+            long now = System.currentTimeMillis();
+            List<Future<RecordMetadata>> responses = new ArrayList<>();
+            messages.forEach(message -> {
+                // 1. send message without key and header
+                responses.add(producer.send(new 
ProducerRecord<>(compressionTopic, null, now, null,
+                        message.getBytes())));
+                // 2. send message with key, without header
+                responses.add(producer.send(new 
ProducerRecord<>(compressionTopic, null, now,
+                        String.valueOf(message.length()).getBytes(), 
message.getBytes())));
+                // 3. send message with key and header
+                responses.add(producer.send(new 
ProducerRecord<>(compressionTopic, null, now,
+                        String.valueOf(message.length()).getBytes(), 
message.getBytes(), headers)));
+            });
+            for (int offset = 0; offset < responses.size(); offset++) {
+                assertEquals(offset, responses.get(offset).get().offset(), 
compression);
+            }
+            verifyConsumerRecords(consumer, messages, now, headerArr, 
partition, compressionTopic, compression);
+            verifyConsumerRecords(classicConsumer, messages, now, headerArr, 
partition, compressionTopic, compression);
+        } finally {
+            // The consumers close very slowly, which may cause the entire test to time out,
+            // so we can't wait for them to auto-close.
+            consumer.close(Duration.ofSeconds(1));
+            classicConsumer.close(Duration.ofSeconds(1));
+        }
+    }
+
+    private void verifyConsumerRecords(Consumer<byte[], byte[]> consumer, 
List<String> messages, long now,
+                                       Header[] headerArr, int partition, 
String topic, String compression) {
+        TopicPartition tp = new TopicPartition(topic, partition);
+        consumer.assign(Collections.singleton(tp));

Review Comment:
   `List.of` can be used instead of `Collections.singleton`.
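   For example:
   ```
   consumer.assign(List.of(tp));
   ```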



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerCompressionTest.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.test.TestUtils;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static kafka.utils.TestUtils.consumeRecords;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.fail;
+
+
+@ClusterTestDefaults(
+        types = {Type.KRAFT}

Review Comment:
   Nit: Please follow the indentation style used in other tests — 4 spaces.
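   For example:
   ```
   @ClusterTestDefaults(
       types = {Type.KRAFT}
   )
   ```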



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerCompressionTest.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.test.TestUtils;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static kafka.utils.TestUtils.consumeRecords;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.fail;
+
+
+@ClusterTestDefaults(
+        types = {Type.KRAFT}
+)
+class ProducerCompressionTest {
+
+    private final String topicName = "topic";
+    private final int numRecords = 2000;
+
+    /**
+     * testCompression
+     * <p>
+     * Compressed messages should be able to be sent and consumed correctly
+     */
+    @ClusterTest
+    void testCompression(ClusterInstance cluster) {
+        Set<String> compressionSet = Set.of("none", "gzip", "snappy", "lz4", 
"zstd");
+        compressionSet.forEach(compression -> {
+            try {
+                processCompressionTest(cluster, compression);
+            } catch (InterruptedException | ExecutionException e) {
+                fail(e);
+            }
+        });
+    }
+
+
+    void processCompressionTest(ClusterInstance cluster, String compression) 
throws InterruptedException,
+            ExecutionException {
+        String compressionTopic = topicName + "_" + compression;
+        cluster.createTopic(compressionTopic, 1, (short) 1);
+        Map<String, Object> producerProps = new HashMap<>();
+        producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression);
+        producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000");
+        producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "200");
+        Producer<byte[], byte[]> producer = cluster.producer(producerProps);
+        Consumer<byte[], byte[]> classicConsumer = 
cluster.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic"));
+        Consumer<byte[], byte[]> consumer = 
cluster.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"));
+        try (producer) {

Review Comment:
   We can move the producer creation into try-with-resources:
   ```
   try (Producer<byte[], byte[]> producer = cluster.producer(producerProps)) {
   // ...
   }
   ```
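   That way the separate local variable and the bare `try (producer)` block are no longer needed.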



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java:
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import kafka.server.KafkaBroker;
+import kafka.utils.TestUtils;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.ProducerState;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidPidMappingException;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+
+import org.opentest4j.AssertionFailedError;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static kafka.utils.TestUtils.consumeRecords;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG;
+import static 
org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.transaction.TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static 
org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG;
+import static 
org.apache.kafka.server.config.ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG;
+import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@ClusterTestDefaults(
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(key = AUTO_CREATE_TOPICS_ENABLE_CONFIG, 
value = "false"),
+            // Set a smaller value for the number of partitions for the 
__consumer_offsets topic
+            // so that the creation of that topic/partition(s) and subsequent 
leader assignment doesn't take relatively long.
+            @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, 
value = "1"),
+            @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
value = "3"),
+            @ClusterConfigProperty(key = 
TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"),
+            @ClusterConfigProperty(key = TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, 
value = "2"),
+            @ClusterConfigProperty(key = CONTROLLED_SHUTDOWN_ENABLE_CONFIG, 
value = "true"),
+            //  ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG is 
not a constant 
+            @ClusterConfigProperty(key = "unclean.leader.election.enable", 
value = "false"),
+            @ClusterConfigProperty(key = AUTO_LEADER_REBALANCE_ENABLE_CONFIG, 
value = "false"),
+            @ClusterConfigProperty(key = 
GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0"),
+            @ClusterConfigProperty(key = 
TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = 
"200"),
+            @ClusterConfigProperty(key = 
TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, value = "5000"),
+            @ClusterConfigProperty(key = 
TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, value 
= "500"),
+            @ClusterConfigProperty(key = PRODUCER_ID_EXPIRATION_MS_CONFIG, 
value = "10000"),
+            @ClusterConfigProperty(key = 
PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, value = "500")
+        }
+)

Review Comment:
   Nit: Please follow the indentation style used in other tests — 4 spaces.



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerFailureHandlingTest.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import kafka.server.KafkaBroker;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.record.DefaultRecord;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
+import static 
org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.server.config.ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG;
+import static 
org.apache.kafka.server.config.ReplicationConfigs.REPLICA_FETCH_RESPONSE_MAX_BYTES_DOC;
+import static 
org.apache.kafka.server.config.ServerConfigs.MESSAGE_MAX_BYTES_CONFIG;
+import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@ClusterTestDefaults(
+        types = {Type.KRAFT},
+        brokers = 2,
+        serverProperties = {
+            @ClusterConfigProperty(key = AUTO_CREATE_TOPICS_ENABLE_CONFIG, 
value = "false"),
+            //  15000 is the value of the serverMessageMaxBytes field
+            @ClusterConfigProperty(key = MESSAGE_MAX_BYTES_CONFIG, value = 
"15000"),
+            //  15200 is the value of the replicaFetchMaxBytes field
+            @ClusterConfigProperty(key = REPLICA_FETCH_MAX_BYTES_CONFIG, value 
= "15200"),
+            //  15400 is the value of the replicaFetchMaxResponseBytes field
+            @ClusterConfigProperty(key = REPLICA_FETCH_RESPONSE_MAX_BYTES_DOC, 
value = "15400"),
+            // Set a smaller value for the number of partitions for the offset 
commit topic (__consumer_offset topic)
+            // so that the creation of that topic/partition(s) and subsequent 
leader assignment doesn't take relatively long
+            @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, 
value = "1"),
+        }
+)

Review Comment:
   Nit: Please follow the indentation style used in other tests — 4 spaces.



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerFailureHandlingTest.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import kafka.server.KafkaBroker;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.record.DefaultRecord;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
+import static 
org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.server.config.ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG;
+import static 
org.apache.kafka.server.config.ReplicationConfigs.REPLICA_FETCH_RESPONSE_MAX_BYTES_DOC;
+import static 
org.apache.kafka.server.config.ServerConfigs.MESSAGE_MAX_BYTES_CONFIG;
+import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@ClusterTestDefaults(
+        types = {Type.KRAFT},
+        brokers = 2,
+        serverProperties = {
+            @ClusterConfigProperty(key = AUTO_CREATE_TOPICS_ENABLE_CONFIG, 
value = "false"),
+            //  15000 is the value of the serverMessageMaxBytes field
+            @ClusterConfigProperty(key = MESSAGE_MAX_BYTES_CONFIG, value = 
"15000"),
+            //  15200 is the value of the replicaFetchMaxBytes field
+            @ClusterConfigProperty(key = REPLICA_FETCH_MAX_BYTES_CONFIG, value 
= "15200"),
+            //  15400 is the value of the replicaFetchMaxResponseBytes field
+            @ClusterConfigProperty(key = REPLICA_FETCH_RESPONSE_MAX_BYTES_DOC, 
value = "15400"),
+            // Set a smaller value for the number of partitions for the offset 
commit topic (__consumer_offset topic)
+            // so that the creation of that topic/partition(s) and subsequent 
leader assignment doesn't take relatively long
+            @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, 
value = "1"),
+        }
+)
+public class ProducerFailureHandlingTest {
+
+    private final int producerBufferSize = 30000;
+    private final int serverMessageMaxBytes = producerBufferSize / 2;
+    private final int replicaFetchMaxPartitionBytes = serverMessageMaxBytes + 
200;
+    private final int replicaFetchMaxResponseBytes = 
replicaFetchMaxPartitionBytes + 200;
+    private final String topic1 = "topic-1";
+    private final String topic2 = "topic-2";
+
+
+    /**
+     * With ack == 0 the future metadata will have no exceptions with offset -1
+     */
+    @ClusterTest
+    void testTooLargeRecordWithAckZero(ClusterInstance clusterInstance) throws 
InterruptedException,
+            ExecutionException {
+        clusterInstance.createTopic(topic1, 1, (short) 
clusterInstance.brokers().size());
+        try (Producer<byte[], byte[]> producer = 
clusterInstance.producer(producerConfig(0))) {
+            // send a too-large record
+            ProducerRecord<byte[], byte[]> record =
+                    new ProducerRecord<>(topic1, null, "key".getBytes(), new 
byte[serverMessageMaxBytes + 1]);
+
+            RecordMetadata recordMetadata = producer.send(record).get();
+            assertNotNull(recordMetadata);
+            assertFalse(recordMetadata.hasOffset());
+            assertEquals(-1L, recordMetadata.offset());
+        }
+    }
+
+    /**
+     * With ack == 1 the future metadata will throw ExecutionException caused 
by RecordTooLargeException
+     */
+    @ClusterTest
+    void testTooLargeRecordWithAckOne(ClusterInstance clusterInstance) throws 
InterruptedException {
+        clusterInstance.createTopic(topic1, 1, (short) 
clusterInstance.brokers().size());
+
+        try (Producer<byte[], byte[]> producer = 
clusterInstance.producer(producerConfig(1))) {
+            // send a too-large record
+            ProducerRecord<byte[], byte[]> record =
+                    new ProducerRecord<>(topic1, null, "key".getBytes(), new 
byte[serverMessageMaxBytes + 1]);
+            assertThrows(ExecutionException.class, () -> 
producer.send(record).get());
+        }
+    }
+
+
+    /**
+     * This should succeed as the replica fetcher thread can handle oversized 
messages since KIP-74
+     */
+    @ClusterTest
+    void testPartitionTooLargeForReplicationWithAckAll(ClusterInstance 
clusterInstance) throws InterruptedException,
+            ExecutionException {
+        checkTooLargeRecordForReplicationWithAckAll(clusterInstance, 
replicaFetchMaxPartitionBytes);
+    }
+
+    /**
+     * This should succeed as the replica fetcher thread can handle oversized 
messages since KIP-74
+     */
+    @ClusterTest
+    void testResponseTooLargeForReplicationWithAckAll(ClusterInstance 
clusterInstance) throws InterruptedException,
+            ExecutionException {
+        checkTooLargeRecordForReplicationWithAckAll(clusterInstance, 
replicaFetchMaxResponseBytes);
+    }
+
+
+    /**
+     * With non-exist-topic the future metadata should return 
ExecutionException caused by TimeoutException
+     */
+    @ClusterTest
+    void testNonExistentTopic(ClusterInstance clusterInstance) {
+        // send a record with non-exist topic
+        ProducerRecord<byte[], byte[]> record =
+                new ProducerRecord<>(topic2, null, "key".getBytes(), 
"value".getBytes());
+        try (Producer<byte[], byte[]> producer = 
clusterInstance.producer(producerConfig(0))) {
+            assertThrows(ExecutionException.class, () -> 
producer.send(record).get());
+        }
+    }
+
+
+    /**
+     * With incorrect broker-list the future metadata should return 
ExecutionException caused by TimeoutException
+     * <p>
+     * TODO: other exceptions that can be thrown in ExecutionException:
+     *    UnknownTopicOrPartitionException
+     *    NotLeaderOrFollowerException
+     *    LeaderNotAvailableException
+     *    CorruptRecordException
+     *    TimeoutException
+     */
+    @ClusterTest
+    void testWrongBrokerList(ClusterInstance clusterInstance) throws 
InterruptedException {
+        clusterInstance.createTopic(topic1, 1, (short) 1);
+        // producer with incorrect broker list
+        Map<String, Object> producerConfig = new HashMap<>(producerConfig(1));
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:8686,localhost:4242");
+        try (Producer<byte[], byte[]> producer = 
clusterInstance.producer(producerConfig)) {
+            // send a record with incorrect broker list
+            ProducerRecord<byte[], byte[]> record =
+                    new ProducerRecord<>(topic1, null, "key".getBytes(), 
"value".getBytes());
+            assertThrows(ExecutionException.class, () -> 
producer.send(record).get());
+        }
+    }
+
+    /**
+     * Send with invalid partition id should return ExecutionException caused 
by TimeoutException
+     * when partition is higher than the upper bound of partitions.
+     */
+    @ClusterTest
+    void testInvalidPartition(ClusterInstance clusterInstance) throws 
InterruptedException {
+        // create topic with a single partition
+        clusterInstance.createTopic(topic1, 1, (short) 
clusterInstance.brokers().size());
+
+        // create a record with incorrect partition id (higher than the number 
of partitions), send should fail
+        try (Producer<byte[], byte[]> producer = 
clusterInstance.producer(producerConfig(0))) {
+            ProducerRecord<byte[], byte[]> higherRecord =
+                    new ProducerRecord<>(topic1, 1, "key".getBytes(), 
"value".getBytes());
+            Exception e = assertThrows(ExecutionException.class, () -> 
producer.send(higherRecord).get());
+            assertEquals(TimeoutException.class, e.getCause().getClass());
+        }
+    }
+
+
+    /**
+     * The send call after producer closed should throw IllegalStateException
+     */
+    @ClusterTest
+    void testSendAfterClosed(ClusterInstance clusterInstance) throws 
InterruptedException, ExecutionException {
+        // create topic
+        clusterInstance.createTopic(topic1, 1, (short) 
clusterInstance.brokers().size());
+
+        Producer<byte[], byte[]> producer1 = 
clusterInstance.producer(producerConfig(0));
+        Producer<byte[], byte[]> producer2 = 
clusterInstance.producer(producerConfig(1));
+        Producer<byte[], byte[]> producer3 = 
clusterInstance.producer(producerConfig(-1));
+
+        ProducerRecord<byte[], byte[]> record =
+                new ProducerRecord<>(topic1, null, "key".getBytes(), 
"value".getBytes());
+        // first send a message to make sure the metadata is refreshed
+        producer1.send(record).get();
+        producer2.send(record).get();
+        producer3.send(record).get();
+
+        producer1.close();
+        assertThrows(IllegalStateException.class, () -> 
producer1.send(record));
+        producer2.close();
+        assertThrows(IllegalStateException.class, () -> 
producer2.send(record));
+        producer3.close();
+        assertThrows(IllegalStateException.class, () -> 
producer3.send(record));
+    }
+
+    @ClusterTest
+    void testCannotSendToInternalTopic(ClusterInstance clusterInstance) throws 
InterruptedException {
+        try (Admin admin = clusterInstance.admin()) {
+            Map<String, String> topicConfig = new HashMap<>();
+            clusterInstance.brokers().get(0)
+                    .groupCoordinator()
+                    .groupMetadataTopicConfigs()
+                    .forEach((k, v) -> topicConfig.put(k.toString(), 
v.toString()));
+            admin.createTopics(List.of(new 
NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, 1, (short) 1).configs(topicConfig)));
+            clusterInstance.waitForTopic(Topic.GROUP_METADATA_TOPIC_NAME, 0);
+        }
+
+        try (Producer<byte[], byte[]> producer = 
clusterInstance.producer(producerConfig(1))) {
+            Exception thrown = assertThrows(ExecutionException.class,
+                    () -> producer.send(new 
ProducerRecord<>(Topic.GROUP_METADATA_TOPIC_NAME, "test".getBytes(),
+                            "test".getBytes())).get());
+            assertInstanceOf(InvalidTopicException.class, thrown.getCause(),
+                    () -> "Unexpected exception while sending to an invalid 
topic " + thrown.getCause());
+        }
+    }
+
+    @ClusterTest
+    void testNotEnoughReplicasAfterBrokerShutdown(ClusterInstance 
clusterInstance) throws InterruptedException,
+            ExecutionException {
+        String topicName = "minisrtest2";
+        int brokerNum = clusterInstance.brokers().size();
+        Map<String, String> topicConfig = Map.of(MIN_IN_SYNC_REPLICAS_CONFIG, 
String.valueOf(brokerNum));
+        try (Admin admin = clusterInstance.admin()) {
+            admin.createTopics(List.of(new NewTopic(topicName, 1, (short) 
brokerNum).configs(topicConfig)));
+        }
+
+        ProducerRecord<byte[], byte[]> record =
+                new ProducerRecord<>(topicName, null, "key".getBytes(), 
"value".getBytes());
+
+        try (Producer<byte[], byte[]> producer = 
clusterInstance.producer(producerConfig(-1))) {
+            // this should work with all brokers up and running
+            producer.send(record).get();
+            // shut down one broker
+            KafkaBroker oneBroker = clusterInstance.brokers().get(0);
+            oneBroker.shutdown();
+            oneBroker.awaitShutdown();
+
+            Exception e = assertThrows(ExecutionException.class, () -> 
producer.send(record).get());
+            assertTrue(e.getCause() instanceof NotEnoughReplicasException ||
+                    e.getCause() instanceof 
NotEnoughReplicasAfterAppendException ||
+                    e.getCause() instanceof TimeoutException);
+
+            // restart the server
+            oneBroker.startup();
+        }
+    }
+
+    private void checkTooLargeRecordForReplicationWithAckAll(ClusterInstance 
clusterInstance, int maxFetchSize) throws InterruptedException, 
ExecutionException {
+        int maxMessageSize = maxFetchSize + 100;
+        int brokerSize = clusterInstance.brokers().size();
+        Map<String, String> topicConfig = new HashMap<>();
+        topicConfig.put(MIN_IN_SYNC_REPLICAS_CONFIG, 
String.valueOf(brokerSize));
+        topicConfig.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 
String.valueOf(maxMessageSize));
+
+        // create topic
+        String topic10 = "topic10";
+        try (Admin admin = clusterInstance.admin()) {
+            admin.createTopics(List.of(new NewTopic(topic10, brokerSize, 
(short) brokerSize).configs(topicConfig)));
+            clusterInstance.waitTopicDeletion("topic10");
+        }
+
+        // send a record that is too large for replication, but within the 
broker max message limit
+        byte[] value =
+                new byte[maxMessageSize - 
DefaultRecordBatch.RECORD_BATCH_OVERHEAD - DefaultRecord.MAX_RECORD_OVERHEAD];
+        Producer<byte[], byte[]> producer = 
clusterInstance.producer(producerConfig(-1));
+        try (producer) {

Review Comment:
   Ditto: we can move the producer creation into try-with-resources.
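   For example:
   ```
   try (Producer<byte[], byte[]> producer = clusterInstance.producer(producerConfig(-1))) {
       // ...
   }
   ```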



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

