[ https://issues.apache.org/jira/browse/KAFKA-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jun Rao resolved KAFKA-4485.
----------------------------
    Resolution: Fixed

Issue resolved by pull request 2208
[https://github.com/apache/kafka/pull/2208]

> Follower should be in the isr if its FetchRequest has fetched up to the logEndOffset of leader
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4485
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4485
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.1.0
>            Reporter: Dong Lin
>            Assignee: Dong Lin
>             Fix For: 0.10.2.0
>
>
> In the current implementation, we exclude a follower from the ISR if the
> begin offset of the FetchRequest from this follower has been smaller than the
> logEndOffset of the leader for more than replicaLagTimeMaxMs. Also, we add a
> follower to the ISR if the begin offset of the FetchRequest from this
> follower is equal to or larger than the high watermark of this partition.
> This is problematic for the following reasons:
> 1) The criteria for ISR membership are inconsistent between maybeExpandIsr()
> and maybeShrinkIsr(). Therefore a follower may be repeatedly removed from and
> added to the ISR (e.g. in the scenario described below).
> 2) A follower may be removed from the ISR even if its fetch rate can keep up
> with the produce rate. Suppose a producer keeps sending a lot of small
> requests at a high request rate but a low byte rate (e.g. many mirror
> makers), and the follower is always able to read all the available data at
> the time the leader receives it. However, the begin offset of each fetch
> request will always be smaller than the logEndOffset of the leader. Thus the
> follower will be removed from the ISR after replicaLagTimeMaxMs.
> In the following we describe the solution to this problem.
> Terminology:
> - Definition of replica lag: we say a replica lags behind the leader by X ms
> if its current log end offset is equivalent to the log end offset of the
> leader X ms ago.
> - Definition of pseudo-ISR set: pseudo-ISR set of a partition = { replica |
> replica belongs to the given partition AND replica's lag <=
> replicaLagTimeMaxMs }
> - Definition of high-watermark of a partition: high-watermark of a partition
> = max(current high-watermark of the partition, min(log end offset of the
> replicas in the pseudo-ISR set of this partition))
> - Definition of ISR set: ISR set of a partition = { replica | replica is in
> the pseudo-ISR set of the given partition AND log end offset of replica >=
> high-watermark of the given partition }
> Guarantee:
> 1) If a follower is close enough to the leader in the sense that its replica
> lag <= replicaLagTimeMaxMs, then this follower will be in the pseudo-ISR set.
> Thus the high-watermark will stop increasing until this follower's log end
> offset >= high-watermark, at which moment this follower will be added to the
> ISR set. This allows us to solve the 2nd problem described above.
> 2) If a follower lags behind the leader for more than replicaLagTimeMaxMs, it
> will be removed from the ISR set.
> 3) The high watermark of a partition will never decrease.
> 4) For any replica in the ISR set, its log end offset >= high-watermark.
> Implementation:
> 1) For each follower, the leader keeps track of the time of the last fetch
> request from this follower. Let's call it lastFetchTime. In addition, the
> leader also maintains, for each follower, the leader's own log end offset as
> of lastFetchTime. Let's call it lastFetchLeaderLEO. Both variables are
> updated after the leader has processed a FetchRequest from a follower.
> 2) When the leader receives a FetchRequest from a follower, if the begin
> offset of the FetchRequest >= the leader's current LEO, the follower's
> lastCatchUpTimeMs is set to the current system time. Otherwise, if the begin
> offset of the FetchRequest >= lastFetchLeaderLEO, the follower's
> lastCatchUpTimeMs is set to lastFetchTime. Replica's lag = current system
> time - lastCatchUpTimeMs.
> 3) The leader can update the pseudo-ISR set, high-watermark and ISR set of
> the partition based on the lag of the replicas of this partition, according
> to the definitions described above.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
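
The Implementation steps quoted above can be illustrated with a minimal sketch. It is written in Python for brevity and is not the code from pull request 2208. The names FollowerState, on_fetch, lag_ms and update_partition are hypothetical. The sketch also assumes that the begin offset of a FetchRequest equals the follower's log end offset, and that the leader counts as a replica with zero lag when the high-watermark is computed.

```python
from dataclasses import dataclass


@dataclass
class FollowerState:
    """Per-follower bookkeeping kept by the leader (hypothetical structure)."""
    log_end_offset: int = 0          # assumed equal to the last fetch begin offset
    last_fetch_time_ms: int = 0      # lastFetchTime
    last_fetch_leader_leo: int = 0   # lastFetchLeaderLEO
    last_caught_up_time_ms: int = 0  # lastCatchUpTimeMs


def on_fetch(state, fetch_offset, leader_leo, now_ms):
    """Steps 1 and 2: process one FetchRequest from a follower."""
    if fetch_offset >= leader_leo:
        # The follower has everything the leader has right now.
        state.last_caught_up_time_ms = now_ms
    elif fetch_offset >= state.last_fetch_leader_leo:
        # The follower has everything the leader had at the previous fetch.
        state.last_caught_up_time_ms = state.last_fetch_time_ms
    # Both variables are updated after the request has been processed.
    state.log_end_offset = fetch_offset
    state.last_fetch_time_ms = now_ms
    state.last_fetch_leader_leo = leader_leo


def lag_ms(state, now_ms):
    """Replica's lag = current system time - lastCatchUpTimeMs."""
    return now_ms - state.last_caught_up_time_ms


def update_partition(followers, leader_leo, high_watermark, now_ms,
                     replica_lag_time_max_ms):
    """Step 3: derive the pseudo-ISR set, high-watermark and ISR set.

    `followers` maps a follower id to its FollowerState. The leader is
    implicit here (assumption): it contributes its own LEO to the minimum.
    """
    pseudo_isr = {
        fid for fid, s in followers.items()
        if lag_ms(s, now_ms) <= replica_lag_time_max_ms
    }
    offsets = [followers[fid].log_end_offset for fid in pseudo_isr]
    offsets.append(leader_leo)
    # The high-watermark never decreases.
    new_hw = max(high_watermark, min(offsets))
    isr = {fid for fid in pseudo_isr
           if followers[fid].log_end_offset >= new_hw}
    return pseudo_isr, new_hw, isr
```

In this sketch, a follower whose every fetch starts at the leader's log end offset as of the previous fetch keeps a lag of roughly one fetch interval, so it stays in the pseudo-ISR set even though its begin offset is always behind the leader's current log end offset. This is the scenario from problem 2 of the description.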