sanpwc commented on code in PR #4966: URL: https://github.com/apache/ignite-3/pull/4966#discussion_r1898528666
########## modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java: ########## @@ -469,6 +489,67 @@ private static void doStableKeySwitch( } } + private static Operation handleAssignmentsChainChange( + ByteArray assignmentsChainKey, + Entry assignmentsChainEntry, + Assignments pendingAssignments, + Assignments stableAssignments + ) { + // We write this key only in HA mode. See TableManager.writeTableAssignmentsToMetastore. + if (assignmentsChainEntry.value() != null) { + AssignmentsChain updatedAssignmentsChain = updateAssignmentsChain( + AssignmentsChain.fromBytes(assignmentsChainEntry.value()), + stableAssignments, + pendingAssignments + ); + return put(assignmentsChainKey, updatedAssignmentsChain.toBytes()); + } else { + return Operations.noop(); + } + } + + private static AssignmentsChain updateAssignmentsChain(AssignmentsChain assignmentsChain, Assignments newStable, + Assignments pendingAssignments) { + + assert assignmentsChain != null : "Assignments chain cannot be null in HA mode."; + + assert !assignmentsChain.chain().isEmpty() : "Assignments chain cannot be empty on stable switch."; + + /* + This method covers the following case: Review Comment: Nice! 
########## modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java: ########## @@ -1234,6 +1242,295 @@ void testTwoPhaseResetOnEmptyNodes() throws Exception { assertPlannedAssignments(node0, partId, assignments13); } + @Test + @ZoneParams(nodes = 7, replicas = 3, partitions = 1, consistencyMode = ConsistencyMode.HIGH_AVAILABILITY) + void testAssignmentsChainUpdate() throws Exception { + int partId = 0; + + IgniteImpl node0 = igniteImpl(0); + int catalogVersion = node0.catalogManager().latestCatalogVersion(); + long timestamp = node0.catalogManager().catalog(catalogVersion).time(); + Table table = node0.tables().table(TABLE_NAME); + + awaitPrimaryReplica(node0, partId); + + assertRealAssignments(node0, partId, 0, 2, 3); + + Assignments initialAssignments = Assignments.of(Set.of( + Assignment.forPeer(node(0).name()), + Assignment.forPeer(node(2).name()), + Assignment.forPeer(node(3).name()) + ), timestamp); + + assertStableAssignments(node0, partId, initialAssignments); + + assertAssignmentsChain(node0, partId, AssignmentsChain.of(initialAssignments)); + + // Write data(1) to all nodes. + List<Throwable> errors = insertValues(table, partId, 0); + assertThat(errors, is(empty())); + + logger().info("Stopping nodes [ids={}].", 3); + + stopNode(3); + + logger().info("Stopped nodes [ids={}].", 3); + + Assignments link2Assignments = Assignments.of(Set.of( + Assignment.forPeer(node(0).name()), + Assignment.forPeer(node(1).name()), + Assignment.forPeer(node(2).name()) + ), timestamp); + + assertRealAssignments(node0, partId, 0, 1, 2); + + assertStableAssignments(node0, partId, link2Assignments, 30_000); + + // Graceful change should reinit the assignments chain, in other words there should be only one link + // in the chain - the current stable assignments. + assertAssignmentsChain(node0, partId, AssignmentsChain.of(List.of(link2Assignments))); + + // Disable scale down to avoid unwanted rebalance. 
+ executeSql(format("ALTER ZONE %s SET data_nodes_auto_adjust_scale_down=%d", zoneName, INFINITE_TIMER_VALUE)); + + // Disable automatic rebalance since we want to restore replica factor. + setDistributionResetTimeout(node0, INFINITE_TIMER_VALUE); + + // Now stop the majority and the automatic reset should kick in. + logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{1, 2})); + + Assignments resetAssignments = Assignments.of(Set.of( + Assignment.forPeer(node(0).name()), + Assignment.forPeer(node(4).name()), + Assignment.forPeer(node(5).name()) + ), timestamp); + + AtomicBoolean blockedLink = new AtomicBoolean(true); + + // Block stable switch to check that we initially add reset phase 1 assignments to the chain. + blockMessage((nodeName, msg) -> blockedLink.get() && stableKeySwitchMessage(msg, partId, resetAssignments)); + + stopNodesInParallel(1, 2); + + CompletableFuture<?> updateFuture = node0.disasterRecoveryManager().resetAllPartitions( + zoneName, + QUALIFIED_TABLE_NAME, + true, + -1 + ); + + assertThat(updateFuture, willCompleteSuccessfully()); + + Assignments linkFirstPhaseReset = Assignments.of(Set.of( + Assignment.forPeer(node(0).name()) + ), timestamp); + + assertStableAssignments(node0, partId, linkFirstPhaseReset, 60_000); + + // Assignments chain consists of stable and the first phase of reset. + assertAssignmentsChain(node0, partId, AssignmentsChain.of(List.of(link2Assignments, linkFirstPhaseReset))); + + // Unblock stable switch, wait for reset phase 2 assignments to replace phase 1 assignments in the chain. + blockedLink.set(false); + + logger().info("Unblocked stable switch."); + + assertRealAssignments(node0, partId, 0, 4, 5); + + assertStableAssignments(node0, partId, resetAssignments, 60_000); + + // Assignments chain consists of stable and the second phase of reset. 
+ assertAssignmentsChain(node0, partId, AssignmentsChain.of(List.of(link2Assignments, resetAssignments))); + } + + @Test + @ZoneParams(nodes = 7, replicas = 7, partitions = 1, consistencyMode = ConsistencyMode.HIGH_AVAILABILITY) + void testAssignmentsChainUpdatedOnAutomaticReset() throws Exception { + int partId = 0; + + IgniteImpl node0 = igniteImpl(0); + int catalogVersion = node0.catalogManager().latestCatalogVersion(); + long timestamp = node0.catalogManager().catalog(catalogVersion).time(); + Table table = node0.tables().table(TABLE_NAME); + + awaitPrimaryReplica(node0, partId); + + // Disable scale down to avoid unwanted rebalance. + executeSql(format("ALTER ZONE %s SET data_nodes_auto_adjust_scale_down=%d", zoneName, INFINITE_TIMER_VALUE)); + + assertRealAssignments(node0, partId, 0, 1, 2, 3, 4, 5, 6); + + Assignments allAssignments = Assignments.of(Set.of( + Assignment.forPeer(node(0).name()), + Assignment.forPeer(node(1).name()), + Assignment.forPeer(node(2).name()), + Assignment.forPeer(node(3).name()), + Assignment.forPeer(node(4).name()), + Assignment.forPeer(node(5).name()), + Assignment.forPeer(node(6).name()) + ), timestamp); + + assertStableAssignments(node0, partId, allAssignments); + + // Assignments chain is equal to the stable assignments. + assertAssignmentsChain(node0, partId, AssignmentsChain.of(allAssignments)); + + // Write data(1) to all nodes. + List<Throwable> errors = insertValues(table, partId, 0); + assertThat(errors, is(empty())); + + Assignments link2Assignments = Assignments.of(Set.of( + Assignment.forPeer(node(0).name()), + Assignment.forPeer(node(1).name()), + Assignment.forPeer(node(2).name()) + ), timestamp); + + AtomicBoolean blockedLink2 = new AtomicBoolean(true); + + // Block stable switch to check that we initially add reset phase 1 assignments to the chain. 
+ blockMessage((nodeName, msg) -> blockedLink2.get() && stableKeySwitchMessage(msg, partId, link2Assignments)); + + logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{3, 4, 5, 6})); + + stopNodesInParallel(3, 4, 5, 6); + + Assignments link2FirstPhaseReset = Assignments.of(Set.of( + Assignment.forPeer(node(0).name()) + ), timestamp); + + assertStableAssignments(node0, partId, link2FirstPhaseReset, 60_000); + + // Assignments chain consists of stable and the first phase of reset. + assertAssignmentsChain(node0, partId, AssignmentsChain.of(List.of(allAssignments, link2FirstPhaseReset))); + + // Unblock stable switch, wait for reset phase 2 assignments to replace phase 1 assignments in the chain. + blockedLink2.set(false); + + assertStableAssignments(node0, partId, link2Assignments, 30_000); + + // Assignments chain consists of stable and the second phase of reset. + assertAssignmentsChain(node0, partId, AssignmentsChain.of(List.of(allAssignments, link2Assignments))); + + logger().info("Stopping nodes [ids={}].", Arrays.toString(new int[]{1, 2})); + stopNodesInParallel(1, 2); + + Assignments link3Assignments = Assignments.of(Set.of( + Assignment.forPeer(node(0).name()) + ), timestamp); + + assertStableAssignments(node0, partId, link3Assignments, 30_000); + + assertAssignmentsChain(node0, partId, AssignmentsChain.of(List.of(allAssignments, link2Assignments, link3Assignments))); + } + + @Test + @ZoneParams(nodes = 7, replicas = 7, partitions = 1, consistencyMode = ConsistencyMode.HIGH_AVAILABILITY) + void testSecondResetRewritesFirst() throws Exception { Review Comment: The name is misleading: the second phase of a reset should rewrite the first phase, whereas a second (full) reset should be appended to the first one. 
########## modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java: ########## @@ -1542,12 +1851,25 @@ private List<Integer> getRealAssignments(IgniteImpl node0, int partId) { return stable.empty() ? null : Assignments.fromBytes(stable.value()); } + private @Nullable AssignmentsChain getAssignmentsChain(IgniteImpl node, int partId) { + CompletableFuture<Entry> chainFut = node.metaStorageManager() + .get(assignmentsChainKey(new TablePartitionId(tableId, partId))); + + assertThat(chainFut, willCompleteSuccessfully()); + + Entry chain = chainFut.join(); + + return chain.empty() ? null : AssignmentsChain.fromBytes(chain.value()); + } + @Retention(RetentionPolicy.RUNTIME) @interface ZoneParams { int replicas(); int partitions(); int nodes() default INITIAL_NODES; + + ConsistencyMode consistencyMode() default ConsistencyMode.STRONG_CONSISTENCY; } } Review Comment: I'd say that we miss: 1. Verification that graceful change will rewrite chain after force resets. 2. Verification that !HA zone has empty chain. ########## modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsChain.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partitiondistribution; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import org.apache.ignite.internal.tostring.IgniteToStringInclude; +import org.apache.ignite.internal.tostring.S; +import org.apache.ignite.internal.versioned.VersionedSerialization; +import org.jetbrains.annotations.Contract; +import org.jetbrains.annotations.Nullable; + +/** + * Contains the chain of changed assignments. + */ +public class AssignmentsChain { + Review Comment: Blank line. ########## modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsChain.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.partitiondistribution; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import org.apache.ignite.internal.tostring.IgniteToStringInclude; +import org.apache.ignite.internal.tostring.S; +import org.apache.ignite.internal.versioned.VersionedSerialization; +import org.jetbrains.annotations.Contract; +import org.jetbrains.annotations.Nullable; + +/** + * Contains the chain of changed assignments. + */ +public class AssignmentsChain { + + /** Chain of assignments. */ + @IgniteToStringInclude + private final List<Assignments> chain; Review Comment: I thought that you had a wrapper for Assignments that we could use to store configurationIndex in. If not, that is not a problem; we will add one within https://issues.apache.org/jira/browse/IGNITE-24106. ########## modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsChain.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.partitiondistribution; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import org.apache.ignite.internal.tostring.IgniteToStringInclude; +import org.apache.ignite.internal.tostring.S; +import org.apache.ignite.internal.versioned.VersionedSerialization; +import org.jetbrains.annotations.Contract; +import org.jetbrains.annotations.Nullable; + +/** + * Contains the chain of changed assignments. + */ +public class AssignmentsChain { + + /** Chain of assignments. */ + @IgniteToStringInclude + private final List<Assignments> chain; + + private AssignmentsChain(List<Assignments> chain) { + this.chain = chain; + } + + public List<Assignments> chain() { + return chain; + } + + /** + * Create a new {@link AssignmentsChain} with the last link in the chain replaced with the provided one. + * + * @param newLast New last link. + * @return new AssignmentsChain. + */ + public AssignmentsChain replaceLast(Assignments newLast) { + assert !chain.isEmpty() : "Assignments chain is empty."; + + List<Assignments> newChain = new ArrayList<>(chain); + + newChain.set(newChain.size() - 1, newLast); + + return new AssignmentsChain(newChain); + } + + /** + * Create a new {@link AssignmentsChain} with a new link added to the chain. + * + * @param newLast New last link. + * @return new AssignmentsChain. + */ + public AssignmentsChain addLast(Assignments newLast) { + assert !chain.isEmpty() : "Assignments chain is empty."; + + List<Assignments> newChain = new ArrayList<>(chain); + + newChain.add(newLast); + + return new AssignmentsChain(newChain); + } + + /** + * Creates a new instance. + * + * @param assignments Partition assignments. + */ + public static AssignmentsChain of(Assignments assignments) { + return new AssignmentsChain(List.of(assignments)); + } + + /** + * Creates a new instance. + * + * @param assignmentsChain Chain of partition assignments. 
+ */ + public static AssignmentsChain of(List<Assignments> assignmentsChain) { + return new AssignmentsChain(assignmentsChain); + } + + public byte[] toBytes() { + return VersionedSerialization.toBytes(this, AssignmentsChainSerializer.INSTANCE); + } + + /** + * Deserializes assignments from the array of bytes. Returns {@code null} if the argument is {@code null}. + */ + @Nullable + @Contract("null -> null; !null -> !null") + public static AssignmentsChain fromBytes(byte @Nullable [] bytes) { + return bytes == null ? null : VersionedSerialization.fromBytes(bytes, AssignmentsChainSerializer.INSTANCE); + } + + @Override + public String toString() { + return S.toString(this); Review Comment: Could you please share an example of the output? ########## modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java: ########## @@ -1234,6 +1242,295 @@ void testTwoPhaseResetOnEmptyNodes() throws Exception { assertPlannedAssignments(node0, partId, assignments13); } + @Test Review Comment: Could you please run this test and the other tests in the class with @RepeatedTest locally in order to verify that they are stable? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org