[ https://issues.apache.org/jira/browse/KAFKA-9769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jun Rao resolved KAFKA-9769.
----------------------------
    Fix Version/s: 2.7.0
         Assignee: Andrew Choi
       Resolution: Fixed

merged the PR to trunk

> ReplicaManager Partition.makeFollower Increases LeaderEpoch when ZooKeeper disconnect occurs
> ---------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9769
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9769
>             Project: Kafka
>          Issue Type: Bug
>          Components: replication
>            Reporter: Andrew Choi
>            Assignee: Andrew Choi
>            Priority: Minor
>              Labels: kafka, replica, replication
>             Fix For: 2.7.0
>
> The ZooKeeper session expired and disconnected at the same time the broker received the first LeaderAndIsr request. While the broker was processing that first LeaderAndIsr request, the ZooKeeper session had not yet been reestablished.
>
> Within the makeFollowers method, _partition.getOrCreateReplica_ is called before the fetcher starts. _partition.getOrCreateReplica_ needs to fetch information from ZooKeeper, but the ZooKeeper client throws an exception because the session is invalid, so the fetcher start is skipped.
>
> The Partition class's getOrCreateReplica method calls AdminZkClient's fetchEntityConfig(..), which throws an exception if the ZooKeeper session is invalid:
>
> {code:java}
> val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic){code}
>
> When this occurs, the leader epoch should not have been incremented while ZooKeeper was invalid, because once the second LeaderAndIsr request comes in, the leader epoch could be the same between the brokers.
>
> A few options I can think of for a fix; I think the third route could be feasible:
> 1 - Make the leader epoch update and the fetch update atomic.
> 2 - Wait until all individual partitions have succeeded without problems, then process the fetch.
> 3 - Catch the ZooKeeper exception in the caller code block (ReplicaManager.makeFollowers) and simply do not touch the remaining partitions, so that the batch of partitions that succeeded up to that point is still updated and processed (fetch). A rough sketch of this approach is included after the code excerpt below.
> 4 - Or make the LeaderAndIsr request never arrive at the broker in case of ZooKeeper disconnect; that would be safe because it is already possible for some replicas to receive the LeaderAndIsr later than others. However, in that case, the code needs to make sure the controller will retry.
>
> {code:java}
> else if (requestLeaderEpoch > currentLeaderEpoch) {
>   // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
>   // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
>   if (stateInfo.basePartitionState.replicas.contains(localBrokerId))
>     partitionState.put(partition, stateInfo)
>   else
>
> def getOrCreateReplica(replicaId: Int, isNew: Boolean = false): Replica = {
>   allReplicasMap.getAndMaybePut(replicaId, {
>     if (isReplicaLocal(replicaId)) {
>       val adminZkClient = new AdminZkClient(zkClient)
>       val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
> {code}
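> As a rough sketch of option 3 only (this is not the actual ReplicaManager code; FollowerTransition, PartitionState, makeFollower and readyToFetch are illustrative placeholder names): catch the per-partition failure inside the loop, stop touching the remaining partitions, and return only the partitions that transitioned successfully so that only those start their fetchers.
>
> {code:java}
> import scala.collection.mutable
> import scala.util.control.NonFatal
>
> // Illustrative stand-in for the real per-partition state carried by LeaderAndIsr.
> case class PartitionState(leaderEpoch: Int, leader: Int)
>
> class FollowerTransition {
>
>   // Placeholder for the per-partition transition; in the real code this is where
>   // Partition.makeFollower / getOrCreateReplica consult ZooKeeper and can throw.
>   private def makeFollower(topicPartition: String, state: PartitionState): Unit = {
>   }
>
>   // Option 3 sketch: stop at the first ZooKeeper failure, but keep the prefix of
>   // partitions that already transitioned successfully so their fetchers can start.
>   def makeFollowers(partitionStates: Seq[(String, PartitionState)]): Set[String] = {
>     val readyToFetch = mutable.Set.empty[String]
>     val it = partitionStates.iterator
>     var zkFailed = false
>     while (it.hasNext && !zkFailed) {
>       val (tp, state) = it.next()
>       try {
>         makeFollower(tp, state)
>         readyToFetch += tp
>       } catch {
>         case NonFatal(_) =>
>           // Leave this partition and the remaining ones untouched; the controller
>           // is expected to resend LeaderAndIsr once the ZooKeeper session is back.
>           zkFailed = true
>       }
>     }
>     // Only the partitions that transitioned successfully are handed to the fetchers.
>     readyToFetch.toSet
>   }
> }
> {code}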