Hi
  Here is the doc in the ReplicaFetcherThread.handleOffsetOutOfRange which
maybe the answer:

 /**
   * Handle a partition whose offset is out of range and return a new fetch
offset.
   */
  def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
    val replica = replicaMgr.getReplica(topicAndPartition.topic,
topicAndPartition.partition).get

    /**
     * Unclean leader election: A follower goes down, in the meanwhile the
leader keeps appending messages. The follower comes back up
     * and before it has completely caught up with the leader's logs, all
replicas in the ISR go down. The follower is now uncleanly
     * elected as the new leader, and it starts appending messages from the
client. The old leader comes back up, becomes a follower
     * and it may discover that the current leader's end offset is behind
its own end offset.
     *
     * In such a case, truncate the current follower's log to the current
leader's end offset and continue fetching.
     *
     * There is a potential for a mismatch between the logs of the two
replicas here. We don't fix this mismatch as of now.
     */
    val leaderEndOffset =
simpleConsumer.earliestOrLatestOffset(topicAndPartition,
OffsetRequest.LatestTime, brokerConfig.brokerId)
    if (leaderEndOffset < replica.logEndOffset.messageOffset) {
      // Prior to truncating the follower's log, ensure that doing so is
not disallowed by the configuration for unclean leader election.
      // This situation could only happen if the unclean election
configuration for a topic changes while a replica is down. Otherwise,
      // we should never encounter this situation since a non-ISR leader
cannot be elected if disallowed by the broker configuration.
      if (!LogConfig.fromProps(brokerConfig.toProps,
AdminUtils.fetchTopicConfig(replicaMgr.zkClient,
        topicAndPartition.topic)).uncleanLeaderElectionEnable) {
        // Log a fatal error and shutdown the broker to ensure that data
loss does not unexpectedly occur.
        fatal("Halting because log truncation is not allowed for topic
%s,".format(topicAndPartition.topic) +
          " Current leader %d's latest offset %d is less than replica %d's
latest offset %d"
          .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId,
replica.logEndOffset.messageOffset))
        Runtime.getRuntime.halt(1)
      }

      replicaMgr.logManager.truncateTo(Map(topicAndPartition ->
leaderEndOffset))
      warn("Replica %d for partition %s reset its fetch offset from %d to
current leader %d's latest offset %d"
        .format(brokerConfig.brokerId, topicAndPartition,
replica.logEndOffset.messageOffset, sourceBroker.id, leaderEndOffset))
      leaderEndOffset
    } else {
      /**
       * The follower could have been down for a long time and when it
starts up, its end offset could be smaller than the leader's
       * start offset because the leader has deleted old logs
(log.logEndOffset < leaderStartOffset).
       *
       * Roll out a new log at the follower with the start offset equal to
the current leader's start offset and continue fetching.
       */
      val leaderStartOffset =
simpleConsumer.earliestOrLatestOffset(topicAndPartition,
OffsetRequest.EarliestTime, brokerConfig.brokerId)
      replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition,
leaderStartOffset)
      warn("Replica %d for partition %s reset its fetch offset from %d to
current leader %d's start offset %d"
        .format(brokerConfig.brokerId, topicAndPartition,
replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
      leaderStartOffset
    }
  }

2016-02-25 14:38 GMT+08:00 Avanish Mishra <avanish...@yahoo.com.invalid>:

> We are running 10 node kafka cluster in test setup with replication factor
> of 3 and topics with min.insync.replica as 2.
> Recently i noticed that few nodes halted on restart after multiple node
> failure with FATAL message:
>
> "Halting because log truncation is not allowed for topic 1613_spam,
> Current leader 2003's latest offset 20 is less than replica 2004's latest
> offset 21 (kafka.server.ReplicaFetcherThread)"
> My understanding is that this can happen if there is slow replica in ISR
> which doesn't have latest committed message and high water mark. As
> min.insync.replicas is 2, write will be committed when it complete on
> leader and 1 follower. Since replica.lag.time.max.ms setting is 10000,
> any slow replica can be in ISR for last 10 sec without fetching any
> message. if leader goes down within that interval and slow follower is
> elected as leader, this will result in new leader with offset less than the
> follower.  Is this explanation correct or i am missing something? What is
> the best way to recover committed message in such situation?
>
> We are running cluster with following settings.
> -  replication factor  3-  min.insync.replicas is set to 2.
>  -  request.required.acks -1-  unclean.leader.election.enable is set to
> false-  replica.lag.time.max.ms is 10000-
> replica.high.watermark.checkpoint.interval.ms 1000
>
>
> Thanks
> Avanish

Reply via email to