showuon commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1342676499
##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -76,9 +85,37 @@ protected LeaderState(
boolean hasAcknowledgedLeader = voterId == localId;
this.voterStates.put(voterId, new ReplicaState(voterId, hasAcknowledgedLeader));
}
+ this.majority = voters.size() / 2;
this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
this.log = logContext.logger(LeaderState.class);
this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
+ this.fetchTimeoutMs = fetchTimeoutMs;
+ this.fetchTimer = time.timer(fetchTimeoutMs);
+ }
+
+ public boolean hasMajorityFollowerFetchTimeoutExpired(long currentTimeMs) {
+ fetchTimer.update(currentTimeMs);
+ boolean isExpired = fetchTimer.isExpired();
+ if (isExpired) {
+ log.info("Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}.",
+ fetchTimeoutMs, fetchedVoters);
+ }
+ return isExpired;
+ }
+
+ public void maybeResetMajorityFollowerFetchTimeout(int id, long currentTimeMs) {
+ updateFetchedVoters(id);
+ if (fetchedVoters.size() >= majority) {
+ fetchedVoters.clear();
+ fetchTimer.update(currentTimeMs);
+ fetchTimer.reset(fetchTimeoutMs);
+ }
+ }
+
+ private void updateFetchedVoters(int id) {
+ if (isVoter(id)) {
+ fetchedVoters.add(id);
+ }
Review Comment:
> Note that ReplicaState already contains the lastFetchTimestamp.

I tried to reuse `lastFetchTimestamp` in `ReplicaState` today, but found that it
doesn't work as expected because its default value is `-1`. When a node becomes
the leader, the `lastFetchTimestamp` of every follower is still `-1`, so there is
no meaningful fetch time to measure the timeout from. The current `timer`
approach is more readable, IMO.
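A minimal, hypothetical sketch of the difference (class and method names are illustrative, not Kafka's `ReplicaState` or `Timer` API; it assumes wall-clock milliseconds and the `voters.size() / 2` follower majority from the diff): a check derived naively from per-follower timestamps treats a follower still at `-1` as having last fetched an arbitrarily long time ago, so it reports expiry right after election, while a single leader-level timer starts counting from election.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model for illustration only; these are not Kafka's
// ReplicaState or Timer classes.
class FetchTimeoutSketch {
    // Stand-in for the default lastFetchTimestamp before any fetch is seen.
    static final long NO_FETCH = -1L;

    // Naive approach: count followers whose last fetch is within the timeout.
    // A follower still at -1 looks like it last fetched at time -1, i.e. long
    // ago, so a freshly elected leader immediately reports expiry.
    static boolean expiredFromTimestamps(Map<Integer, Long> followerLastFetchMs,
                                         int majority, long nowMs, long timeoutMs) {
        int recentFetchers = 0;
        for (long lastFetchMs : followerLastFetchMs.values()) {
            if (nowMs - lastFetchMs <= timeoutMs) {
                recentFetchers++;
            }
        }
        return recentFetchers < majority;
    }

    // Timer approach, mirroring the diff: the deadline starts at election time
    // and is pushed out once a majority of followers has fetched.
    static final class MajorityFetchTimer {
        private final Set<Integer> voters;
        private final int majority; // voters.size() / 2 followers plus the leader form a majority
        private final long timeoutMs;
        private final Set<Integer> fetchedVoters = new HashSet<>();
        private long deadlineMs;

        MajorityFetchTimer(Set<Integer> voters, long timeoutMs, long electedAtMs) {
            this.voters = new HashSet<>(voters);
            this.majority = voters.size() / 2;
            this.timeoutMs = timeoutMs;
            this.deadlineMs = electedAtMs + timeoutMs;
        }

        // Record a fetch; reset the deadline once enough distinct voters fetched.
        void onFetch(int replicaId, long nowMs) {
            if (voters.contains(replicaId)) {
                fetchedVoters.add(replicaId);
            }
            if (fetchedVoters.size() >= majority) {
                fetchedVoters.clear();
                deadlineMs = nowMs + timeoutMs;
            }
        }

        boolean isExpired(long nowMs) {
            return nowMs >= deadlineMs;
        }
    }
}
```

With three voters and a realistic wall-clock `now`, `expiredFromTimestamps` returns `true` as soon as both followers are still at `-1`, whereas `MajorityFetchTimer` only expires if no follower fetches within the timeout after election. Special-casing `-1` in the timestamp approach would also work, but it effectively rebuilds the election-time baseline that the timer already provides.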
> The part that is not clear to me is when or how to wake up the leader for
> a poll. We need to update `KafkaRaftClient::pollLeader` so that the replicas'
> last fetch time is taken into account when blocking on the `messageQueue.poll`.

Good question. My thought is to add some buffer to tolerate processing time.
For example, when [checking whether to shrink the ISR](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L373-L375),
we allow up to 1.5x the timeout to keep things simple, instead of calculating
the exact expiration timestamp. So I'm thinking we use `fetchTimeout * 1.5`. WDYT?
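One possible reading of this proposal, sketched with hypothetical names (not the `KafkaRaftClient` or `LeaderState` API): the leader-level timer enforces `fetchTimeout * 1.5`, and the leader never blocks on its message queue longer than the time left on that timer, so it wakes up in time to evaluate the check.

```java
// Hypothetical sketch; these names are illustrative and are not the
// KafkaRaftClient or LeaderState API.
final class LeaderPollSketch {
    // Slack proposed in the review comment, by analogy with the ISR-shrink check.
    static final double TIMEOUT_FACTOR = 1.5;

    private LeaderPollSketch() {}

    // Timeout the leader-level timer would enforce instead of fetchTimeoutMs.
    static long checkTimeoutMs(long fetchTimeoutMs) {
        return (long) (fetchTimeoutMs * TIMEOUT_FACTOR);
    }

    // Upper bound for how long the leader blocks waiting for new messages:
    // never past the timer's deadline, so the leader wakes up to check it.
    static long pollTimeoutMs(long otherTimeoutMs, long timerDeadlineMs, long nowMs) {
        long untilDeadlineMs = Math.max(0L, timerDeadlineMs - nowMs);
        return Math.min(otherTimeoutMs, untilDeadlineMs);
    }
}
```

The extra 0.5x absorbs delays between a follower's fetch and the leader resetting or checking the timer, so the deadline does not have to be computed precisely.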
--
This is an automated message from the Apache Git Service.