Kamil Adam Nowak created KAFKA-18024:
----------------------------------------
Summary: ConcurrentModificationException in Kafka OffsetFetcher - Proposal for Thread-Safety Fix
Key: KAFKA-18024
URL: https://issues.apache.org/jira/browse/KAFKA-18024
Project: Kafka
Issue Type: Bug
Reporter: Kamil Adam Nowak
Assignee: Kamil Adam Nowak

I am using kafka-clients.jar version 3.5.1 and am hitting an issue that, judging by the code, has not been fixed in newer versions. While my microservice is running, the following error appears:

{code}
[2024-11-12T10:57:37.997+02:00] [ERROR] [myservice.kafka.consumer.handler.KafkaConsumerHandler] [dataChangeEventConsumer_0] [] [] [null] [: java.util.ConcurrentModificationException
    at java.base/java.util.HashMap$HashIterator.remove(HashMap.java:1507)
    at java.base/java.util.AbstractCollection.retainAll(AbstractCollection.java:420)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$1.onSuccess(OffsetFetcher.java:228)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$1.onSuccess(OffsetFetcher.java:223)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$4.onSuccess(OffsetFetcher.java:489)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$4.onSuccess(OffsetFetcher.java:480)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher.handleListOffsetResponse(OffsetFetcher.java:665)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher.access$1000(OffsetFetcher.java:67)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$5.onSuccess(OffsetFetcher.java:572)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$5.onSuccess(OffsetFetcher.java:567)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:617)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:427)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:312)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:321)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1460)
]
{code}

These logs are from 2024-11-12, but I saw the same issue in another microservice a few months earlier, on 2024-07-05:

{code}
2024-07-05 01:11:11,341 [ERROR] [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [kafka-coordinator-heartbeat-thread | myServiceResult_consumer] [] [] [Consumer clientId=myServiceResult_consumer_0, groupId=myServiceResult_consumer] Heartbeat thread failed due to unexpected error: java.util.ConcurrentModificationException
    at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1493)
    at java.base/java.util.HashMap$KeyIterator.next(HashMap.java:1516)
    at java.base/java.util.AbstractCollection.retainAll(AbstractCollection.java:419)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$1.onSuccess(OffsetFetcher.java:228)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$1.onSuccess(OffsetFetcher.java:223)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$4.onSuccess(OffsetFetcher.java:489)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$4.onSuccess(OffsetFetcher.java:480)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher.handleListOffsetResponse(OffsetFetcher.java:665)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher.access$1000(OffsetFetcher.java:67)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$5.onSuccess(OffsetFetcher.java:572)
    at org.apache.kafka.clients.consumer.internals.OffsetFetcher$5.onSuccess(OffsetFetcher.java:567)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:617)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:427)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:312)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:321)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1460)
{code}

It is the same error surfacing in a different place: the first trace fails in `HashIterator.remove`, the second in `HashIterator.nextNode`. The source is the same in both, the `retainAll` call in OffsetFetcher, and both traces run on the heartbeat thread (`AbstractCoordinator$HeartbeatThread.run`):

https://github.com/apache/kafka/blob/3.5.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L228

{code}
...
219 Map<TopicPartition, Long> remainingToSearch = new HashMap<>(timestampsToSearch);
220 do {
221     RequestFuture<ListOffsetResult> future = sendListOffsetsRequests(remainingToSearch, requireTimestamps);
222
223     future.addListener(new RequestFutureListener<ListOffsetResult>() {
224         @Override
225         public void onSuccess(ListOffsetResult value) {
226             synchronized (future) {
227                 result.fetchedOffsets.putAll(value.fetchedOffsets);
228                 remainingToSearch.keySet().retainAll(value.partitionsToRetry);
...
{code}

So the issue is the call:

{code}
remainingToSearch.keySet().retainAll(value.partitionsToRetry);
{code}

on `remainingToSearch`, which is a regular `HashMap`. For some reason, concurrent modification is possible here, even though the call sits inside `synchronized (future)`. The `Set` returned by `HashMap.keySet()` is a view backed by the map and is not thread-safe.

I believe the code should be made thread-safe to prevent this error. My proposal is to replace:

{code}
Map<TopicPartition, Long> remainingToSearch = new HashMap<>(timestampsToSearch);
{code}

with:

{code}
Map<TopicPartition, Long> remainingToSearch = new ConcurrentHashMap<>(timestampsToSearch);
{code}

`ConcurrentHashMap.keySet()` returns a thread-safe `KeySetView`.

In newer versions of kafka-clients.jar this part of OffsetFetcher.java is unchanged from the 3.5.1 version I use, so the same error can occur there. I therefore suggest applying this fix starting from the latest version.
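
To illustrate the difference between the two map types, here is a minimal, self-contained sketch. It is not the Kafka code. The class `RetainAllDemo`, the helper `interferingSet`, and the String keys are hypothetical and exist only for this example. Instead of a real second thread, the collection passed to `retainAll` modifies the map from inside `contains()`, which reproduces the interleaving deterministically. The sketch only shows that `ConcurrentHashMap` iterators do not throw `ConcurrentModificationException`. It does not show that `retainAll` becomes atomic, and it says nothing about why two threads reach the map in OffsetFetcher.

```java
import java.util.AbstractSet;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class RetainAllDemo {

    /**
     * Hypothetical stand-in for "another thread": a set whose contains()
     * structurally modifies the target map while retainAll() is iterating it.
     */
    static Set<String> interferingSet(Map<String, Long> target) {
        return new AbstractSet<String>() {
            @Override
            public boolean contains(Object o) {
                // Structural change in the middle of the retainAll() iteration.
                target.put("added-during-iteration", 0L);
                // Retain nothing, so retainAll() calls Iterator.remove() next.
                return false;
            }

            @Override
            public Iterator<String> iterator() {
                return Collections.emptyIterator();
            }

            @Override
            public int size() {
                return 0;
            }
        };
    }

    /** Returns true if keySet().retainAll(...) throws ConcurrentModificationException. */
    public static boolean retainAllThrows(Map<String, Long> map) {
        map.put("topic-0", 1L);
        map.put("topic-1", 2L);
        try {
            map.keySet().retainAll(interferingSet(map));
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // HashMap iterators are fail-fast and detect the interleaved put().
        System.out.println("HashMap throws: " + retainAllThrows(new HashMap<>()));
        // ConcurrentHashMap iterators are weakly consistent and never throw CME.
        System.out.println("ConcurrentHashMap throws: " + retainAllThrows(new ConcurrentHashMap<>()));
    }
}
```

With `HashMap` the call fails in `HashIterator.remove`, as in the first trace above. With `ConcurrentHashMap` it completes. One caveat for the proposed change: `ConcurrentHashMap` rejects null keys and values, so `new ConcurrentHashMap<>(timestampsToSearch)` would throw a `NullPointerException` if the source map ever contained one. Whether that can happen in OffsetFetcher is not something this sketch establishes.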