[
https://issues.apache.org/jira/browse/KAFKA-18471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ming-Yen Chung resolved KAFKA-18471.
------------------------------------
Fix Version/s: 4.2.0
Resolution: Fixed
> Race conditions when accessing RecordHeader data
> -------------------------------------------------
>
> Key: KAFKA-18471
> URL: https://issues.apache.org/jira/browse/KAFKA-18471
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 3.8.1
> Reporter: Vinicius Vieira dos Santos
> Priority: Major
> Fix For: 4.2.0
>
>
> There is a race condition in Kafka's {{RecordHeader}} class when an
> instance is created using the [constructor with
> ByteBuffer|https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeader.java#L38].
> In this scenario, the first call to {{key()}} or {{value()}} copies the
> {{ByteBuffer}} into a byte array and then sets the buffer field to {{null}}.
> If multiple threads invoke these accessors at the same time, one thread can
> complete the copy and null out the buffer while another thread is still
> reading it, so the second thread fails with a {{NullPointerException}}.
>
> Exception example:
>
> {code:java}
> Exception in thread "pool-1-thread-3" java.lang.NullPointerException: Cannot invoke "java.nio.ByteBuffer.remaining()" because "this.keyBuffer" is null
>     at org.apache.kafka.common.header.internals.RecordHeader.key(RecordHeader.java:45)
>     at br.com.autbank.workflow.TestMainExample.lambda$main$0(TestMainExample.java:36)
>     at java.base/java.lang.Iterable.forEach(Iterable.java:75)
>     at br.com.autbank.workflow.TestMainExample.lambda$main$1(TestMainExample.java:32)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
>     at java.base/java.lang.Thread.run(Thread.java:1583)
> {code}
>
> Code example for error:
>
> {code:java}
> import java.nio.charset.Charset;
> import java.nio.charset.StandardCharsets;
> import java.util.concurrent.CountDownLatch;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import org.apache.kafka.common.header.internals.RecordHeader;
> import org.apache.kafka.common.header.internals.RecordHeaders;
>
> public class TestMainExample {
>     public static void main(String[] args) throws InterruptedException {
>         ExecutorService executorService = Executors.newFixedThreadPool(5);
>         for (int i = 0; i < 200_000; i++) {
>             // Build a fresh set of ByteBuffer-backed headers on every iteration.
>             Charset charset = StandardCharsets.UTF_8;
>             RecordHeaders headers = new RecordHeaders();
>             headers.add(new RecordHeader(charset.encode("header-key-1"), charset.encode("value-1")));
>             headers.add(new RecordHeader(charset.encode("header-key-2"), charset.encode("value-2")));
>             headers.add(new RecordHeader(charset.encode("header-key-3"),
>                     charset.encode("2025-01-06T00:00:00.000000000-00:00[UTC]")));
>             headers.add(new RecordHeader(charset.encode("header-key-4"),
>                     charset.encode("2025-01-06T00:00:00.000000000-00:00[UTC]")));
>             headers.add(new RecordHeader(charset.encode("header-key-5"), charset.encode("account-number")));
>             headers.add(new RecordHeader(charset.encode("header-key-6"), charset.encode("operation-id")));
>             headers.add(new RecordHeader(charset.encode("header-key-7"), charset.encode("agency-code")));
>             headers.add(new RecordHeader(charset.encode("header-key-8"), charset.encode("branch-code")));
>
>             // Five tasks read the same headers concurrently to trigger the race.
>             CountDownLatch count = new CountDownLatch(5);
>             for (int j = 0; j < 5; j++) {
>                 executorService.execute(() -> {
>                     try {
>                         headers.forEach((hdr) -> {
>                             if (hdr.value() == null) {
>                                 throw new IllegalStateException("Bug find on value");
>                             }
>                             if (hdr.key() == null) {
>                                 throw new IllegalStateException("Bug find on key");
>                             }
>                         });
>                     } finally {
>                         count.countDown();
>                     }
>                 });
>             }
>             count.await();
>         }
>         // Stop the worker threads so the JVM can exit.
>         executorService.shutdown();
>     }
> }
> {code}
> In a test, I synchronized the method I use to access the headers, and this
> resolved the problem in my application. However, I believe the ideal fix
> would be either to document that the class is not thread-safe or to
> synchronize access to the {{ByteBuffer}}. Thank you in advance to the team.
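
The second option mentioned in the description, synchronizing access to the buffer, could look roughly like the sketch below. This is only an illustration: {{LazyHeader}} is a hypothetical stand-in for a header holding a single value, and it is neither the actual {{RecordHeader}} class nor necessarily the change that was merged for this issue.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical stand-in for a header that lazily copies a ByteBuffer into a
 * byte array. Not the Kafka RecordHeader class and not the merged patch.
 */
final class LazyHeader {
    private ByteBuffer valueBuffer; // guarded by "this"
    private byte[] value;           // guarded by "this"

    LazyHeader(ByteBuffer valueBuffer) {
        this.valueBuffer = valueBuffer;
    }

    /**
     * Copies the buffer on first access. The method is synchronized, so the
     * null check, the copy, and clearing the buffer happen as one atomic
     * step; no thread can see the buffer become null halfway through.
     */
    synchronized byte[] value() {
        if (value == null && valueBuffer != null) {
            byte[] copy = new byte[valueBuffer.remaining()];
            valueBuffer.get(copy);
            value = copy;
            valueBuffer = null; // release the buffer once the copy exists
        }
        return value;
    }
}
```

With this shape, every caller either performs the copy itself or observes the finished array. The cost is a monitor acquisition on each call, which is the trade-off against simply documenting the class as not thread-safe.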
--
This message was sent by Atlassian Jira
(v8.20.10#820010)