Hi all, I am facing a problem: when I detect an exception in the Kafka producer callback, I immediately stop sending new records and close the Kafka producer, but some extra records are still sent anyway.
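The pattern I use looks roughly like this (a condensed extract of the full demo at the end of this post):

    AtomicReference<Exception> sendException = new AtomicReference<>();
    for (int i = 0; i < 2048; i++) {
        // stop producing as soon as any callback has reported a failure
        if (sendException.get() != null) {
            kafkaProducer.close();
            break;
        }
        ProducerRecord<String, byte[]> record =
                new ProducerRecord<>(topicName, String.valueOf(i).getBytes());
        kafkaProducer.send(record, (metadata, exception) -> {
            if (exception != null) {
                sendException.compareAndSet(null, exception);
            }
        });
    }
    // problem: batches already queued or in flight when the flag flips
    // can still reach the broker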
I found a way to resolve this issue: set max.in.flight.requests.per.connection to 1 and close the Kafka producer when an exception is encountered in the producer callback. Setting max.in.flight.requests.per.connection to 1 ensures that at most one request is in flight per broker connection, and closing the producer inside the callback puts the Sender into the "forceClose" state, which prevents the extra records from being sent. But as far as I know, setting max.in.flight.requests.per.connection to 1 reduces producer throughput. Is there any other way to work around this issue without setting max.in.flight.requests.per.connection to 1, so that producer performance is preserved?

Here is my demo source code; you can also find it on GitHub Gist: https://gist.github.com/52Heartz/a5d67cf266b35bafcbfa7bc2552f4576

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.TopicConfig;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class KafkaProducerProblemDemo {

    public static void main(String[] args) {
        Logger rootLogger = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
        rootLogger.setLevel(Level.INFO);

        String topicName = "test_topic_202403112035";

        Map<String, String> kafkaTopicConfigs = new HashMap<>();
        kafkaTopicConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1048576");
        kafkaTopicConfigs.put(TopicConfig.RETENTION_BYTES_CONFIG, "1048576");
        kafkaTopicConfigs.put(TopicConfig.RETENTION_MS_CONFIG, "86400000");

        Properties props = new Properties();
        props.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, "3000");
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "192.168.223.3:9094");

        try (AdminClient adminClient = AdminClient.create(props)) {
            NewTopic newTopic = new NewTopic(topicName, 1, (short) 1);
            newTopic.configs(kafkaTopicConfigs);
            CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic));
            System.out.println(createTopicsResult.all().get());
        } catch (Exception e) {
            rootLogger.error("create topic error", e);
        }

        // requestTimeout is used for both request.timeout.ms and delivery.timeout.ms below;
        // adjust it to make sure the request timeout is long enough
        long requestTimeout = 2000;
        Properties kafkaProps = new Properties();
        kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeout));
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.ByteArraySerializer.class.getName());
        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.223.3:9094");
        kafkaProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeout));
        kafkaProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 2097152);
        // force one batch per record
        kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "1");
        kafkaProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        kafkaProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");

        try (KafkaProducer<String, byte[]> kafkaProducer = new KafkaProducer<>(kafkaProps)) {
            AtomicBoolean isFirstRecord = new AtomicBoolean(true);
            AtomicReference<Exception> sendException = new AtomicReference<>();
            for (int i = 0; i < 2048; i++) {
                String content = String.valueOf(i);
                ProducerRecord<String, byte[]> record = new ProducerRecord<>(topicName, content.getBytes());
                if (sendException.get() != null) {
                    // once an exception is seen in the callback, stop sending more records
                    kafkaProducer.close();
                    break;
                }
                kafkaProducer.send(record, (RecordMetadata metadata, Exception exception) -> {
                    if (isFirstRecord.getAndSet(false)) {
                        try {
                            // sleep for more than twice DELIVERY_TIMEOUT_MS_CONFIG so the waiting batches expire;
                            // this simulates spending too much time in the Kafka callback
                            Thread.sleep(requestTimeout * 2 + 1000);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                    if (exception != null) {
                        rootLogger.error("send data failed, record content: {}, reason: {}", content, exception.toString());
                        sendException.compareAndSet(null, exception);
                    } else {
                        rootLogger.info("send data success, offset: {}, record content: {}", metadata.offset(), content);
                    }
                });
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            rootLogger.error("send data error", e);
        }
    }
}
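In case it helps clarify what I am after, here is a rough, untested sketch of one direction I have been considering: wrapping the sends in a transaction, so that even if extra records slip out after the first failure, aborting the transaction makes them invisible to consumers running with isolation.level=read_committed. The transactional.id value "demo-tx-id" is just a placeholder, and topicName refers to the same topic as in the demo above:

    Properties txProps = new Properties();
    txProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.223.3:9094");
    txProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class.getName());
    txProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.ByteArraySerializer.class.getName());
    // placeholder id; setting transactional.id implicitly enables idempotence
    txProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-tx-id");

    try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(txProps)) {
        producer.initTransactions();
        producer.beginTransaction();
        AtomicReference<Exception> sendException = new AtomicReference<>();
        try {
            for (int i = 0; i < 2048; i++) {
                if (sendException.get() != null) {
                    throw sendException.get();
                }
                String content = String.valueOf(i);
                producer.send(new ProducerRecord<>(topicName, content.getBytes()),
                        (metadata, exception) -> {
                            if (exception != null) {
                                sendException.compareAndSet(null, exception);
                            }
                        });
            }
            producer.commitTransaction();
        } catch (Exception e) {
            // any records sent after the failure are aborted along with the rest,
            // so read_committed consumers never see them
            producer.abortTransaction();
        }
    }

I am not sure whether the transaction overhead would be acceptable compared with max.in.flight.requests.per.connection=1, so any advice on this (or on a simpler approach) would be appreciated.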