Hello,

We have recently upgraded our Kafka brokers to version 4.0.0 and have noticed 
that our latencies are substantially worse than on 3.9.1 (both versions running 
with KRaft).

We have created a small project that demonstrates the issue. It has a single 
producer and a single consumer, and measures the time between the producer 
sending a record and the consumer receiving it (both clients use version 3.9.1 
of the client library in all experiments). A record is sent every millisecond.
Below are the results:

Kafka 3.9.1

50th percentile:     297 us
90th percentile:     366 us
99th percentile:     528 us
99.9th percentile: 1,460 us

Kafka 4.0.0

50th percentile:    8,178 us
90th percentile:   10,355 us
99th percentile:   11,042 us
99.9th percentile: 13,638 us
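
The slowdown is not uniform across percentiles. A minimal sketch that divides 
each 4.0.0 figure by the corresponding 3.9.1 figure, using only the numbers 
from the two tables above (`SlowdownRatios` and `slowdown` are illustrative 
names, not part of the reproduction project):

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

public class SlowdownRatios
{
    // Percentile label -> {latency with 3.9.1 broker, latency with 4.0.0 broker}, in us.
    static final Map<String, long[]> RESULTS = new LinkedHashMap<>();

    static
    {
        RESULTS.put("50th", new long[] {297, 8_178});
        RESULTS.put("90th", new long[] {366, 10_355});
        RESULTS.put("99th", new long[] {528, 11_042});
        RESULTS.put("99.9th", new long[] {1_460, 13_638});
    }

    // How many times slower the "after" latency is than the "before" latency.
    static double slowdown(final long before, final long after)
    {
        return (double) after / before;
    }

    public static void main(String[] args)
    {
        // Locale.ROOT so the decimal separator is always '.'.
        RESULTS.forEach((percentile, us) ->
                System.out.printf(Locale.ROOT, "%s percentile: %.1fx%n",
                        percentile, slowdown(us[0], us[1])));
    }
}
```

Run as-is, it prints 27.5x, 28.3x, 20.9x and 9.3x for the 50th, 90th, 99th 
and 99.9th percentiles respectively.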

Kafka was started locally in a Docker container, first with 
`docker run -p 9092:9092 apache/kafka:3.9.1` and then with 
`docker run -p 9092:9092 apache/kafka:4.0.0`.

As shown, upgrading the brokers increases latency by roughly 21-28x at the 
50th to 99th percentiles (and roughly 9x at the 99.9th), which we believe is 
unexpected. Interestingly, all of the additional latency seems to come from 
`commitSync()` on the consumer: if that line is commented out, the latencies 
with the 3.9.1 and 4.0.0 brokers are comparable.
Decreasing the rate at which records are sent (e.g. sending a record every 
20 ms instead of every 1 ms) also reduces the latency observed with 4.0.0.
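
The `commitSync()` observation could be checked more directly by timing the 
commit call on its own, separately from the end-to-end latency. A minimal, 
stdlib-only sketch; `CallTimer` and its methods are hypothetical names, the 
percentile uses the simple nearest-rank method rather than HdrHistogram, and 
`Thread.sleep` stands in for the commit so it runs without a broker:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: records how long each wrapped call takes, in nanoseconds.
// Not thread-safe; meant to be used only from the consumer thread.
public class CallTimer
{
    private final List<Long> durationsNanos = new ArrayList<>();

    // Runs the action and records its wall-clock duration.
    public void time(final Runnable action)
    {
        final long start = System.nanoTime();
        action.run();
        durationsNanos.add(System.nanoTime() - start);
    }

    public int count()
    {
        return durationsNanos.size();
    }

    // Nearest-rank percentile (0 < percentile <= 100) of the recorded durations.
    public long percentileNanos(final double percentile)
    {
        if (durationsNanos.isEmpty())
        {
            throw new IllegalStateException("no calls recorded");
        }
        final List<Long> sorted = new ArrayList<>(durationsNanos);
        Collections.sort(sorted);
        final int rank = (int) Math.ceil(percentile / 100.0 * sorted.size());
        return sorted.get(Math.max(rank, 1) - 1);
    }

    public static void main(String[] args)
    {
        final CallTimer timer = new CallTimer();
        for (int i = 0; i < 20; i++)
        {
            // Stand-in for kafkaConsumer.commitSync().
            timer.time(() -> sleepMillis(1));
        }
        System.out.println("calls: " + timer.count());
        System.out.println("50th percentile: " + timer.percentileNanos(50) / 1_000 + "us");
        System.out.println("99th percentile: " + timer.percentileNanos(99) / 1_000 + "us");
    }

    private static void sleepMillis(final long millis)
    {
        try
        {
            Thread.sleep(millis);
        }
        catch (final InterruptedException e)
        {
            throw new RuntimeException(e);
        }
    }
}
```

In the reproduction below, the commit line would become 
`commitTimer.time(kafkaConsumer::commitSync);`, with `commitTimer` a 
`CallTimer` field read only after the executor has shut down.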

We were wondering whether this is expected (and, if so, whether there is any 
way to improve it without committing asynchronously) or whether it is a 
performance regression that needs to be addressed.
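
One synchronous option that may be worth measuring is to keep `commitSync()` 
but call it less often, for example at most once per fixed interval, so that 
most `poll()` iterations are not followed by a blocking commit. The trade-off 
is that more records may be redelivered after a crash, and whether it helps 
with 4.0.0 brokers would need to be measured. A minimal sketch, in which 
`CommitThrottle` and the 100 ms interval are hypothetical choices:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: decides whether enough time has passed to commit again.
public class CommitThrottle
{
    private final long intervalNanos;
    private long lastCommitNanos;
    private boolean committedBefore = false;

    public CommitThrottle(final long interval, final TimeUnit unit)
    {
        this.intervalNanos = unit.toNanos(interval);
    }

    // Returns true, and remembers nowNanos, if a commit is due at time nowNanos.
    public boolean shouldCommit(final long nowNanos)
    {
        if (committedBefore && nowNanos - lastCommitNanos < intervalNanos)
        {
            return false;
        }
        committedBefore = true;
        lastCommitNanos = nowNanos;
        return true;
    }

    public static void main(String[] args)
    {
        final CommitThrottle throttle = new CommitThrottle(100, TimeUnit.MILLISECONDS);
        final long oneMilli = TimeUnit.MILLISECONDS.toNanos(1);
        int commits = 0;
        // Simulated poll-loop iterations 1 ms apart, over one second.
        for (int i = 0; i < 1_000; i++)
        {
            if (throttle.shouldCommit(i * oneMilli))
            {
                commits++;
            }
        }
        System.out.println("commits: " + commits); // prints "commits: 10"
    }
}
```

In `receiveMessages`, the unconditional `kafkaConsumer.commitSync();` would 
become `if (throttle.shouldCommit(System.nanoTime())) { kafkaConsumer.commitSync(); }`, 
ideally with one final `commitSync()` when the consumer shuts down.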

Here is the Java code used to generate the above results:

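```java
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.HdrHistogram.Histogram;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Requires Java 19+ (ExecutorService is AutoCloseable) and HdrHistogram.
public class Main
{
    private static final int MESSAGE_SENDING_INTERVAL_MILLIS = 1;
    private static final int NUMBER_OF_MESSAGES = 50_000;
    // End-to-end latencies in nanoseconds, 5 significant digits.
    private static final Histogram HISTOGRAM = new Histogram(5);

    public static void main(String[] args) throws InterruptedException,
            ExecutionException, TimeoutException, IOException
    {
        final AtomicBoolean shutdown = new AtomicBoolean(false);

        final String kafkaUrl = "localhost:9092";
        final AdminClient client =
                AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl));

        // Recreate the topic. Wait for the delete to finish so createTopics() does
        // not race it; a missing topic (e.g. on the first run) is fine.
        try
        {
            client.deleteTopics(List.of("test")).all().get(100, TimeUnit.SECONDS);
        }
        catch (final ExecutionException e)
        {
            if (!(e.getCause() instanceof UnknownTopicOrPartitionException))
            {
                throw e;
            }
        }
        client.createTopics(Collections.singleton(new NewTopic("test", 1, (short) 1)))
                .all().get(100, TimeUnit.SECONDS);

        final Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());

        final Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        // A fresh group per run; offsets are committed manually with commitSync().
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        final KafkaProducer<String, Long> kafkaProducer = new KafkaProducer<>(producerProps);
        final KafkaConsumer<String, Long> kafkaConsumer = new KafkaConsumer<>(consumerProps);

        // Manual assignment: no rebalancing, but the group id is still used for commits.
        kafkaConsumer.assign(List.of(new TopicPartition("test", 0)));

        // Closing the executor at the end of this block waits for both tasks to finish.
        try (final ExecutorService executorService = Executors.newFixedThreadPool(2))
        {
            executorService.execute(() -> {
                sendMessages(kafkaProducer, shutdown);
                // Break the consumer out of a blocking poll() or commitSync().
                kafkaConsumer.wakeup();
            });
            executorService.execute(() -> receiveMessages(kafkaConsumer, shutdown));
        }

        // Histogram values are in nanoseconds; print microseconds.
        System.out.println("50th percentile: " + HISTOGRAM.getValueAtPercentile(50) / 1_000 + "us");
        System.out.println("90th percentile: " + HISTOGRAM.getValueAtPercentile(90) / 1_000 + "us");
        System.out.println("99th percentile: " + HISTOGRAM.getValueAtPercentile(99) / 1_000 + "us");
        System.out.println("99.9th percentile: " + HISTOGRAM.getValueAtPercentile(99.9) / 1_000 + "us");

        kafkaConsumer.close();
        kafkaProducer.close();
        client.close();
    }

    private static void sendMessages(final KafkaProducer<String, Long> kafkaProducer,
                                     final AtomicBoolean shutdown)
    {
        for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
        {
            // The value is the send timestamp. Producer and consumer run in the same
            // JVM, so System.nanoTime() readings are directly comparable.
            kafkaProducer.send(new ProducerRecord<>("test", System.nanoTime()));
            if (i % 1000 == 0)
            {
                System.out.println("Sent: " + i);
            }
            sleep(MESSAGE_SENDING_INTERVAL_MILLIS);
        }
        shutdown.set(true);
    }

    private static void receiveMessages(final KafkaConsumer<String, Long> kafkaConsumer,
                                        final AtomicBoolean shutdown)
    {
        try
        {
            while (!shutdown.get())
            {
                final ConsumerRecords<String, Long> records = kafkaConsumer.poll(Duration.ofSeconds(60));
                // One-way latency = receive time minus the send timestamp in the value.
                records.forEach(record -> HISTOGRAM.recordValue(System.nanoTime() - record.value()));
                // With this call removed, 4.0.0 latencies are comparable to 3.9.1.
                kafkaConsumer.commitSync();
            }
        }
        catch (final WakeupException e)
        {
            // Expected: the producer thread calls wakeup() once it has finished sending.
        }
    }

    private static void sleep(final long sleepTimeMillis)
    {
        try
        {
            Thread.sleep(sleepTimeMillis);
        }
        catch (InterruptedException e)
        {
            throw new RuntimeException(e);
        }
    }
}
```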

Any pointers or explanations would be greatly appreciated!

Many thanks,
Dan

LMAX Group is the holding company of LMAX Exchange, LMAX Global and LMAX 
Digital. Our registered address is Yellow Building, 1A Nicholas Road, London 
W11 4AN.