[ https://issues.apache.org/jira/browse/KAFKA-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jason Gustafson resolved KAFKA-7280. ------------------------------------ Resolution: Fixed Fix Version/s: (was: 1.1.2) > ConcurrentModificationException in FetchSessionHandler in heartbeat thread > -------------------------------------------------------------------------- > > Key: KAFKA-7280 > URL: https://issues.apache.org/jira/browse/KAFKA-7280 > Project: Kafka > Issue Type: Bug > Components: consumer > Affects Versions: 1.1.1, 2.0.0 > Reporter: Rajini Sivaram > Assignee: Rajini Sivaram > Priority: Critical > Fix For: 2.0.1, 2.1.0 > > > Request/response handling in FetchSessionHandler is not thread-safe. But we > are using it in Kafka consumer without any synchronization even though poll() > from heartbeat thread can process responses. Heartbeat thread holds the > coordinator lock while processing its poll and responses, making other > operations involving the group coordinator safe. We also need to lock > FetchSessionHandler for the operations that update or read > FetchSessionHandler#sessionPartitions. > This exception is from a system test run on trunk of > TestSecurityRollingUpgrade.test_rolling_upgrade_sasl_mechanism_phase_two: > {quote}[2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, > groupId=group] Heartbeat thread failed due to unexpected error > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > java.util.ConcurrentModificationException > at > java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) > at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742) > at > org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362) > at > org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:424) > at > org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:216) > at > org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:206) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:304) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:996) > {quote} > > The logs just prior to the exception show that a partition was removed from > the session: > {quote}[2018-08-12 06:13:22,315] TRACE [Consumer clientId=console-consumer, > groupId=group] Skipping fetch for partition test_topic-1 because there is an > in-flight request to worker4:9095 (id: 3 rack: null) > (org.apache.kafka.clients.consumer.internals.Fetcher) > [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, > groupId=group] Completed receive from node 2 for FETCH with correlation id > 417, received > {throttle_time_ms=0,error_code=0,session_id=109800960,responses=[{topic=test_topic,partition_responses=[{partition_header= > Unknown macro: > \{partition=2,error_code=0,high_watermark=184,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null} > ,record_set=[(record=DefaultRecord(offset=183, timestamp=1534054402327, key=0 > bytes, value=3 bytes))]}]}]} (org.apache.kafka.clients.NetworkClient) > [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, > groupId=group] Added READ_UNCOMMITTED fetch request for partition > test_topic-0 at offset 189 to node worker3:9095 (id: 2 rack: null) > (org.apache.kafka.clients.consumer.internals.Fetcher) > [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, > groupId=group] Built incremental fetch (sessionId=109800960, epoch=237) for > node 2. Added (), altered (), removed (test_topic-2) out of (test_topic-0) > (org.apache.kafka.clients.FetchSessionHandler) > [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, > groupId=group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), > toForget=(test_topic-2), implied=(test_topic-0)) to broker worker3:9095 (id: > 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher) > [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, > groupId=group] Sending FETCH > {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=109800960,epoch=237,topics=[],forgotten_topics_data=[ > Unknown macro: \{topic=test_topic,partitions=[2]} > ]} with correlation id 418 to node 2 (org.apache.kafka.clients.NetworkClient) > [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, > groupId=group] Skipping fetch for partition test_topic-2 because there is an > in-flight request to worker3:9095 (id: 2 rack: null) > (org.apache.kafka.clients.consumer.internals.Fetcher) > [2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, > groupId=group] Heartbeat thread failed due to unexpected error > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > java.util.ConcurrentModificationException > {quote} > The sequence in the logs show > # FETCH response received > # FetchSessionHandler#sessionPartitions is updated (a partition is removed) > # New FETCH request is sent > # Heartbeat thread throws ConcurrentModificationException while iterating > over FetchSessionHandler#sessionPartitions > This could be because 1) and 4) were on the heartbeat thread and 2) and 3) on > the thread processing Consumer#poll(). > -- This message was sent by Atlassian JIRA (v7.6.3#76005)