Maysam Yabandeh created KAFKA-3693:
--------------------------------------

             Summary: Race condition between highwatermark-checkpoint thread and handleLeaderAndIsrRequest at broker start-up
                 Key: KAFKA-3693
                 URL: https://issues.apache.org/jira/browse/KAFKA-3693
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 0.9.0.1
            Reporter: Maysam Yabandeh


Upon broker start-up, a race between the highwatermark-checkpoint thread writing the 
replication-offset-checkpoint file and the handleLeaderAndIsrRequest thread reading 
from it causes the high watermark for some partitions to be reset to 0. In the good 
case, this makes the replica truncate its entire log to 0 and start fetching terabytes 
of data from the leader broker, which sometimes leads to hours of downtime. In the bad 
cases we observed, the reset offset can also propagate to the 
recovery-point-offset-checkpoint file, making a leader broker truncate as well. This 
seems to have the potential to lead to data loss if the truncation happens on both the 
follower and the leader brokers.
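
For illustration only, here is a minimal, hypothetical Scala sketch (invented names, not the actual Kafka code) of why a partition that is missing from the checkpoint file ends up with a high watermark of 0 and therefore truncates its whole log:
{code}
// Hypothetical sketch, not the actual Kafka code: a high watermark lookup that
// misses in the checkpoint map falls back to 0, which makes the replica
// truncate its whole log and re-fetch it from the leader.
object HighWatermarkFallbackSketch {
  case class TopicAndPartition(topic: String, partition: Int)

  def main(args: Array[String]): Unit = {
    // The checkpoint file after it was rewritten with only a subset of partitions.
    val partialCheckpoint = Map(TopicAndPartition("topicA", 0) -> 4200000L)

    val tp = TopicAndPartition("topicB", 3) // missing from the partial snapshot
    val logEndOffset = 7300000L

    // A missing entry makes the high watermark default to 0.
    val highWatermark = math.min(partialCheckpoint.getOrElse(tp, 0L), logEndOffset)
    println(s"$tp restarts with highWatermark=$highWatermark " +
      s"instead of something close to $logEndOffset")
  }
}
{code}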

This is the particular faulty scenario that manifested in our tests:
# The broker restarts and receives a LeaderAndIsr request from the controller
# The LeaderAndIsr message, however, does not contain all the partitions (probably because other brokers were churning at the same time)
# becomeLeaderOrFollower calls getOrCreatePartition and updates allPartitions with only the partitions included in the LeaderAndIsr message {code}
  def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
    var partition = allPartitions.get((topic, partitionId))
    if (partition == null) {
      allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, time, this))
{code}
# The highwatermark-checkpoint thread jumps in, taking a snapshot of the (partial) allPartitions' high watermarks into the replication-offset-checkpoint file, hence overwriting the previous high watermarks (see the sketch after this list) {code}
  def checkpointHighWatermarks() {
    val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
{code}
# Later, becomeLeaderOrFollower calls makeLeaders and makeFollowers, which read the (now partial) file through Partition::getOrCreateReplica {code}
          val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
          val offsetMap = checkpoint.read
          if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
            info("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
{code}
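
To make the overwriting step concrete, here is a small, hypothetical sketch (simplified stand-in names, not the actual ReplicaManager code) of how checkpointing a partially populated allPartitions map drops the high watermarks of partitions that are not yet in the map:
{code}
// Hypothetical sketch with simplified names; illustrates the overwrite in step 4.
import scala.collection.concurrent.TrieMap

object PartialCheckpointSketch {
  case class TopicAndPartition(topic: String, partition: Int)

  // Stands in for allPartitions: only what the first LeaderAndIsr request contained.
  val allPartitions = TrieMap(TopicAndPartition("topicA", 0) -> 4200000L)

  // Stands in for writing the checkpoint: replaces the whole file content.
  def writeCheckpoint(hws: Map[TopicAndPartition, Long]): Map[TopicAndPartition, Long] = hws

  def main(args: Array[String]): Unit = {
    // What the file held before the restart (all partitions on this broker).
    val previousFile = Map(
      TopicAndPartition("topicA", 0) -> 4200000L,
      TopicAndPartition("topicB", 3) -> 7300000L)

    // The snapshot covers only what is currently in allPartitions,
    // so topicB-3 silently disappears from the file.
    val newFile = writeCheckpoint(allPartitions.toMap)

    val lost = previousFile.keySet -- newFile.keySet
    println(s"high watermarks lost by the partial snapshot: $lost")
  }
}
{code}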

We are not entirely sure whether the initial LeaderAndIsr message including only a 
subset of partitions is critical for making this race condition manifest. But it is an 
important detail, since it clarifies that a solution based on not letting the 
highwatermark-checkpoint thread run in the middle of processing a LeaderAndIsr message 
would not suffice.

The solution we are thinking of is to force-initialize allPartitions with the 
partitions listed in the replication-offset-checkpoint file (and perhaps the 
recovery-point-offset-checkpoint file too) when a server starts.
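
As a rough illustration of that direction (hypothetical helper names such as readCheckpointFile and initializeFromCheckpoints; not a patch), allPartitions could be seeded from the checkpoint files before any LeaderAndIsr request is processed:
{code}
// Hypothetical sketch of the proposed direction; the names below stand in for
// the real code and are not a patch.
import scala.collection.concurrent.TrieMap

object PrepopulateAllPartitionsSketch {
  case class TopicAndPartition(topic: String, partition: Int)
  case class Partition(topic: String, partition: Int, var highWatermark: Long)

  val allPartitions = TrieMap.empty[TopicAndPartition, Partition]

  // Stands in for reading replication-offset-checkpoint (and possibly
  // recovery-point-offset-checkpoint) at start-up.
  def readCheckpointFile(): Map[TopicAndPartition, Long] =
    Map(TopicAndPartition("topicA", 0) -> 4200000L,
        TopicAndPartition("topicB", 3) -> 7300000L)

  // Seed allPartitions before any LeaderAndIsr request is handled, so the
  // highwatermark-checkpoint thread always snapshots the full partition set.
  def initializeFromCheckpoints(): Unit =
    for ((tp, hw) <- readCheckpointFile())
      allPartitions.putIfAbsent(tp, Partition(tp.topic, tp.partition, hw))

  def main(args: Array[String]): Unit = {
    initializeFromCheckpoints()
    println(s"partitions known before LeaderAndIsr: ${allPartitions.keySet}")
  }
}
{code}
With the map pre-populated, a checkpoint snapshot taken in the middle of handling a partial LeaderAndIsr request would still contain entries for every partition the broker hosted before the restart.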

Thoughts?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
