Vinicius Vieira dos Santos created KAFKA-18471: --------------------------------------------------
Summary: Race conditions when accessing RecordHeader data
Key: KAFKA-18471
URL: https://issues.apache.org/jira/browse/KAFKA-18471
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 3.8.1
Reporter: Vinicius Vieira dos Santos

There is a race condition in Kafka's {{RecordHeader}} class when an instance is created with the [constructor that takes ByteBuffer arguments|https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java#L38]. In this case, the first call to {{key()}} or {{value()}} copies the contents of the corresponding {{ByteBuffer}} (into a {{String}} for the key, a {{byte[]}} for the value) and then sets the buffer field to {{null}}. This lazy copy is not synchronized, so several threads can enter it at the same time: one thread completes the copy and clears the buffer, while another thread that is still inside the copy then dereferences the buffer, which is now {{null}}, and fails with a {{NullPointerException}}.
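To make the interleaving concrete, here is a minimal sketch of the check-copy-clear pattern described above. The class {{LazyCopyHeader}} is hypothetical and simplified; it is not the Kafka source, only an assumption-based model that mirrors the failing call in the stack trace ({{keyBuffer.remaining()}} on a null {{keyBuffer}}). The numbered comments mark one interleaving of two threads, A and B, that produces the exception.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified model of the lazy copy described in the report.
// It is NOT Kafka's RecordHeader; names and copy mechanics are illustrative.
public class LazyCopyHeader {
    private ByteBuffer keyBuffer; // set by the constructor, cleared after the copy
    private String key;           // cached result of the copy

    public LazyCopyHeader(ByteBuffer keyBuffer) {
        this.keyBuffer = keyBuffer;
    }

    // Not thread-safe: an unsynchronized check-then-act on shared fields.
    public String key() {
        if (key == null) {                                   // (1) A and B both see null
            byte[] bytes = new byte[keyBuffer.remaining()];  // (3) B reads the cleared field: NPE
            keyBuffer.duplicate().get(bytes);                // copy without moving the original position
            key = new String(bytes, StandardCharsets.UTF_8);
            keyBuffer = null;                                // (2) A finishes and clears the buffer
        }
        return key;
    }

    public static void main(String[] args) {
        LazyCopyHeader header = new LazyCopyHeader(StandardCharsets.UTF_8.encode("header-key-1"));
        // Single-threaded use is fine: the first call copies, later calls hit the cache.
        System.out.println(header.key());
        System.out.println(header.key());
    }
}
```

Used from one thread, the pattern is correct; the failure needs two threads to pass step (1) before either reaches step (2).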
Exception example:
{code:java}
Exception in thread "pool-1-thread-3" java.lang.NullPointerException: Cannot invoke "java.nio.ByteBuffer.remaining()" because "this.keyBuffer" is null
	at org.apache.kafka.common.header.internals.RecordHeader.key(RecordHeader.java:45)
	at br.com.autbank.workflow.TestMainExample.lambda$main$0(TestMainExample.java:36)
	at java.base/java.lang.Iterable.forEach(Iterable.java:75)
	at br.com.autbank.workflow.TestMainExample.lambda$main$1(TestMainExample.java:32)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
{code}
Code to reproduce the error:
{code:java}
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class TestMainExample {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        try {
            for (int i = 0; i < 200_000; i++) {
                Charset charset = StandardCharsets.UTF_8;
                // Fresh headers built from ByteBuffers, so key()/value() copy lazily on first access.
                RecordHeaders headers = new RecordHeaders();
                headers.add(new RecordHeader(charset.encode("header-key-1"), charset.encode("value-1")));
                headers.add(new RecordHeader(charset.encode("header-key-2"), charset.encode("value-2")));
                headers.add(new RecordHeader(charset.encode("header-key-3"), charset.encode("2025-01-06T00:00:00.000000000-00:00[UTC]")));
                headers.add(new RecordHeader(charset.encode("header-key-4"), charset.encode("2025-01-06T00:00:00.000000000-00:00[UTC]")));
                headers.add(new RecordHeader(charset.encode("header-key-5"), charset.encode("account-number")));
                headers.add(new RecordHeader(charset.encode("header-key-6"), charset.encode("operation-id")));
                headers.add(new RecordHeader(charset.encode("header-key-7"), charset.encode("agency-code")));
                headers.add(new RecordHeader(charset.encode("header-key-8"), charset.encode("branch-code")));

                // Five tasks read the same headers concurrently.
                CountDownLatch count = new CountDownLatch(5);
                for (int j = 0; j < 5; j++) {
                    executorService.execute(() -> {
                        try {
                            headers.forEach((hdr) -> {
                                if (hdr.value() == null) {
                                    throw new IllegalStateException("Bug found on value");
                                }
                                if (hdr.key() == null) {
                                    throw new IllegalStateException("Bug found on key");
                                }
                            });
                        } finally {
                            count.countDown();
                        }
                    });
                }
                // Wait for all five readers before building the next set of headers.
                count.await();
            }
        } finally {
            // Stop the worker threads so the JVM can exit when the loop ends.
            executorService.shutdown();
        }
    }
}
{code}
As a test, I synchronized the method my application uses to access the headers, and that resolved the problem in my application. However, I believe the proper fix would be either to document that the class is not thread-safe or to synchronize access to the {{ByteBuffer}}. Thank you in advance to the team.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
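One of the options suggested in the report is to synchronize access to the {{ByteBuffer}}. A minimal sketch of that option follows, assuming the same copy-then-clear behavior described in the report. The class {{SynchronizedLazyHeader}} is hypothetical: it is neither Kafka's {{RecordHeader}} nor the project's actual fix, and copying via {{duplicate()}} is an illustrative choice. Both accessors take the instance lock, so only one thread at a time can run the check-copy-clear sequence, and the lock also makes the cached result visible to later callers.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: lazy copy guarded by the instance lock (not Kafka code).
public class SynchronizedLazyHeader {
    private ByteBuffer keyBuffer;
    private ByteBuffer valueBuffer;
    private String key;
    private byte[] value;

    public SynchronizedLazyHeader(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
        this.keyBuffer = keyBuffer;
        this.valueBuffer = valueBuffer;
    }

    // The whole check-copy-clear sequence runs under the lock.
    public synchronized String key() {
        if (key == null) {
            byte[] bytes = new byte[keyBuffer.remaining()];
            keyBuffer.duplicate().get(bytes);
            key = new String(bytes, StandardCharsets.UTF_8);
            keyBuffer = null;
        }
        return key;
    }

    // A null value buffer means the header has a null value.
    public synchronized byte[] value() {
        if (value == null && valueBuffer != null) {
            byte[] bytes = new byte[valueBuffer.remaining()];
            valueBuffer.duplicate().get(bytes);
            value = bytes;
            valueBuffer = null;
        }
        return value;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            for (int i = 0; i < 10_000; i++) {
                SynchronizedLazyHeader header = new SynchronizedLazyHeader(
                        StandardCharsets.UTF_8.encode("header-key-1"),
                        StandardCharsets.UTF_8.encode("value-1"));
                // Five readers race on the same fresh header, as in the reproducer.
                List<Future<?>> futures = new ArrayList<>();
                for (int j = 0; j < 5; j++) {
                    futures.add(pool.submit(() -> {
                        if (!"header-key-1".equals(header.key()) || header.value() == null) {
                            throw new IllegalStateException("inconsistent header");
                        }
                    }));
                }
                for (Future<?> f : futures) {
                    f.get(); // rethrows a worker failure as ExecutionException
                }
            }
            System.out.println("no races observed");
        } finally {
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
    }
}
```

The trade-off is a lock acquisition on every call, even after the copy is cached; the other option raised in the report, documenting the class as not thread-safe, leaves synchronization to callers, as the reporter did in their application.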