Hello,

We have recently upgraded our Kafka brokers to version 4.0.0 and have noticed that our latencies are substantially worse than on 3.9.1 (both versions running with KRaft).
We have created a small project that demonstrates the issue. It has a single producer and a single consumer and measures how long it takes between the producer sending a record and the consumer pulling it off (both clients use version 3.9.1 of the client libraries in all experiments). A record is sent every millisecond. Below are the results:

Kafka 3.9.1
  50th percentile:     297 us
  90th percentile:     366 us
  99th percentile:     528 us
  99.9th percentile: 1,460 us

Kafka 4.0.0
  50th percentile:    8,178 us
  90th percentile:   10,355 us
  99th percentile:   11,042 us
  99.9th percentile: 13,638 us

Kafka was started locally in a Docker container using `docker run -p 9092:9092 apache/kafka:3.9.1` and then `docker run -p 9092:9092 apache/kafka:4.0.0`.

As shown, upgrading appears to cause a ~20x increase in latency, which we believe is unexpected. Interestingly, all of the extra latency seems to come from `commitSync()` on the consumer: if that line is commented out, the latencies on 3.9.1 and 4.0.0 are comparable. Decreasing the rate at which records are sent also reduces the latency observed on 4.0.0 (e.g. sending a record every 20 ms instead of every 1 ms).

We were wondering whether this is to be expected (and if so, whether there are any ways to improve it without committing asynchronously?), or whether it is a performance regression that needs to be addressed.
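To illustrate why a slow commit would dominate the measured numbers, here is a toy queueing model of the consumer loop (plain Java, no Kafka dependency). The 8 ms per-commit cost is a purely hypothetical parameter chosen to show the shape of the effect, not a measured Kafka 4.0.0 figure. The idea: because `commitSync()` blocks the poll loop, every record sent while a commit is in flight has to wait for the next `poll()`, so once the commit takes longer than the send interval, per-record latency is governed by the commit duration rather than the 1 ms send interval.

```java
// Toy queueing model of the consumer loop: poll() -> record latencies -> commitSync().
// COMMIT_MILLIS below is a hypothetical per-commit cost, NOT a measured Kafka value.
public class CommitLatencyModel {

    /** Mean time a record waits between being "sent" and being "polled". */
    static double meanLatencyMillis(double sendIntervalMillis, double commitMillis, int records) {
        double now = 0;       // consumer clock: the time of each poll()
        double nextSend = 0;  // producer clock: next record's send time
        double totalWait = 0;
        int polled = 0;
        while (polled < records) {
            // poll() returns everything sent up to 'now'; latency is observed
            // here, before the commit, as in the benchmark's histogram
            while (nextSend <= now && polled < records) {
                totalWait += now - nextSend;
                polled++;
                nextSend += sendIntervalMillis;
            }
            now += commitMillis;  // commitSync() blocks the loop for this long
        }
        return totalWait / polled;
    }

    public static void main(String[] args) {
        // 1 ms send interval, hypothetical 8 ms commit cost
        System.out.printf("mean queueing delay ~ %.1f ms%n",
                meanLatencyMillis(1.0, 8.0, 10_000));
    }
}
```

The model is deliberately crude (it ignores network and poll time and assumes a fixed commit cost), but it shows the mechanism we suspect: the mean queueing delay scales with the commit duration, which would explain why commenting out `commitSync()` or slowing the send rate hides the problem.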
Here is the Java code used to generate the above results (imports included; the histogram is from HdrHistogram):

```
import org.HdrHistogram.Histogram;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class Main {

    private static final int MESSAGE_SENDING_INTERVAL_MILLIS = 1;
    private static final int NUMBER_OF_MESSAGES = 50_000;
    private static final Histogram HISTOGRAM = new Histogram(5);

    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException, IOException {
        final AtomicBoolean shutdown = new AtomicBoolean(false);
        final String kafkaUrl = "localhost:9092";

        final AdminClient client = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl));
        client.deleteTopics(List.of("test")); // best-effort delete so repeated runs start from a fresh topic
        client.createTopics(Collections.singleton(new NewTopic("test", 1, (short) 1))).all().get(100, TimeUnit.SECONDS);

        final Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());

        final Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        final KafkaProducer<String, Long> kafkaProducer = new KafkaProducer<>(producerProps);
        final KafkaConsumer<String, Long> kafkaConsumer = new KafkaConsumer<>(consumerProps);
        kafkaConsumer.assign(List.of(new TopicPartition("test", 0)));

        // try-with-resources on the executor (Java 19+) waits for both tasks to finish
        try (final ExecutorService executorService = Executors.newFixedThreadPool(2)) {
            executorService.execute(() -> {
                sendMessages(kafkaProducer, shutdown);
                kafkaConsumer.wakeup(); // interrupt a poll() that may still be blocked after the last record
            });
            executorService.execute(() -> receiveMessages(kafkaConsumer, shutdown));
        }

        System.out.println("50th percentile: " + HISTOGRAM.getValueAtPercentile(50) / 1_000 + " us");
        System.out.println("90th percentile: " + HISTOGRAM.getValueAtPercentile(90) / 1_000 + " us");
        System.out.println("99th percentile: " + HISTOGRAM.getValueAtPercentile(99) / 1_000 + " us");
        System.out.println("99.9th percentile: " + HISTOGRAM.getValueAtPercentile(99.9) / 1_000 + " us");

        kafkaConsumer.close();
        kafkaProducer.close();
        client.close();
    }

    private static void sendMessages(final KafkaProducer<String, Long> kafkaProducer, final AtomicBoolean shutdown) {
        for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
            // the record value is the send timestamp; the consumer uses it to compute latency
            kafkaProducer.send(new ProducerRecord<>("test", System.nanoTime()));
            if (i % 1000 == 0) {
                System.out.println("Sent: " + i);
            }
            sleep(MESSAGE_SENDING_INTERVAL_MILLIS);
        }
        shutdown.set(true);
    }

    private static void receiveMessages(final KafkaConsumer<String, Long> kafkaConsumer, final AtomicBoolean shutdown) {
        try {
            while (!shutdown.get()) {
                final ConsumerRecords<String, Long> records = kafkaConsumer.poll(Duration.ofSeconds(60));
                records.forEach(record ->
                        HISTOGRAM.recordValue(System.nanoTime() - record.value())
                );
                kafkaConsumer.commitSync(); // commenting this out makes 3.9.1 and 4.0.0 comparable
            }
        } catch (final WakeupException e) {
            // ignored: expected on shutdown
        }
    }

    private static void sleep(final long sleepTimeMillis) {
        try {
            Thread.sleep(sleepTimeMillis);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
```

Any pointers or explanations would be greatly appreciated!

Many thanks,
Dan

This message and its attachments are confidential, may not be disclosed or used by any person other than the addressee and are intended only for the named recipient(s). If you are not the intended recipient, please notify the sender immediately and delete any copies of this message. LMAX Group is the holding company of LMAX Exchange, LMAX Global and LMAX Digital. Our registered address is Yellow Building, 1A Nicholas Road, London W11 4AN.