[ https://issues.apache.org/jira/browse/KAFKA-14667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Viktor Somogyi-Vass reassigned KAFKA-14667:
-------------------------------------------

    Assignee: Viktor Somogyi-Vass

Delayed leader election operation gets stuck in purgatory
---------------------------------------------------------

                Key: KAFKA-14667
                URL: https://issues.apache.org/jira/browse/KAFKA-14667
            Project: Kafka
         Issue Type: Bug
   Affects Versions: 3.1.1
           Reporter: Daniel Urban
           Assignee: Viktor Somogyi-Vass
           Priority: Major

This was observed with Kafka 3.1.1, but I believe the latest versions are also affected.

In the Cruise Control project, there is an integration test:
com.linkedin.kafka.cruisecontrol.executor.ExecutorTest#testReplicaReassignmentProgressWithThrottle

On our infrastructure, this test fails roughly once every 20 runs with a timeout: the triggered preferred leader election never completes. After some investigation, it turned out that:
# The admin client never gets a response from the broker.
# The leadership change is executed successfully.
# The ElectLeader purgatory never gets an update for the relevant topic partition.
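Item 3 is easier to follow with the purgatory's contract in mind. A delayed operation is parked under a watch key (here, the topic partition). It is only re-checked when some other code path calls back into the purgatory for that key. The following is a minimal toy sketch of that contract in Java, assuming a simplified single-lock design. ToyPurgatory, ToyOperation and PurgatoryDemo are hypothetical names loosely modeled on DelayedOperationPurgatory#tryCompleteElseWatch. They are not Kafka's actual classes or signatures.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;

// Hypothetical toy model of a delayed operation; not Kafka's DelayedOperation API.
final class ToyOperation {
    private final BooleanSupplier condition; // e.g. "metadata shows the preferred leader"
    private boolean completed = false;

    ToyOperation(BooleanSupplier condition) { this.condition = condition; }

    // Completes the operation if its condition holds now; returns whether it is complete.
    synchronized boolean tryComplete() {
        if (!completed && condition.getAsBoolean()) {
            completed = true; // a real broker would send the response here
        }
        return completed;
    }

    synchronized boolean isCompleted() { return completed; }
}

// Hypothetical toy purgatory: operations wait under a key until someone re-checks that key.
final class ToyPurgatory<K> {
    private final Map<K, List<ToyOperation>> watchers = new HashMap<>();

    // Try once, watch the key, then try again so a state change that happened
    // between the first check and the registration is not missed.
    synchronized boolean tryCompleteElseWatch(ToyOperation op, K key) {
        if (op.tryComplete()) return true;
        watchers.computeIfAbsent(key, k -> new ArrayList<>()).add(op);
        return op.tryComplete();
    }

    // Must be called by whoever changes the state (here: a metadata update).
    // Returns how many operations were completed by this call.
    synchronized int checkAndComplete(K key) {
        List<ToyOperation> ops = watchers.get(key);
        if (ops == null) return 0;
        int newlyCompleted = 0;
        for (ToyOperation op : ops) {
            if (!op.isCompleted() && op.tryComplete()) newlyCompleted++;
        }
        ops.removeIf(ToyOperation::isCompleted); // drop finished operations from the watch list
        return newlyCompleted;
    }
}

final class PurgatoryDemo {
    public static void main(String[] args) {
        boolean[] leaderMoved = {false}; // stands in for "broker 1 is now the leader of topic1-0"
        ToyOperation election = new ToyOperation(() -> leaderMoved[0]);
        ToyPurgatory<String> purgatory = new ToyPurgatory<>();

        System.out.println("completed on arrival: " + purgatory.tryCompleteElseWatch(election, "topic1-0"));
        leaderMoved[0] = true; // the leadership change happens...
        System.out.println("completed before notification: " + election.isCompleted());
        System.out.println("unblocked by checkAndComplete: " + purgatory.checkAndComplete("topic1-0"));
        System.out.println("completed after notification: " + election.isCompleted());
    }
}
```

In the demo, moving the leader does not by itself complete the operation. Only the later checkAndComplete call for topic1-0 does. If that call is never made, the operation stays pending, which matches the shape of the symptoms listed above.

The try, watch, try-again sequence would also be one explanation for a delayed operation logging tryComplete() twice within the same millisecond. That mapping to Kafka's code is an assumption.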
A few relevant lines from a failed run (this test uses an embedded cluster, so the logs are mixed):

Cruise Control (CC) successfully sends a preferred leader election request to the controller (broker 0); topic1-0 needs a leadership change from broker 0 to broker 1:
{code:java}
2023-02-01 01:20:26.028 [controller-event-thread] DEBUG kafka.controller.KafkaController - [Controller id=0] Waiting for any successful result for election type (PREFERRED) by AdminClientTriggered for partitions: Map(topic1-0 -> Right(1), topic0-0 -> Left(ApiError(error=ELECTION_NOT_NEEDED, message=Leader election not needed for topic partition.)))
2023-02-01 01:20:26.031 [controller-event-thread] DEBUG kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: HashMap(topic1-0 -> 1)
{code}

The delayed operation for the leader election is tried twice in quick succession (yes, with the same millisecond timestamp in both log lines):
{code:java}
2023-02-01 01:20:26.031 [controller-event-thread] DEBUG kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: HashMap(topic1-0 -> 1)
2023-02-01 01:20:26.031 [controller-event-thread] DEBUG kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: HashMap(topic1-0 -> 1)
{code}

Shortly afterwards (a few ms later, according to the timestamps), broker 0 receives an UpdateMetadataRequest from the controller (itself) and processes it:
{code:java}
2023-02-01 01:20:26.033 [Controller-0-to-broker-0-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] Sending UPDATE_METADATA request with header RequestHeader(apiKey=UPDATE_METADATA, apiVersion=7, clientId=0, correlationId=19) and timeout 30000 to node 0: UpdateMetadataRequestData(controllerId=0, controllerEpoch=1, brokerEpoch=25, ungroupedPartitionStates=[], topicStates=[UpdateMetadataTopicState(topicName='topic1', topicId=gkFP8VnkSGyEf_LBBZSowQ, partitionStates=[UpdateMetadataPartitionState(topicName='topic1', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=2, isr=[0, 1], zkVersion=2, replicas=[1, 0], offlineReplicas=[])])], liveBrokers=[UpdateMetadataBroker(id=1, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=40236, host='localhost', listener='PLAINTEXT', securityProtocol=0)], rack=null), UpdateMetadataBroker(id=0, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=42556, host='localhost', listener='PLAINTEXT', securityProtocol=0)], rack=null)])
2023-02-01 01:20:26.035 [Controller-0-to-broker-0-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] Received UPDATE_METADATA response from node 0 for request with header RequestHeader(apiKey=UPDATE_METADATA, apiVersion=7, clientId=0, correlationId=19): UpdateMetadataResponseData(errorCode=0)
2023-02-01 01:20:26.035 [data-plane-kafka-network-thread-0-ListenerName(PLAINTEXT)-PLAINTEXT-0] DEBUG kafka.request.logger - Completed request:{"isForwarded":false,"requestHeader":{"requestApiKey":6,"requestApiVersion":7,"correlationId":19,"clientId":"0","requestApiKeyName":"UPDATE_METADATA"},"request":{"controllerId":0,"controllerEpoch":1,"brokerEpoch":25,"topicStates":[{"topicName":"topic1","topicId":"gkFP8VnkSGyEf_LBBZSowQ","partitionStates":[{"partitionIndex":0,"controllerEpoch":1,"leader":1,"leaderEpoch":2,"isr":[0,1],"zkVersion":2,"replicas":[1,0],"offlineReplicas":[]}]}],"liveBrokers":[{"id":1,"endpoints":[{"port":40236,"host":"localhost","listener":"PLAINTEXT","securityProtocol":0}],"rack":null},{"id":0,"endpoints":[{"port":42556,"host":"localhost","listener":"PLAINTEXT","securityProtocol":0}],"rack":null}]},"response":{"errorCode":0},"connection":"127.0.0.1:42556-127.0.0.1:55952-0","totalTimeMs":1.904,"requestQueueTimeMs":0.108,"localTimeMs":0.788,"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.842,"sendTimeMs":0.164,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"}}
{code}

The UpdateMetadataRequest should trigger an update on the ElectLeader purgatory, and we should see a log line like this: "Request key X unblocked Y ElectLeader." This line appears in successful runs but never in the failing one.

I believe there is a concurrency issue between kafka.server.KafkaApis#handleUpdateMetadataRequest, kafka.server.ReplicaManager#hasDelayedElectionOperations and kafka.server.DelayedOperationPurgatory#tryCompleteElseWatch.

handleUpdateMetadataRequest calls hasDelayedElectionOperations, which reads the state of the purgatory without taking its lock:
{code:scala}
if (replicaManager.hasDelayedElectionOperations) {
  updateMetadataRequest.partitionStates.forEach { partitionState =>
    val tp = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
    replicaManager.tryCompleteElection(TopicPartitionOperationKey(tp))
  }
}
{code}

The "Request key X unblocked Y ElectLeader." line never appears in the failed run, yet the request processing finishes, so this is not a deadlock in the request handler. It is therefore safe to assume that handleUpdateMetadataRequest never enters the body of this if.

I don't have an exact scenario for how this can happen (a concurrent metadata update and a delayed elect-leader operation somehow fail to "sync up"), but this definitely looks like a concurrency problem.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
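The report does not pin down an exact scenario. One interleaving that would fit the logs, offered here only as a hypothesis, is a check-then-act race. Suppose the count behind hasDelayedElectionOperations is incremented only after tryCompleteElseWatch has already watched and re-tried the operation. If the metadata update is handled inside that window, the guard reads zero and skips tryCompleteElection. Nothing then re-checks the operation.

The toy Java model below replays both interleavings deterministically on one thread. ToyBroker, ToyElection, tryThenWatch, countAsDelayed and RaceDemo are hypothetical names. Whether Kafka's counter really lags behind the watch registration this way is an assumption to verify against the 3.1.1 sources.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of the suspected race; none of these names are Kafka APIs.
final class ToyElection {
    final String partition;
    final int preferredLeader;
    boolean completed = false;

    ToyElection(String partition, int preferredLeader) {
        this.partition = partition;
        this.preferredLeader = preferredLeader;
    }
}

final class ToyBroker {
    private final Map<String, Integer> leaderCache = new HashMap<>();
    private final Map<String, List<ToyElection>> watchers = new HashMap<>();
    // Assumed stand-in for the count read by hasDelayedElectionOperations: read without
    // the purgatory lock, and bumped only after the operation is already watched.
    private int delayedCount = 0;

    private boolean tryComplete(ToyElection op) {
        Integer leader = leaderCache.get(op.partition);
        if (!op.completed && leader != null && leader == op.preferredLeader) {
            op.completed = true;
        }
        return op.completed;
    }

    // Step A (election callback): try, watch, try again.
    boolean tryThenWatch(ToyElection op) {
        if (tryComplete(op)) return true;
        watchers.computeIfAbsent(op.partition, k -> new ArrayList<>()).add(op);
        return tryComplete(op);
    }

    // Step B (same caller, slightly later): only now does the guard see the operation.
    void countAsDelayed(ToyElection op) {
        if (!op.completed) delayedCount++;
    }

    // Step C (request handler): apply the update, then re-check watchers only if the guard allows it.
    void handleUpdateMetadata(String partition, int newLeader) {
        leaderCache.put(partition, newLeader);
        if (delayedCount != 0) {
            for (ToyElection op : watchers.getOrDefault(partition, List.of())) {
                tryComplete(op);
            }
        }
    }
}

final class RaceDemo {
    // Replays one interleaving step by step and reports whether the election completed.
    static boolean run(boolean updateInsideWindow) {
        ToyBroker broker = new ToyBroker();
        ToyElection op = new ToyElection("topic1-0", 1);
        broker.tryThenWatch(op); // leader is not 1 yet, so the op is watched but not counted
        if (updateInsideWindow) {
            broker.handleUpdateMetadata("topic1-0", 1); // guard reads 0 and skips the re-check
            broker.countAsDelayed(op);                  // too late: nothing re-checks this op
        } else {
            broker.countAsDelayed(op);
            broker.handleUpdateMetadata("topic1-0", 1); // guard reads 1 and completes the op
        }
        return op.completed;
    }

    public static void main(String[] args) {
        System.out.println("update after counting: completed=" + run(false));
        System.out.println("update inside the window: completed=" + run(true));
    }
}
```

Because the interleaving is scripted, the stuck operation reproduces every time, unlike the roughly 1-in-20 failure of the real test. In this model, either change closes the window:
- removing the guard, so the update always calls the completion path, or
- counting the operation before it is watched.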