jsancio commented on a change in pull request #11109:
URL: https://github.com/apache/kafka/pull/11109#discussion_r675065535
##########
File path: metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
##########
@@ -362,24 +363,27 @@ LeaderAndEpoch notifiedLeader() {
}
void handleCommit(MemoryBatchReader<ApiMessageAndVersion> reader) {
- listener.handleCommit(reader);
+ listener.handleCommit(this, reader);
offset = reader.lastOffset().getAsLong();
}
void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
- listener.handleSnapshot(reader);
+ listener.handleSnapshot(this, reader);
offset = reader.lastContainedLogOffset();
}
void handleLeaderChange(long offset, LeaderAndEpoch leader) {
- listener.handleLeaderChange(leader);
+ listener.handleLeaderChange(this, leader);
notifiedLeader = leader;
this.offset = offset;
}
void beginShutdown() {
- listener.beginShutdown();
+ listener.beginShutdown(this);
}
+
+ @Override
+ public void close() {}
Review comment:
We should fix this by appending an event to eventQueue that removes this
listener, instead of leaving close() as a no-op.
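
To illustrate the suggestion, here is a minimal, self-contained sketch of a close() that enqueues a removal event rather than touching the listener list directly. None of these names come from the PR: ListenerRegistry, Registration, register, drain and listenerCount are hypothetical, and a plain ConcurrentLinkedQueue of Runnables stands in for the real event queue, whose API may well differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical stand-in for a log manager that owns an event queue. */
final class ListenerRegistry {
    /** Hypothetical listener handle whose close() schedules its own removal. */
    public final class Registration implements AutoCloseable {
        private final String name;

        Registration(String name) {
            this.name = name;
        }

        public String name() {
            return name;
        }

        @Override
        public void close() {
            // Do not mutate the listener list here. Enqueue an event so the
            // removal runs in order with the other queued callbacks.
            eventQueue.add(() -> listeners.remove(this));
        }
    }

    // Assumption: a simple queue of Runnables models the real event queue.
    private final Queue<Runnable> eventQueue = new ConcurrentLinkedQueue<>();
    // Only touched from drain(), i.e. from the "event queue thread".
    private final List<Registration> listeners = new ArrayList<>();

    /** Registration is also an event, so it is ordered with removals. */
    public Registration register(String name) {
        Registration registration = new Registration(name);
        eventQueue.add(() -> listeners.add(registration));
        return registration;
    }

    /** Runs all pending events in FIFO order. */
    public void drain() {
        Runnable event;
        while ((event = eventQueue.poll()) != null) {
            event.run();
        }
    }

    public int listenerCount() {
        return listeners.size();
    }
}
```

In this sketch the listener stays registered until the queue processes the removal event, so a callback already queued ahead of close() would still be delivered. Whether that ordering is what the real class wants is a design decision for the PR author.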
##########
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
##########
@@ -2491,6 +2492,47 @@ public void testLateRegisteredListenerCatchesUp() throws Exception {
assertEquals(9L, secondListener.claimedEpochStartOffset(epoch));
}
+ @Test
+ public void testReregistrationChangesListenerContext() throws Exception {
+ int localId = 0;
+ int otherNodeId = 1;
+ int epoch = 5;
+ Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+ List<String> batch1 = Arrays.asList("1", "2", "3");
+ List<String> batch2 = Arrays.asList("4", "5", "6");
+ List<String> batch3 = Arrays.asList("7", "8", "9");
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+ .appendToLog(1, batch1)
+ .appendToLog(1, batch2)
+ .appendToLog(2, batch3)
+ .withUnknownLeader(epoch - 1)
+ .build();
+
+ context.becomeLeader();
+ context.client.poll();
+ assertEquals(10L, context.log.endOffset().offset);
+
+ // Let the initial listener catch up
+ context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 10L, epoch, 0));
Review comment:
Please use the "advance high-watermark" helper method in a few of these places
instead of delivering the fetch request by hand.