TaiJuWu commented on code in PR #20859:
URL: https://github.com/apache/kafka/pull/20859#discussion_r2546222726
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3356,30 +3366,56 @@ private boolean shouldSendAddOrRemoveVoterRequest(FollowerState state, long curr
            quorumConfig.autoJoin() &&
            state.hasUpdateVoterSetPeriodExpired(currentTimeMs);
    }
+    private boolean shouldSendAddVoterRequest(FollowerState state, long currentTimeMs) {
+        return canAutoJoin && maybeAutoJoin(state, currentTimeMs);
+    }
+
+    private boolean shouldSendRemoveVoterRequest(FollowerState state, long currentTimeMs) {
+        final var localReplicaKey = quorum.localReplicaKeyOrThrow();
+        final var voters = partitionState.lastVoterSet();
+
+        if (voters.voterIds().contains(localReplicaKey.id())) {
+            if (maybeAutoJoin(state, currentTimeMs)) {
+                // When the bootstrap controller needs to update its directory id,
+                // it should be removed and then rejoin the cluster.
+                // In such a case, we set canAutoJoin to true, so the replica will
+                // be removed from the cluster, update its directory id, and rejoin
+                // the cluster.
Review Comment:
Disk failure is not the only case; another one is described at line 3403:
```
/* The replica's id is in the voter set but the replica is not a voter because
 * the directory id of the voter set entry is different. Remove the old voter.
 * Local replica is not in the voter set because the replica is an observer.
 */
```
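For reference, here is a minimal sketch (not the PR's actual change) of how `shouldSendRemoveVoterRequest` could cover both of those removal cases, assuming `VoterSet.isVoter(ReplicaKey)` matches on both the id and the directory id; the other names come from the snippet above:
```
private boolean shouldSendRemoveVoterRequest(FollowerState state, long currentTimeMs) {
    final var localReplicaKey = quorum.localReplicaKeyOrThrow();
    final var voters = partitionState.lastVoterSet();

    if (!voters.voterIds().contains(localReplicaKey.id())) {
        // The local id is not in the voter set at all, so there is nothing to remove.
        return false;
    }
    // The id is in the voter set. If the full replica key (id + directory id) also matches,
    // the replica is already a proper voter and no removal is needed. Otherwise the voter set
    // holds a stale entry with an old directory id (disk failure, or the bootstrap controller
    // case above) that has to be removed before the replica can rejoin.
    return !voters.isVoter(localReplicaKey) && maybeAutoJoin(state, currentTimeMs);
}
```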
##########
server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java:
##########
@@ -218,6 +219,70 @@ public void testControllersAutoJoinStandaloneVoter() throws Exception {
        }
    }
+    @Test
+    public void testRemovedControllerWontJoinAgain() throws Exception {
+        final var nodes = new TestKitNodes.Builder().
+            setNumBrokerNodes(1).
+            setNumControllerNodes(3).
+            build();
+
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                controllerNode.id(), controllerNode.metadataDirectoryId()
+            );
+        }
+
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).
+            setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, true).
+            setInitialVoterSet(initialVoters).
+            build()
+        ) {
+            cluster.format();
+            cluster.startup();
+            try (var admin = Admin.create(cluster.clientProperties())) {
+                // Wait until the static voter set is initialized
+                TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001, 3002}) {
+                        assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), voters.get(replicaId));
+                    }
+                });
+
+                AtomicLong removedAtHighWatermark = new AtomicLong();
+                // Remove 3002 from the voter set
+                TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    if (!voters.containsKey(3002)) {
+                        // Node 3002 has already been removed; record the high watermark and return
+                        removedAtHighWatermark.set(cluster.controllers().get(3002).raftManager().client().highWatermark().getAsLong());
+                        return;
+                    }
+
+                    admin.removeRaftVoter(3002, voters.get(3002)).all().get();
+                    assertEquals(Set.of(3000, 3001), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001}) {
+                        assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), voters.get(replicaId));
+                    }
+                });
+
+                TestUtils.waitForCondition(() -> cluster.controllers().get(3002).raftManager().client().highWatermark().getAsLong()
+                        > removedAtHighWatermark.get() + 10,
+                    30_000, 100, () -> "High watermark did not advance within 30000ms"
+                );
Review Comment:
@showuon Your suggestion makes sense to me, but there is an issue: the high watermark advances based on a different computation.
If I use
```
cluster.controllers().get(3002).raftManager().client().highWatermark().getAsLong() > removedAtHighWatermark.get()
```
the original code still passes.
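To make the difference concrete, here is a sketch of the two checks being discussed (names and timeouts are taken from the test above; the `+ 10` margin is just the illustrative value used there). The weaker check passes even against the unfixed code, presumably because the high watermark moves as soon as any record is committed after the removal, while the stricter check waits for a margin:
```
// Weaker check discussed above: it still passes against the unfixed code.
TestUtils.waitForCondition(
    () -> cluster.controllers().get(3002).raftManager().client().highWatermark().getAsLong()
        > removedAtHighWatermark.get(),
    30_000, 100, () -> "High watermark did not advance"
);

// Stricter check used in the test: require the high watermark to advance by a margin
// before concluding that the removed controller 3002 had a chance to rejoin.
TestUtils.waitForCondition(
    () -> cluster.controllers().get(3002).raftManager().client().highWatermark().getAsLong()
        > removedAtHighWatermark.get() + 10,
    30_000, 100, () -> "High watermark did not advance far enough"
);
```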
##########
server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java:
##########
@@ -218,6 +218,82 @@ public void testControllersAutoJoinStandaloneVoter() throws Exception {
        }
    }
+    @Test
+    public void testRemovedControllerWontJoinAgain() throws Exception {
+        final var nodes = new TestKitNodes.Builder().
+            setNumBrokerNodes(1).
+            setNumControllerNodes(3).
+            build();
+
+        // Configure the initial voters with one voter having a different directory ID.
+        // This simulates the case where the controller failed and is brought back up with a different directory ID.
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        final var oldDirectoryId = Uuid.randomUuid();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                controllerNode.id(),
+                controllerNode.id() == TestKitDefaults.CONTROLLER_ID_OFFSET ?
+                    oldDirectoryId : controllerNode.metadataDirectoryId()
+            );
+        }
+
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).
+            setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, true).
+            setInitialVoterSet(initialVoters).
+            build()
+        ) {
+            cluster.format();
+            cluster.startup();
+            try (var admin = Admin.create(cluster.clientProperties())) {
+                // Wait until the static voter set is initialized
+                TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001, 3002}) {
+                        assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), voters.get(replicaId));
+                    }
+                });
+
+                // Remove 3002 from the voter set
+                TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    if (!voters.containsKey(3002)) {
+                        // Node 3002 has already been removed; nothing to do
+                        return;
+                    }
+
+                    admin.removeRaftVoter(3002, voters.get(3002)).all().get();
+                    assertEquals(Set.of(3000, 3001), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001}) {
+                        assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), voters.get(replicaId));
+                    }
+                });
+
+                // Verify that 3002 does not rejoin the voter set in the next twenty seconds
+                for (int i = 0; i < 20; ++i) {
+                    TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+                        TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+                            Map<Integer, Uuid> voters = findVoterDirs(admin);
+                            if (!voters.containsKey(3002)) {
+                                // Node 3002 is not in the voter set; it stayed removed
+                                return;
+                            }
+
+                            admin.removeRaftVoter(3002, voters.get(3002)).all().get();
+                            assertEquals(Set.of(3000, 3001), voters.keySet());
+                            for (int replicaId : new int[] {3000, 3001}) {
+                                assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), voters.get(replicaId));
+                            }
+                        });
+                    });
+                    Thread.sleep(1000);
+                }
Review Comment:
> This test doesn't make sense to me.
>
> 1. `do not join the voter set in the next twenty seconds`: The 20 seconds is not true because the `retryOnExceptionWithTimeout` could take up to 30 seconds each time.

That makes sense to me, and a time-based test is not a good thing, as you say below.

> 2. Why should we use 2 `retryOnExceptionWithTimeout` here?

My mistake.

> 3. The test is not testing what we expected.
> 3.1. What are you asserting here? I can only see you assert the nodes 3000, 3001 are in the voters, but we want to test 3002, right? If 3002 is in the voters, the test can still pass!?

From my understanding, asserting that only the voter ids 3000 and 3001 are in the voter set is the same as asserting that 3002 has been removed.
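For what it's worth, a sketch of how the negative assertion could be made explicit (only `findVoterDirs` and the 20-second window come from the test above; `assertFalse` would need the corresponding JUnit import): poll the voter set for the whole window and fail immediately if 3002 ever reappears, instead of relying on the positive assertion about 3000 and 3001.
```
// Sketch: poll the voter set for a fixed window and fail fast if 3002 ever rejoins.
long deadlineMs = System.currentTimeMillis() + 20_000;
while (System.currentTimeMillis() < deadlineMs) {
    Map<Integer, Uuid> voters = findVoterDirs(admin);
    assertFalse(voters.containsKey(3002), "Removed controller 3002 rejoined the voter set: " + voters);
    Thread.sleep(1000);
}
```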
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]