Andrew Choi created KAFKA-9769:
----------------------------------

             Summary: ReplicaManager Partition.makeFollower Increases 
LeaderEpoch when ZooKeeper disconnect occurs
                 Key: KAFKA-9769
                 URL: https://issues.apache.org/jira/browse/KAFKA-9769
             Project: Kafka
          Issue Type: Bug
          Components: replication
            Reporter: Andrew Choi


The ZooKeeper session expired and the broker got disconnected at the same time that 
the broker received the first LeaderAndIsr request. While the broker was processing 
that first LeaderAndIsr request, the ZooKeeper session had not yet been 
reestablished.

Within the makeFollowers method, _partition.getOrCreateReplica_ is called 
before the fetcher is started. _partition.getOrCreateReplica_ needs to read 
the topic configuration from ZooKeeper, but the ZooKeeper client throws an 
exception because the session is invalid, so the fetcher start is skipped.

 

The Partition class's getOrCreateReplica method calls AdminZkClient's 
fetchEntityConfig(..), which throws an exception if the ZooKeeper session is 
invalid:

 
{code:java}
val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
{code}
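
To make the failure mode concrete, here is a minimal standalone sketch of the flow 
described above. It is not the actual Kafka code; PartitionModel, FollowerTransition, 
fetchTopicConfig and startFetchers are placeholder names. One partition's ZooKeeper 
lookup throws partway through the batch, so the fetcher start at the end never runs, 
even though partitions earlier in the batch were already updated.
{code:java}
// Standalone model of the flow described above; not the actual ReplicaManager /
// Partition code. PartitionModel, FollowerTransition, fetchTopicConfig and
// startFetchers are placeholder names.
final case class PartitionModel(topic: String, var leaderEpoch: Int)

object FollowerTransition {

  // Stand-in for adminZkClient.fetchEntityConfig: throws for any topic whose lookup
  // happens after the ZooKeeper session has expired.
  def fetchTopicConfig(topic: String, expiredTopics: Set[String]): Unit =
    if (expiredTopics(topic))
      throw new IllegalStateException(s"ZooKeeper session expired while fetching config of $topic")

  // Stand-in for replicaFetcherManager.addFetcherForPartitions.
  def startFetchers(partitions: Seq[PartitionModel]): Unit =
    println(s"starting fetchers for: ${partitions.map(_.topic).mkString(", ")}")

  // Stand-in for ReplicaManager.makeFollowers: update every partition in the batch,
  // then start fetchers for all of them at the end.
  def makeFollowers(batch: Seq[PartitionModel], requestEpoch: Int, expiredTopics: Set[String]): Unit = {
    batch.foreach { p =>
      fetchTopicConfig(p.topic, expiredTopics)  // ZooKeeper lookup, may throw
      p.leaderEpoch = requestEpoch              // leader epoch recorded for this partition
    }
    startFetchers(batch)                        // skipped entirely if any lookup threw
  }

  def main(args: Array[String]): Unit = {
    val batch = Seq(PartitionModel("topicA", 4), PartitionModel("topicB", 4))
    try makeFollowers(batch, requestEpoch = 5, expiredTopics = Set("topicB"))
    catch { case e: IllegalStateException => println(s"makeFollowers aborted: ${e.getMessage}") }
    // topicA now has leaderEpoch 5 but no fetcher; a retried LeaderAndIsr with epoch 5
    // is not strictly greater than 5, so topicA can be left without a fetcher.
    batch.foreach(p => println(s"${p.topic} leaderEpoch=${p.leaderEpoch}"))
  }
}
{code}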
 

When this occurs, the leader epoch should not have been incremented while ZooKeeper 
is invalid: once the second LeaderAndIsr request comes in, the leader epoch in the 
request can be equal to the broker's current leader epoch, so the broker ignores the 
retried request (the epoch is not strictly greater) even though its fetcher was never 
started.

A few options I can think of for a fix; I think the third route could be feasible:

1 - Make the LeaderEpoch update and the fetch update atomic.

2 - Wait until all individual partitions have been processed successfully, then 
start the fetch.

3 - Catch the ZooKeeper exception in the caller code block 
(ReplicaManager.makeFollowers) and simply do not touch the remaining partitions, 
so that the batch of partitions that succeeded up to that point is still updated 
and processed (fetch started). A sketch of this option follows the list.

4 - Make the LeaderAndIsr request never arrive at the broker in the case of a 
ZooKeeper disconnect. That would be safe because it is already possible for some 
replicas to receive the LeaderAndIsr request later than others. However, in that 
case, the code needs to make sure the controller will retry.

 
{code:java}
// ReplicaManager (leader epoch check): the request is only acted on when its
// leader epoch is strictly greater than the broker's current leader epoch.
} else if (requestLeaderEpoch > currentLeaderEpoch) {
  // If the leader epoch is valid record the epoch of the controller that made
  // the leadership decision. This is useful while updating the isr to maintain
  // the decision maker controller's epoch in the zookeeper path
  if (stateInfo.basePartitionState.replicas.contains(localBrokerId))
    partitionState.put(partition, stateInfo)
  else
    ...

// Partition.getOrCreateReplica: reads the topic config from ZooKeeper, which
// throws if the session is invalid.
def getOrCreateReplica(replicaId: Int, isNew: Boolean = false): Replica = {
  allReplicasMap.getAndMaybePut(replicaId, {
    if (isReplicaLocal(replicaId)) {
      val adminZkClient = new AdminZkClient(zkClient)
      val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
      ...
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
