Vinicius Vieira dos Santos created KAFKA-18471:
--------------------------------------------------

             Summary:  Race conditions when accessing RecordHeader data
                 Key: KAFKA-18471
                 URL: https://issues.apache.org/jira/browse/KAFKA-18471
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 3.8.1
            Reporter: Vinicius Vieira dos Santos


There is a race condition in Kafka's {{RecordHeader}} class when an instance is
created with the
[constructor that takes ByteBuffer arguments|https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java#L38].
In this case, the first call to {{key()}} or {{value()}} converts the
corresponding {{ByteBuffer}} (into a {{String}} for the key, into a byte array
for the value), caches the result, and then sets the buffer field to {{null}}.

Nothing stops several threads from running this conversion at the same time. If
two threads both find that the cached key has not been computed yet, one of them
can finish the conversion and set {{keyBuffer}} to {{null}} while the other is
still about to read the buffer; the second thread then fails with a
{{NullPointerException}}, as in the stack trace below.
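To make the interleaving concrete, here is a minimal, self-contained sketch of
the lazy-conversion pattern described above. {{UnsafeLazyHeader}} and
{{SynchronizedLazyHeader}} are hypothetical classes written only for
illustration: they mimic the behavior reported here (convert on first access,
cache the result, set the buffer to {{null}}) and are not the Kafka source. The
synchronized variant shows one possible way to close the race window, not
necessarily the fix the Kafka project would choose.

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified model of the reported pattern (NOT Kafka's code).
class UnsafeLazyHeader {
    private ByteBuffer keyBuffer;
    private String key; // cached result, plain (non-volatile) field

    UnsafeLazyHeader(ByteBuffer keyBuffer) {
        this.keyBuffer = keyBuffer;
    }

    // Unsynchronized check-then-act. One possible interleaving:
    //   Thread A: sees key == null
    //   Thread B: sees key == null
    //   Thread A: decodes keyBuffer, stores key, sets keyBuffer = null
    //   Thread B: decodes a null buffer -> NullPointerException
    String key() {
        if (key == null) {
            key = StandardCharsets.UTF_8.decode(keyBuffer).toString();
            keyBuffer = null;
        }
        return key;
    }
}

// Hypothetical variant: same logic, but guarded by the object's monitor.
class SynchronizedLazyHeader {
    private ByteBuffer keyBuffer;
    private String key;

    SynchronizedLazyHeader(ByteBuffer keyBuffer) {
        this.keyBuffer = keyBuffer;
    }

    // The null check, the conversion and the buffer reset run as one atomic
    // step, and releasing the lock makes the cached key visible to any thread
    // that later acquires the same lock.
    synchronized String key() {
        if (key == null) {
            key = StandardCharsets.UTF_8.decode(keyBuffer).toString();
            keyBuffer = null;
        }
        return key;
    }
}
{code}

With a single thread both classes return the same key; they only differ when
several threads make the first call at the same time, which is what the
reproduction in this issue does. Locking on every call has a cost; converting
the buffers eagerly in the constructor would avoid the lock, at the price of
paying for the conversion even for headers that are never read.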

 

Exception example:

 
{code:java}
Exception in thread "pool-1-thread-3" java.lang.NullPointerException: Cannot invoke "java.nio.ByteBuffer.remaining()" because "this.keyBuffer" is null
    at org.apache.kafka.common.header.internals.RecordHeader.key(RecordHeader.java:45)
    at br.com.autbank.workflow.TestMainExample.lambda$main$0(TestMainExample.java:36)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at br.com.autbank.workflow.TestMainExample.lambda$main$1(TestMainExample.java:32)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583) {code}
 

Code to reproduce the error:

 
{code:java}
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class TestMainExample {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        Charset charset = StandardCharsets.UTF_8;

        try {
            for (int i = 0; i < 200_000; i++) {
                // Headers built from ByteBuffers: key() and value() are converted lazily on first access.
                RecordHeaders headers = new RecordHeaders();
                headers.add(new RecordHeader(charset.encode("header-key-1"), charset.encode("value-1")));
                headers.add(new RecordHeader(charset.encode("header-key-2"), charset.encode("value-2")));
                headers.add(new RecordHeader(charset.encode("header-key-3"), charset.encode("2025-01-06T00:00:00.000000000-00:00[UTC]")));
                headers.add(new RecordHeader(charset.encode("header-key-4"), charset.encode("2025-01-06T00:00:00.000000000-00:00[UTC]")));
                headers.add(new RecordHeader(charset.encode("header-key-5"), charset.encode("account-number")));
                headers.add(new RecordHeader(charset.encode("header-key-6"), charset.encode("operation-id")));
                headers.add(new RecordHeader(charset.encode("header-key-7"), charset.encode("agency-code")));
                headers.add(new RecordHeader(charset.encode("header-key-8"), charset.encode("branch-code")));

                // Five tasks read the same, not yet converted, headers concurrently.
                CountDownLatch count = new CountDownLatch(5);
                for (int j = 0; j < 5; j++) {
                    executorService.execute(() -> {
                        try {
                            headers.forEach(hdr -> {
                                if (hdr.value() == null) {
                                    throw new IllegalStateException("Bug found on value");
                                }
                                if (hdr.key() == null) {
                                    throw new IllegalStateException("Bug found on key");
                                }
                            });
                        } finally {
                            count.countDown();
                        }
                    });
                }
                // Wait for all five readers before building the next batch of headers.
                count.await();
            }
        } finally {
            // Without this, the non-daemon pool threads keep the JVM running.
            executorService.shutdown();
        }
    }
}
{code}
As a test, I synchronized the method my application uses to access the headers,
and this resolved the problem in my application. I believe the ideal solution
would be either to document that the class is not thread-safe or to synchronize
access to the {{ByteBuffer}}. Thanks in advance to the team.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
