[ 
https://issues.apache.org/jira/browse/KAFKA-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-4485.
----------------------------
    Resolution: Fixed

Issue resolved by pull request 2208
[https://github.com/apache/kafka/pull/2208]

> Follower should be in the isr if its FetchRequest has fetched up to the 
> logEndOffset of leader
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4485
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4485
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.1.0
>            Reporter: Dong Lin
>            Assignee: Dong Lin
>             Fix For: 0.10.2.0
>
>
> In the current implementation, we exclude a follower from the ISR if the 
> begin offset of the FetchRequest from this follower has been smaller than 
> the logEndOffset of the leader for more than replicaLagTimeMaxMs. We add a 
> follower to the ISR if the begin offset of the FetchRequest from this 
> follower is equal to or larger than the high watermark of this partition.
> This is problematic for the following reasons:
> 1) The criteria for ISR membership are inconsistent between 
> maybeExpandIsr() and maybeShrinkIsr(). Therefore a follower may be 
> repeatedly removed from and added to the ISR (e.g. in the scenario 
> described below).
> 2) A follower may be removed from the ISR even if its fetch rate can keep up 
> with the produce rate. Suppose a producer keeps sending many small requests 
> at a high request rate but a low byte rate (e.g. many mirror makers), and 
> the follower is always able to read all the available data at the time the 
> leader receives it. Even so, the begin offset of each fetch request will 
> always be smaller than the logEndOffset of the leader, so the follower will 
> be removed from the ISR after replicaLagTimeMaxMs.
> In the following we describe the solution to this problem.
> Terminology:
> - Definition of replica lag: we say a replica lags behind the leader by X ms 
> if its current log end offset is equal to the log end offset of the leader 
> X ms ago.
> - Definition of pseudo-ISR set: pseudo-ISR set of a partition = { replica | 
> replica belongs to the given partition AND replica's lag <= 
> replicaLagTimeMaxMs}
> - Definition of high-watermark of a partition: high-watermark of a partition 
> is the max(current high-watermark of the partition, min(log end offset of 
> replicas in the pseudo-ISR set of this partition))
> - Definition of ISR set: ISR set of a partition = {replica | replica is in 
> pseudo-ISR set of the given partition AND log end offset of replica >= 
> high-watermark of the given partition}
> Guarantee:
> 1) If a follower is close enough to the leader in the sense that its replica 
> lag <= replicaLagTimeMaxMs, then this follower will be in the pseudo-ISR set. 
> Thus the high-watermark will stop increasing until this follower's log end 
> offset >= high-watermark, at which moment this follower will be added to the 
> ISR set. This allows us to solve the 2nd problem described above.
> 2) If a follower lags behind the leader by more than replicaLagTimeMaxMs, it 
> will be removed from the ISR set.
> 3) High watermark of a partition will never decrease.
> 4) For any replica in ISR set, its log end offset >= high-watermark. 
> Implementation:
> 1) For each follower, the leader keeps track of the time of the last fetch 
> request from this follower. Let's call it lastFetchTime. In addition, the 
> leader also maintains the log end offset of the leader at the lastFetchTime 
> for each follower. Let's call it lastFetchLeaderLEO. Both variables will be 
> updated after leader has processed a FetchRequest from a follower.
> 2) When leader receives FetchRequest from a follower, if begin offset of the 
> FetchRequest >= current leader's LEO, follower's lastCatchUpTimeMs will be 
> set to current system time. Otherwise, if begin offset of the FetchRequest >= 
> lastFetchLeaderLEO, follower's lastCatchUpTimeMs will be set to 
> lastFetchTime. Replica's lag = current system time - lastCatchUpTimeMs.
> 3) The leader can update pseudo-ISR set, high-watermark and ISR set of the 
> partition based on the lag of replicas of this partition, according to the 
> definition described above.
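
The three implementation steps quoted above can be sketched as a small Python model. This is only an illustration under stated assumptions, not the code from pull request 2208: FollowerState, on_fetch, lag_ms, simulate and update_partition are hypothetical names, time is a plain integer in milliseconds, and the "old" rule is reduced to "caught up only when the fetch offset reaches the leader's current LEO".

```python
from dataclasses import dataclass


@dataclass
class FollowerState:
    """Leader-side state for one follower (hypothetical model, not Kafka code)."""
    last_fetch_time_ms: int = 0       # lastFetchTime
    last_fetch_leader_leo: int = 0    # lastFetchLeaderLEO
    last_caught_up_time_ms: int = 0   # lastCatchUpTimeMs

    def on_fetch(self, fetch_offset, leader_leo, now_ms):
        """Apply step 2, then refresh the step 1 variables."""
        if fetch_offset >= leader_leo:
            # Follower has read everything the leader has right now.
            self.last_caught_up_time_ms = now_ms
        elif fetch_offset >= self.last_fetch_leader_leo:
            # Follower has read everything the leader had at the previous fetch.
            self.last_caught_up_time_ms = self.last_fetch_time_ms
        self.last_fetch_time_ms = now_ms
        self.last_fetch_leader_leo = leader_leo

    def lag_ms(self, now_ms):
        return now_ms - self.last_caught_up_time_ms


def simulate(rounds, interval_ms=100, msgs_per_round=10):
    """Steady small produces; each fetch returns all data available at that time.

    Returns (lag under the simplified old rule, lag under the proposed rule).
    """
    state = FollowerState()
    old_caught_up_ms = 0  # old rule: updated only when fetch offset >= leader LEO
    leader_leo = follower_leo = now_ms = 0
    for _ in range(rounds):
        now_ms += interval_ms
        leader_leo += msgs_per_round  # new data arrives before the next fetch
        if follower_leo >= leader_leo:
            old_caught_up_ms = now_ms
        state.on_fetch(follower_leo, leader_leo, now_ms)
        follower_leo = leader_leo     # follower reads all available data
    return now_ms - old_caught_up_ms, state.lag_ms(now_ms)


def update_partition(leos, lags_ms, current_hw, max_lag_ms):
    """Step 3: derive pseudo-ISR, high watermark and ISR from the definitions."""
    pseudo_isr = {r for r, lag in lags_ms.items() if lag <= max_lag_ms}
    hw = max(current_hw, min((leos[r] for r in pseudo_isr), default=current_hw))
    isr = {r for r in pseudo_isr if leos[r] >= hw}
    return pseudo_isr, hw, isr
```

In this model the follower's fetch offset is always behind the leader's LEO, so the lag under the simplified old rule grows with elapsed time, while under the proposed rule it stays at one fetch interval. update_partition shows the first guarantee: a follower that is in the pseudo-ISR set but still below the high watermark holds the high watermark in place until it catches up, and only then joins the ISR set.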



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
