[ https://issues.apache.org/jira/browse/KAFKA-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin updated KAFKA-4485:
----------------------------
    Description: 
In the current implementation, a follower is excluded from the ISR if the begin 
offset of its FetchRequest has been smaller than the leader's logEndOffset for 
more than replicaLagTimeMaxMs. A follower is added to the ISR if the begin 
offset of its FetchRequest is equal to or larger than the high watermark of the 
partition.

This is problematic for the following reasons:

1) The criteria for ISR membership are inconsistent between maybeExpandIsr() 
and maybeShrinkIsr(). Therefore a follower may be repeatedly removed from and 
added to the ISR (e.g. in the scenario described below).

2) A follower may be removed from the ISR even if its fetch rate can keep up 
with the produce rate. Suppose a producer keeps sending many small requests at 
a high request rate but a low byte rate (e.g. many mirror makers), and the 
follower is always able to read all the available data at the time the leader 
receives it. Because new data keeps arriving between fetches, the begin offset 
of each fetch request will still always be smaller than the leader's 
logEndOffset. Thus the follower will be removed from the ISR after 
replicaLagTimeMaxMs.
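
The scenario in 2) can be reproduced with a small simulation. This is only a 
simplified model of the rule as worded above, not the broker code: the 
function name, the 10 s value for replicaLagTimeMaxMs, and the produce and 
fetch intervals are all assumptions chosen for illustration.

```python
REPLICA_LAG_TIME_MAX_MS = 10_000  # assumed value, for illustration only


def simulate_current_rule(duration_ms, fetch_interval_ms=5, produce_interval_ms=1):
    """Return the time (ms) at which the follower would be dropped, or None.

    Hypothetical model: the follower counts as caught up only when the begin
    offset of its fetch is >= the leader's log end offset.
    """
    leader_leo = 0
    follower_leo = 0
    last_caught_up_ms = 0
    for now in range(1, duration_ms + 1):
        if now % produce_interval_ms == 0:
            leader_leo += 1  # one small message appended on the leader
        if now % fetch_interval_ms == 0:
            fetch_offset = follower_leo  # begin offset of the FetchRequest
            if fetch_offset >= leader_leo:
                last_caught_up_ms = now
            follower_leo = leader_leo  # follower reads everything available
        if now - last_caught_up_ms > REPLICA_LAG_TIME_MAX_MS:
            return now  # shrink criterion fires
    return None


print(simulate_current_rule(20_000))
print(simulate_current_rule(20_000, produce_interval_ms=10**9))  # idle producer
```

In this model the follower is never more than one fetch interval behind, yet 
it is dropped as soon as replicaLagTimeMaxMs has elapsed, because some new 
data has always arrived since its previous fetch. With an idle producer it is 
never dropped.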

In the following we describe the solution to this problem.

Terminology:
- Definition of replica lag: we say a replica lags behind the leader by X ms if 
its current log end offset is equal to the log end offset of the leader X ms ago.
- Definition of pseudo-ISR set: pseudo-ISR set of a partition = { replica | 
replica belongs to the given partition AND replica's lag <= replicaLagTimeMaxMs}
- Definition of high-watermark of a partition: the high-watermark of a 
partition is max(current high-watermark of the partition, min(log end offset of 
replicas in the pseudo-ISR set of this partition))
- Definition of ISR set: ISR set of a partition = {replica | replica is in 
pseudo-ISR set of the given partition AND log end offset of replica >= 
high-watermark of the given partition}
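
The definitions above can be written down directly as set computations. The 
following is a minimal sketch, not the broker implementation: the Replica 
type and the function names are hypothetical, and returning the current 
high-watermark when the pseudo-ISR set is empty is an assumption (the leader 
itself normally has lag 0 and is therefore always a member).

```python
from dataclasses import dataclass


@dataclass
class Replica:
    broker_id: int
    log_end_offset: int
    lag_ms: int  # replica lag as defined above


def pseudo_isr(replicas, replica_lag_time_max_ms):
    """Replicas whose lag is within replicaLagTimeMaxMs."""
    return [r for r in replicas if r.lag_ms <= replica_lag_time_max_ms]


def next_high_watermark(current_hw, replicas, replica_lag_time_max_ms):
    """max(current HW, min log end offset over the pseudo-ISR set)."""
    members = pseudo_isr(replicas, replica_lag_time_max_ms)
    if not members:  # assumption: keep the HW unchanged if nobody qualifies
        return current_hw
    return max(current_hw, min(r.log_end_offset for r in members))


def isr(replicas, high_watermark, replica_lag_time_max_ms):
    """Pseudo-ISR members whose log end offset has reached the HW."""
    return [r for r in pseudo_isr(replicas, replica_lag_time_max_ms)
            if r.log_end_offset >= high_watermark]


# Leader (lag 0), one follower that keeps up, one that is far behind.
replicas = [Replica(1, 100, 0), Replica(2, 90, 500), Replica(3, 40, 30_000)]
hw = next_high_watermark(95, replicas, 10_000)
print(hw, [r.broker_id for r in isr(replicas, hw, 10_000)])
```

In the example, replica 2 is in the pseudo-ISR set but not yet in the ISR set: 
the high-watermark stays at 95 (it never decreases) and cannot advance past 
that point until replica 2's log end offset reaches it.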

Guarantee:
1) If a follower is close enough to the leader in the sense that its replica 
lag <= replicaLagTimeMaxMs, then this follower will be in the pseudo-ISR set. 
Thus the high-watermark will stop increasing until this follower's log end 
offset >= high-watermark, at which point this follower will be added to the 
ISR set. This allows us to solve the 2nd problem described above.
2) If a follower lags behind the leader by more than replicaLagTimeMaxMs, it 
will be removed from the ISR set.
3) High watermark of a partition will never decrease.
4) For any replica in ISR set, its log end offset >= high-watermark. 

Implementation:
1) For each follower, the leader keeps track of the time of the last fetch 
request from this follower. Let's call it lastFetchTime. In addition, the 
leader records, for each follower, its own log end offset at lastFetchTime. 
Let's call it lastFetchLeaderLEO. Both variables are updated after the leader 
has processed a FetchRequest from the follower.
2) When the leader receives a FetchRequest from a follower, if the begin offset 
of the FetchRequest >= the leader's current LEO, the follower's 
lastCatchUpTimeMs will be set to the current system time. Otherwise, if the 
begin offset of the FetchRequest >= lastFetchLeaderLEO, the follower's 
lastCatchUpTimeMs will be set to lastFetchTime. Replica's lag = current system 
time - lastCatchUpTimeMs.
3) The leader can update pseudo-ISR set, high-watermark and ISR set of the 
partition based on the lag of replicas of this partition, according to the 
definition described above.
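
Steps 1) and 2) can be sketched as a small per-follower state object kept by 
the leader. This is an illustrative model under stated assumptions, not the 
actual patch: the class and method names are hypothetical, the initial values 
in the constructor are assumed, and the traffic pattern (one message per ms, 
one fetch every 5 ms) is made up to mirror the small-request scenario.

```python
class FollowerState:
    """Leader-side bookkeeping for one follower (hypothetical names)."""

    def __init__(self, now_ms, leader_leo):
        # Assumption: a new follower starts as if it had just fetched.
        self.last_fetch_time_ms = now_ms
        self.last_fetch_leader_leo = leader_leo
        self.last_catch_up_time_ms = now_ms

    def on_fetch(self, fetch_offset, leader_leo, now_ms):
        # Step 2: decide how recently the follower was caught up.
        if fetch_offset >= leader_leo:
            self.last_catch_up_time_ms = now_ms
        elif fetch_offset >= self.last_fetch_leader_leo:
            self.last_catch_up_time_ms = self.last_fetch_time_ms
        # Step 1: update both variables after processing the request.
        self.last_fetch_time_ms = now_ms
        self.last_fetch_leader_leo = leader_leo

    def lag_ms(self, now_ms):
        return now_ms - self.last_catch_up_time_ms


def max_lag_ms(duration_ms, fetch_interval_ms=5, produce_interval_ms=1):
    """Worst lag seen for a follower that always reads all available data."""
    state = FollowerState(now_ms=0, leader_leo=0)
    leader_leo = 0
    follower_leo = 0
    worst = 0
    for now in range(1, duration_ms + 1):
        if now % produce_interval_ms == 0:
            leader_leo += 1
        if now % fetch_interval_ms == 0:
            state.on_fetch(follower_leo, leader_leo, now)
            follower_leo = leader_leo
        worst = max(worst, state.lag_ms(now))
    return worst


print(max_lag_ms(20_000))
```

Under this model the lag of a follower that keeps up stays below two fetch 
intervals, however long the producer keeps writing, so it would remain in the 
pseudo-ISR set. A follower whose fetch offset stops advancing no longer 
satisfies either condition in step 2, and its lag grows with wall-clock time.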





  was:
In the current implementation, a follower is excluded from the ISR if the begin 
offset of its FetchRequest has been smaller than the leader's logEndOffset for 
more than replicaLagTimeMaxMs. A follower is added to the ISR if the begin 
offset of its FetchRequest is equal to or larger than the high watermark of the 
partition.

This is problematic for the following reasons:

1) The criteria for ISR membership are inconsistent between maybeExpandIsr() 
and maybeShrinkIsr(). Therefore a follower may be repeatedly removed from and 
added to the ISR (e.g. in the scenario described below).

2) A follower may be removed from the ISR even if its fetch rate can keep up 
with the produce rate. Suppose a producer keeps sending many small requests at 
a high request rate but a low byte rate (e.g. many mirror makers), and the 
follower is always able to read all the available data at the time the leader 
receives it. Because new data keeps arriving between fetches, the begin offset 
of each fetch request will still always be smaller than the leader's 
logEndOffset. Thus the follower will be removed from the ISR after 
replicaLagTimeMaxMs.

In the following we describe the solution to this problem.

Terminology:
- Definition of replica lag: we say a replica lags behind the leader by X ms if 
its current log end offset is equal to the log end offset of the leader X ms ago.
- Definition of pseudo-ISR set: pseudo-ISR set of a partition = { replica | 
replica belongs to the given partition AND replica's lag <= replicaLagTimeMaxMs}
- Definition of high-watermark of a partition: the high-watermark of a 
partition is max(current high-watermark of the partition, min(log end offset of 
replicas in the pseudo-ISR set of this partition))
- Definition of ISR set: ISR set of a partition = {replica | replica is in 
pseudo-ISR set of the given partition AND log end offset of replica >= 
high-watermark of the given partition}

Guarantee:
1) If a follower is close enough to the leader in the sense that its replica 
lag <= replicaLagTimeMaxMs, then this follower will be in the pseudo-ISR set. 
Thus the high-watermark will stop increasing until this follower's log end 
offset >= high-watermark, at which point this follower will be added to the 
ISR set. This allows us to solve the 2nd problem described above.
2) If a follower lags behind the leader by more than replicaLagTimeMaxMs, it 
will be removed from the ISR set.
3) High watermark of a partition will never decrease.
4) For any replica in ISR set, its log end offset >= high-watermark. 

Implementation:
1) For each follower, the leader keeps track of the time of the last fetch 
request from this follower. Let's call it lastFetchTime. In addition, the 
leader records, for each follower, its own log end offset at lastFetchTime. 
Let's call it lastFetchLeaderLEO. Both variables are updated after the leader 
has processed a FetchRequest from the follower.
2) When leader receives FetchRequest from a follower, it will set the 
follower's lag to lastFetchTime if begin offset of the FetchRequest >= 
lastFetchLeaderLEO.
3) The leader can update pseudo-ISR set, high-watermark and ISR set of the 
partition based on the lag of replicas of this partition, according to the 
definition described above.






> Follower should be in the isr if its FetchRequest has fetched up to the 
> logEndOffset of leader
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4485
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4485
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.1.0
>            Reporter: Dong Lin
>            Assignee: Dong Lin
>             Fix For: 0.10.2.0



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
