[ https://issues.apache.org/jira/browse/KAFKA-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15332684#comment-15332684 ]
Jun Rao commented on KAFKA-3693: -------------------------------- [~maysamyabandeh], that by itself may not be a bad idea. We will have to think through UpdateMetadataRequest as well since currently, we expect there is an UpdateMetadataRequest before the first LeaderAndIsrRequest on broker startup. > Race condition between highwatermark-checkpoint thread and > handleLeaderAndIsrRequest at broker start-up > ------------------------------------------------------------------------------------------------------- > > Key: KAFKA-3693 > URL: https://issues.apache.org/jira/browse/KAFKA-3693 > Project: Kafka > Issue Type: Bug > Components: core > Affects Versions: 0.9.0.1 > Reporter: Maysam Yabandeh > > Upon broker start-up, a race between highwatermark-checkpoint thread to write > replication-offset-checkpoint file and handleLeaderAndIsrRequest thread > reading from it causes the highwatermark for some partitions to be reset to > 0. In the good case, this results the replica to truncate its entire log to 0 > and hence initiates fetching of terabytes of data from the lead broker, which > sometimes leads to hours of downtime. We observed the bad cases that the > reset offset can propagate to recovery-point-offset-checkpoint file, making a > lead broker to truncate the file. This seems to have the potential to lead to > data loss if the truncation happens at both follower and leader brokers. > This is the particular faulty scenario manifested in our tests: > # The broker restarts and receive LeaderAndIsr from the controller > # LeaderAndIsr message however does not contain all the partitions (probably > because other brokers were churning at the same time) > # becomeLeaderOrFollower calls getOrCreatePartition and updates the > allPartitions with the partitions included in the LeaderAndIsr message {code} > def getOrCreatePartition(topic: String, partitionId: Int): Partition = { > var partition = allPartitions.get((topic, partitionId)) > if (partition == null) { > allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, > partitionId, time, this)) > {code} > # replication-offset-checkpoint jumps in taking a snapshot of (the partial) > allReplicas' high watermark into replication-offset-checkpoint file {code} > def checkpointHighWatermarks() { > val replicas = > allPartitions.values.map(_.getReplica(config.brokerId)).collect{case > Some(replica) => replica}{code} hence rewriting the previous highwatermarks. > # Later becomeLeaderOrFollower calls makeLeaders and makeFollowers which read > the (now partial) file through Partition::getOrCreateReplica {code} > val checkpoint = > replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath) > val offsetMap = checkpoint.read > if (!offsetMap.contains(TopicAndPartition(topic, partitionId))) > info("No checkpointed highwatermark is found for partition > [%s,%d]".format(topic, partitionId)) > {code} > We are not entirely sure whether the initial LeaderAndIsr message including a > subset of partitions is critical in making this race condition manifest or > not. But it is an important detail since it clarifies that a solution based > on not letting the highwatermark-checkpoint thread jumping in the middle of > processing a LeaderAndIsr message would not suffice. > The solution we are thinking of is to force initializing allPartitions by the > partitions listed in the replication-offset-checkpoint (and perhaps > recovery-point-offset-checkpoint file too) when a server starts. > Thoughts? -- This message was sent by Atlassian JIRA (v6.3.4#6332)