Hi, here is the code of ReplicaFetcherThread.handleOffsetOutOfRange (with its doc comments), which may be the answer:
/**
 * Handle a partition whose offset is out of range and return a new fetch offset.
 */
def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
  val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get

  /**
   * Unclean leader election: A follower goes down, in the meanwhile the leader keeps appending messages. The follower comes back up
   * and before it has completely caught up with the leader's logs, all replicas in the ISR go down. The follower is now uncleanly
   * elected as the new leader, and it starts appending messages from the client. The old leader comes back up, becomes a follower
   * and it may discover that the current leader's end offset is behind its own end offset.
   *
   * In such a case, truncate the current follower's log to the current leader's end offset and continue fetching.
   *
   * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
   */
  val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId)

  if (leaderEndOffset < replica.logEndOffset.messageOffset) {
    // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
    // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
    // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
    if (!LogConfig.fromProps(brokerConfig.toProps,
        AdminUtils.fetchTopicConfig(replicaMgr.zkClient, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
      // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
      fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) +
        " Current leader %d's latest offset %d is less than replica %d's latest offset %d"
          .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset))
      Runtime.getRuntime.halt(1)
    }

    replicaMgr.logManager.truncateTo(Map(topicAndPartition -> leaderEndOffset))
    warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d"
      .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderEndOffset))
    leaderEndOffset
  } else {
    /**
     * The follower could have been down for a long time and when it starts up, its end offset could be smaller than the leader's
     * start offset because the leader has deleted old logs (log.logEndOffset < leaderStartOffset).
     *
     * Roll out a new log at the follower with the start offset equal to the current leader's start offset and continue fetching.
     */
    val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
    replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
    warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
      .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
    leaderStartOffset
  }
}
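In short, the follower resolves the out-of-range offset in one of three ways, depending on how its log end offset compares with the leader's and on whether unclean leader election is enabled for the topic. Here is a minimal standalone sketch of just that branching; Kafka's internals (SimpleConsumer, ReplicaManager, LogManager) are left out, and the names OffsetOutOfRangeSketch, resolveFetchOffset, TruncateTo, StartAt and HaltBroker are mine, not Kafka's:

object OffsetOutOfRangeSketch {

  // Possible outcomes of handling an out-of-range fetch offset (hypothetical types, for illustration only).
  sealed trait Action
  case class TruncateTo(offset: Long) extends Action    // follower is ahead of the leader: drop the extra messages
  case class StartAt(offset: Long) extends Action       // follower is behind the leader's log start: begin a fresh log
  case object HaltBroker extends Action                 // truncation would be needed but unclean election is disabled

  def resolveFetchOffset(replicaLogEndOffset: Long,
                         leaderEndOffset: Long,
                         leaderStartOffset: Long,
                         uncleanLeaderElectionEnable: Boolean): Action = {
    if (leaderEndOffset < replicaLogEndOffset) {
      // The follower has messages the current leader never saw (typically after an unclean election).
      // Truncating them away is only allowed if unclean leader election is enabled for the topic.
      if (!uncleanLeaderElectionEnable) HaltBroker
      else TruncateTo(leaderEndOffset)
    } else {
      // The follower fell so far behind that the leader may already have deleted the segments it
      // would fetch next, so the follower restarts its log at the leader's start offset.
      StartAt(leaderStartOffset)
    }
  }

  def main(args: Array[String]): Unit = {
    // Avanish's case: replica 2004 is at offset 21, leader 2003 is at offset 20,
    // unclean.leader.election.enable=false -> the broker halts.
    println(resolveFetchOffset(replicaLogEndOffset = 21L, leaderEndOffset = 20L,
                               leaderStartOffset = 0L, uncleanLeaderElectionEnable = false))
  }
}

Running the sketch with the offsets from the log line quoted below prints HaltBroker, which corresponds to the Runtime.getRuntime.halt(1) path in the real code above.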
2016-02-25 14:38 GMT+08:00 Avanish Mishra <avanish...@yahoo.com.invalid>:

> We are running a 10-node Kafka cluster in a test setup with a replication factor
> of 3 and topics with min.insync.replicas set to 2.
> Recently I noticed that a few nodes halted on restart after multiple node
> failures with the FATAL message:
>
> "Halting because log truncation is not allowed for topic 1613_spam,
> Current leader 2003's latest offset 20 is less than replica 2004's latest
> offset 21 (kafka.server.ReplicaFetcherThread)"
>
> My understanding is that this can happen if there is a slow replica in the ISR
> which does not have the latest committed message and high watermark. As
> min.insync.replicas is 2, a write is committed once it completes on the leader
> and 1 follower. Since replica.lag.time.max.ms is 10000, a slow replica can stay
> in the ISR for up to 10 seconds without fetching any messages. If the leader
> goes down within that interval and the slow follower is elected as leader, the
> result is a new leader whose offset is less than that of the old leader, which
> is now a follower. Is this explanation correct or am I missing something? What
> is the best way to recover committed messages in such a situation?
>
> We are running the cluster with the following settings:
> - replication factor: 3
> - min.insync.replicas: 2
> - request.required.acks: -1
> - unclean.leader.election.enable: false
> - replica.lag.time.max.ms: 10000
> - replica.high.watermark.checkpoint.interval.ms: 1000
>
> Thanks
> Avanish
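Regarding the timing point in the question above: a follower is removed from the ISR only after it has failed to catch up to the leader's log end offset for more than replica.lag.time.max.ms, so a follower that is one message behind can legitimately still be in the ISR when the leader dies. Here is a rough sketch of that check; IsrLagSketch and isStillInSync are my own simplified names, not the actual Replica/Partition code:

object IsrLagSketch {

  // A follower stays in the ISR as long as it has fully caught up to the leader's
  // log end offset within the last replica.lag.time.max.ms milliseconds.
  def isStillInSync(nowMs: Long, lastCaughtUpTimeMs: Long, replicaLagTimeMaxMs: Long): Boolean =
    nowMs - lastCaughtUpTimeMs <= replicaLagTimeMaxMs

  def main(args: Array[String]): Unit = {
    val replicaLagTimeMaxMs = 10000L
    val now = System.currentTimeMillis()

    // A follower that last caught up 8 seconds ago is still in the ISR, even though
    // it may not have the latest committed message (offset 21 in the example above).
    println(isStillInSync(now, now - 8000L, replicaLagTimeMaxMs))   // true

    // Only after more than 10 seconds without catching up would the leader shrink the ISR.
    println(isStillInSync(now, now - 12000L, replicaLagTimeMaxMs))  // false
  }
}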