Kamil Adam Nowak created KAFKA-18024:
----------------------------------------

             Summary: ConcurrentModificationException in Kafka OffsetFetcher - Proposal for Thread-Safety Fix
                 Key: KAFKA-18024
                 URL: https://issues.apache.org/jira/browse/KAFKA-18024
             Project: Kafka
          Issue Type: Bug
            Reporter: Kamil Adam Nowak
            Assignee: Kamil Adam Nowak


I am using Kafka (specifically kafka-clients.jar version 3.5.1) and am running into an issue that, judging by the source code, has not been fixed in newer versions.
 
While my microservice is running, the following error appears:
{code}
[2024-11-12T10:57:37.997+02:00] [ERROR] [myservice.kafka.consumer.handler.KafkaConsumerHandler] [dataChangeEventConsumer_0] [] [] [null] [:
java.util.ConcurrentModificationException
at java.base/java.util.HashMap$HashIterator.remove(HashMap.java:1507)
at java.base/java.util.AbstractCollection.retainAll(AbstractCollection.java:420)
at org.apache.kafka.clients.consumer.internals.OffsetFetcher$1.onSuccess(OffsetFetcher.java:228)
at org.apache.kafka.clients.consumer.internals.OffsetFetcher$1.onSuccess(OffsetFetcher.java:223)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
at org.apache.kafka.clients.consumer.internals.OffsetFetcher$4.onSuccess(OffsetFetcher.java:489)
at org.apache.kafka.clients.consumer.internals.OffsetFetcher$4.onSuccess(OffsetFetcher.java:480)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
at org.apache.kafka.clients.consumer.internals.OffsetFetcher.handleListOffsetResponse(OffsetFetcher.java:665)
at org.apache.kafka.clients.consumer.internals.OffsetFetcher.access$1000(OffsetFetcher.java:67)
at org.apache.kafka.clients.consumer.internals.OffsetFetcher$5.onSuccess(OffsetFetcher.java:572)
at org.apache.kafka.clients.consumer.internals.OffsetFetcher$5.onSuccess(OffsetFetcher.java:567)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:617)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:427)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:312)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:321)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1460)
]
{code}
 
These logs are from 2024-11-12, but I hit the same issue in another microservice a few months earlier, on 2024-07-05:
{code}
2024-07-05 01:11:11,341 [ERROR] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [kafka-coordinator-heartbeat-thread | myServiceResult_consumer] [] [] [Consumer clientId=myServiceResult_consumer_0, groupId=myServiceResult_consumer] Heartbeat thread failed due to unexpected error:
java.util.ConcurrentModificationException
at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1493)
at java.base/java.util.HashMap$KeyIterator.next(HashMap.java:1516)
at java.base/java.util.AbstractCollection.retainAll(AbstractCollection.java:419)
at org.apache.kafka.clients.consumer.internals.OffsetFetcher$1.onSuccess(OffsetFetcher.java:228)
at org.apache.kafka.clients.consumer.internals.OffsetFetcher$1.onSuccess(OffsetFetcher.java:223)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
at org.apache.kafka.clients.consumer.internals.OffsetFetcher$4.onSuccess(OffsetFetcher.java:489)
at org.apache.kafka.clients.consumer.internals.OffsetFetcher$4.onSuccess(OffsetFetcher.java:480)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
at org.apache.kafka.clients.consumer.internals.OffsetFetcher.handleListOffsetResponse(OffsetFetcher.java:665)
at org.apache.kafka.clients.consumer.internals.OffsetFetcher.access$1000(OffsetFetcher.java:67)
at org.apache.kafka.clients.consumer.internals.OffsetFetcher$5.onSuccess(OffsetFetcher.java:572)
at org.apache.kafka.clients.consumer.internals.OffsetFetcher$5.onSuccess(OffsetFetcher.java:567)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:617)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:427)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:312)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:321)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1460)
{code}
 
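It is the same error, thrown at a slightly different point inside the `HashMap` iterator (`HashIterator.remove` in the first trace, `HashIterator.nextNode` in the second). The source is the same in both cases: a ConcurrentModificationException thrown from OffsetFetcher:
https://github.com/apache/kafka/blob/3.5.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L228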
{code}
...
219        Map<TopicPartition, Long> remainingToSearch = new HashMap<>(timestampsToSearch);
220        do {
221            RequestFuture<ListOffsetResult> future = sendListOffsetsRequests(remainingToSearch, requireTimestamps);
222
223            future.addListener(new RequestFutureListener<ListOffsetResult>() {
224                @Override
225                public void onSuccess(ListOffsetResult value) {
226                    synchronized (future) {
227                        result.fetchedOffsets.putAll(value.fetchedOffsets);
228                        remainingToSearch.keySet().retainAll(value.partitionsToRetry);
...
{code}
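Both traces fail inside `AbstractCollection.retainAll`, which the traces show is what `HashMap`'s key-set view uses. The sketch below writes that loop out by hand so the two throwing frames are visible: on a `HashMap` key set, the iterator's `next()` (`HashIterator.nextNode`, second trace) and `remove()` (`HashIterator.remove`, first trace) both compare the map's modification count with the value captured when iteration started. It is an illustration with the same behaviour as the JDK method, not the JDK source; `String` keys such as `"orders-0"` are a hypothetical stand-in for `TopicPartition` so it runs without kafka-clients on the classpath.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

final class RetainAllLoop {

    // Same behaviour as the generic AbstractCollection.retainAll seen in both traces:
    // walk the set with its own iterator and remove every element not contained in `keep`.
    static <E> boolean retainAll(Set<E> set, Collection<?> keep) {
        boolean modified = false;
        Iterator<E> it = set.iterator();
        while (it.hasNext()) {
            // On a HashMap key set this is HashIterator.nextNode, which throws
            // ConcurrentModificationException if the map changed since iteration began.
            E element = it.next();
            if (!keep.contains(element)) {
                // On a HashMap key set this is HashIterator.remove, which performs the same check.
                it.remove();
                modified = true;
            }
        }
        return modified;
    }

    public static void main(String[] args) {
        Map<String, Long> remainingToSearch = new HashMap<>();
        remainingToSearch.put("orders-0", 100L);
        remainingToSearch.put("orders-1", 200L);
        remainingToSearch.put("orders-2", 300L);

        // Keep only the partitions that still need a retry.
        boolean changed = retainAll(remainingToSearch.keySet(), Set.of("orders-1"));
        System.out.println(changed + " " + remainingToSearch); // true {orders-1=200}
    }
}
```

Because removal goes through the key-set view, it edits the backing map directly; nothing in this loop is safe against another thread changing the map at the same time.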
 
So the issue is the call:
{code}
remainingToSearch.keySet().retainAll(value.partitionsToRetry);
{code}
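on `remainingToSearch`, which is a plain `HashMap`. Its `.keySet()` is a view backed by the map, so `retainAll()` walks the map with a fail-fast `HashMap` iterator and removes entries through it; neither the map nor the view is thread-safe. Both traces end in `AbstractCoordinator$HeartbeatThread.run` calling `ConsumerNetworkClient.pollNoWakeup`, so this listener can run on the heartbeat thread rather than on the thread that created `remainingToSearch`. The surrounding `synchronized (future)` block does not prevent the conflict, because it only excludes other code that locks the same `future` object.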
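The failure can be reproduced without relying on timing luck. The following is a minimal two-thread sketch under a simplified model of the traces: one thread (named `simulated-heartbeat-thread`, a hypothetical stand-in for the heartbeat thread) runs `keySet().retainAll(...)` while the main thread adds a key to the same `HashMap` without any shared lock. A pair of `CountDownLatch`es pauses the first thread inside `retainAll` so the interleaving is deterministic. `String` keys stand in for `TopicPartition`, and the anonymous collection is a hypothetical replacement for `value.partitionsToRetry`.

```java
import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

final class HeartbeatRaceDemo {

    // Runs keySet().retainAll(...) on a second thread while the calling thread structurally
    // modifies the same map; returns whatever that second thread threw (or null).
    static Throwable race(Map<String, Long> remainingToSearch) throws InterruptedException {
        CountDownLatch iterating = new CountDownLatch(1); // second thread is inside retainAll
        CountDownLatch modified = new CountDownLatch(1);  // calling thread has changed the map

        // Hypothetical stand-in for value.partitionsToRetry: retries nothing, and on the
        // first contains() call it parks the thread until the map has been modified.
        Collection<String> partitionsToRetry = new AbstractCollection<String>() {
            private boolean paused = false;

            @Override public boolean contains(Object o) {
                if (!paused) {
                    paused = true;
                    iterating.countDown();
                    try {
                        modified.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                return false;
            }

            @Override public Iterator<String> iterator() { return Collections.emptyIterator(); }

            @Override public int size() { return 0; }
        };

        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread heartbeat = new Thread(() -> {
            try {
                remainingToSearch.keySet().retainAll(partitionsToRetry);
            } catch (Throwable t) {
                failure.set(t);
            }
        }, "simulated-heartbeat-thread");

        heartbeat.start();
        iterating.await();                       // retainAll is now between next() and remove()
        remainingToSearch.put("orders-9", 900L); // unsynchronized write from another thread
        modified.countDown();
        heartbeat.join();
        return failure.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, Long> map = new HashMap<>();
        map.put("orders-0", 100L);
        map.put("orders-1", 200L);
        System.out.println(race(map) instanceof ConcurrentModificationException); // true
    }
}
```

The write lands between the iterator's `next()` and `remove()`, so `remove()` sees a changed modification count and throws, matching the top frame of the first trace.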
 
I believe it would be good to make the code thread-safe to prevent this error.
 
My proposal is to replace:
{code}
Map<TopicPartition, Long> remainingToSearch = new HashMap<>(timestampsToSearch);
{code}
with:
{code}
Map<TopicPartition, Long> remainingToSearch = new ConcurrentHashMap<>(timestampsToSearch);
{code}
 
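`ConcurrentHashMap.keySet()` returns a `KeySetView` whose iterators are weakly consistent: they tolerate concurrent updates and never throw `ConcurrentModificationException`, so `retainAll()` on it would no longer fail this way.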
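The effect of the proposed change can be checked with a single-threaded sketch that injects the interleaved write from inside `retainAll` (a hypothetical stand-in for the other thread): the plain `HashMap` throws, the `ConcurrentHashMap` does not. One behavioural difference worth checking before applying the change is that `ConcurrentHashMap` rejects `null` keys and values, so copying a map that contains a `null` throws `NullPointerException`. The switch also removes the exception without, on its own, making separate reads and writes of `remainingToSearch` atomic. `String` keys are an assumption standing in for `TopicPartition`.

```java
import java.util.AbstractCollection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class KeySetViewDemo {

    // Runs map.keySet().retainAll(...) and structurally modifies the map from inside the
    // first contains() call, i.e. while retainAll's iterator is mid-traversal.
    // Returns true if a ConcurrentModificationException escaped.
    static boolean retainAllThrowsCme(Map<String, Long> map) {
        AbstractCollection<String> retryNothingButMutateOnce = new AbstractCollection<String>() {
            private boolean mutated = false;

            @Override public boolean contains(Object o) {
                if (!mutated) {
                    mutated = true;
                    map.put("orders-9", 900L); // the interleaved write
                }
                return false; // retry nothing, so every visited key is removed
            }

            @Override public Iterator<String> iterator() { return Collections.emptyIterator(); }

            @Override public int size() { return 0; }
        };
        try {
            map.keySet().retainAll(retryNothingButMutateOnce);
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Fresh two-entry HashMap used as input for both variants.
    static Map<String, Long> sample() {
        Map<String, Long> m = new HashMap<>();
        m.put("orders-0", 100L);
        m.put("orders-1", 200L);
        return m;
    }

    public static void main(String[] args) {
        System.out.println(retainAllThrowsCme(sample()));                          // true
        System.out.println(retainAllThrowsCme(new ConcurrentHashMap<>(sample()))); // false

        // ConcurrentHashMap rejects null keys and values, unlike HashMap.
        Map<String, Long> withNull = sample();
        withNull.put("orders-2", null);
        try {
            new ConcurrentHashMap<>(withNull);
        } catch (NullPointerException e) {
            System.out.println("null value rejected");
        }
    }
}
```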
 
In newer versions of kafka-clients.jar, OffsetFetcher.java still contains the same code as version 3.5.1 (the one I use), so the same error can occur there as well. I therefore suggest applying this fix starting from the latest version.
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
