denis-chudov commented on code in PR #5244:
URL: https://github.com/apache/ignite-3/pull/5244#discussion_r1961718535

##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java:
##########
@@ -338,7 +339,7 @@ private static CompletableFuture<Integer> partitionUpdate(
         Iif invokeClosure = prepareMsInvokeClosure(
                 partId,
                 longToBytesKeepingOrder(revision),
-                Assignments.forced(Set.of(nextAssignment), assignmentsTimestamp).toBytes(),
+                new AssignmentsQueue(Assignments.forced(Set.of(nextAssignment), assignmentsTimestamp)).toBytes(),

Review Comment:
   This is not going to remain like that in the final code after the epic is done. We have discussed that you will need some method/class that calculates the pending queue from the current stable and target assignments. What do you think about adding a naive version of this class right now (adding just one element to the queue), so that this and similar places in the code don't have to be touched again in your future tickets?

##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java:
##########
@@ -200,7 +201,14 @@ public static CompletableFuture<Void> updatePendingAssignmentsKeys(
         boolean isNewAssignments = !tableCfgPartAssignments.equals(partAssignments);

-        byte[] partAssignmentsBytes = Assignments.toBytes(partAssignments, assignmentsTimestamp);
+        Assignments partAssignmentsPlanned = Assignments.of(partAssignments, assignmentsTimestamp);

Review Comment:
   Why "planned"? Also, let's introduce a new name for the assignments that will be last in the pending queue, for example "target assignments" or something like that, to compare them with stable as is done below.

##########
modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsQueue.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partitiondistribution;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Objects;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.jetbrains.annotations.Contract;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class that encapsulates a queue with consequent configuration switches between planned and stable configuration
+ * e.g. promoting a learner to a peer or demoting a peer to a learner in following steps:
+ * <ul>
+ * <li> node removed from configuration
+ * <li> node added to configuration with different type (peer or learner)
+ * </ul>
+ */
+public class AssignmentsQueue implements Iterable<Assignments> {
+
+    @IgniteToStringInclude
+    private final Deque<Assignments> queue;
+
+    /** Constructor. */
+    public AssignmentsQueue(Assignments... assignments) {
+        this(Arrays.asList(assignments));
+    }
+
+    /** Constructor. */
+    public AssignmentsQueue(Collection<Assignments> assignments) {
+        this.queue = new LinkedList<>(assignments);
+    }
+
+    /**
+     * Retrieves and removes the head of this queue, or returns {@code Assignments.EMPTY} if this queue is empty.
+     *
+     * @return the head of this queue, or {@code Assignments.EMPTY} if this queue is empty
+     */
+    public Assignments poll() {
+        return Objects.requireNonNullElse(queue.poll(), Assignments.EMPTY);

Review Comment:
   I am not sure returning EMPTY is the best way to deal with an empty queue. The empty queue seems to be an unexpected case, so we can place an assertion here.

##########
modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsQueue.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partitiondistribution;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Objects;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.jetbrains.annotations.Contract;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class that encapsulates a queue with consequent configuration switches between planned and stable configuration

Review Comment:
   ```suggestion
    * Class that encapsulates a queue with consequent configuration switches between pending and stable configuration
   ```

##########
modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/AssignmentsQueue.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partitiondistribution;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Objects;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.jetbrains.annotations.Contract;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class that encapsulates a queue with consequent configuration switches between planned and stable configuration
+ * e.g. promoting a learner to a peer or demoting a peer to a learner in following steps:
+ * <ul>
+ * <li> node removed from configuration
+ * <li> node added to configuration with different type (peer or learner)

Review Comment:
   Please add indentation.

##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceMinimumRequiredTimeProviderImpl.java:
##########
@@ -250,10 +252,18 @@ private Map<Integer, Integer> tableIdToZoneIdMap(int earliestCatalogVersion, int
         return tableIdToZoneIdMap;
     }

+    private Map<Integer, Map<Integer, Assignments>> readPendingAssignments(long appliedRevision) {
+        return readAssignments(PENDING_ASSIGNMENTS_PREFIX_BYTES, appliedRevision, bytes -> AssignmentsQueue.fromBytes(bytes).poll());
+    }
+
+    private Map<Integer, Map<Integer, Assignments>> readAssignments(byte[] prefix, long appliedRevision) {
+        return readAssignments(prefix, appliedRevision, Assignments::fromBytes);
+    }
+
     /**
      * Reads assignments from the metastorage locally. The resulting map is a {@code tableId -> {partitionId -> assignments}} mapping.
      */
-    Map<Integer, Map<Integer, Assignments>> readAssignments(byte[] prefix, long appliedRevision) {
+    Map<Integer, Map<Integer, Assignments>> readAssignments(byte[] prefix, long appliedRevision, Function<byte[], Assignments> valDeser) {

Review Comment:
   Same about the contraction :) I suggest `deserializer`.

##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java:
##########
@@ -200,7 +201,14 @@ public static CompletableFuture<Void> updatePendingAssignmentsKeys(
         boolean isNewAssignments = !tableCfgPartAssignments.equals(partAssignments);

-        byte[] partAssignmentsBytes = Assignments.toBytes(partAssignments, assignmentsTimestamp);
+        Assignments partAssignmentsPlanned = Assignments.of(partAssignments, assignmentsTimestamp);
+        AssignmentsQueue partAssignmentsPending = new AssignmentsQueue(partAssignmentsPlanned);
+
+        assert partAssignmentsPlanned.equals(partAssignmentsPending.peekLast());
+
+        byte[] partAssignmentsPlannedBytes = partAssignmentsPlanned.toBytes();
+        byte[] partAssignmentsBytes = partAssignmentsPending.toBytes();

Review Comment:
   I would also go to RebalanceUtil#PENDING_ASSIGNMENTS_PREFIX and RebalanceUtil#pendingPartAssignmentsKey, rename them and change their javadocs, to distinguish them from the stable and planned assignment keys and prefixes and to make clear that the value is not a set of assignments but a queue of sets. Also, there is a link to an .md file which also describes pending assignments; it needs to be fixed as well. Same for the zone keys.

   Also, please check the comments in RebalanceUtil/RebalanceUtilEx/ZoneRebalanceUtil/RebalanceRaftGroupEventsListener, etc. which describe the meta storage operations in pseudocode. For example, things like this may be confusing:
   `if partition.assignments.pending != calcPartAssignments`

   Logging has also become invalid, for example, in RebalanceUtil#updatePendingAssignmentsKeys:
   ```
   case PENDING_KEY_UPDATED:
       LOG.info(
               "Update metastore pending partitions key [key={}, partition={}, table={}/{}, newVal={}]",
               partAssignmentsPendingKey.toString(), partNum, tableDescriptor.id(), tableDescriptor.name(),
               partAssignments);
   ```
   partAssignments is not a pending queue.

##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java:
##########
@@ -183,7 +193,8 @@ private WatchListener createPendingAssignmentsListener() {
     private static void handleReceivedAssignments(
             WatchEvent event,
             byte[] assignmentsMetastoreKeyPrefix,
-            Map<ReplicationGroupId, TokenizedAssignments> groupIdToAssignmentsMap
+            Map<ReplicationGroupId, TokenizedAssignments> groupIdToAssignmentsMap,
+            Function<byte[], Set<Assignment>> deserFunction

Review Comment:
   Up to you, but I would prefer `deserializationFunction` or `deserializer`, because we don't usually use such contractions and there is no javadoc.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
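
For illustration only, here is a minimal, self-contained sketch of two of the suggestions from the review comments: a naive helper that builds a one-element pending queue from the stable and target assignments, and a `poll()` that treats an empty queue as a bug instead of returning an empty value. Everything in it is hypothetical: `SketchAssignmentsQueue` and `pendingQueue` are made-up names, a plain `Set<String>` of node names stands in for `Assignments`, and none of this is the PR's actual code. The review suggests an assertion; the sketch throws `IllegalStateException` instead so that its behavior does not depend on the JVM `-ea` flag.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical, simplified stand-in for a pending assignments queue.
 * Each element is modeled as a Set of node names (an assumption made for this sketch).
 */
final class SketchAssignmentsQueue {
    private final Deque<Set<String>> queue;

    SketchAssignmentsQueue(List<Set<String>> assignments) {
        this.queue = new ArrayDeque<>(assignments);
    }

    /**
     * Naive calculation of the pending queue: ignores the stable assignments
     * and enqueues only the target, so the queue has exactly one element.
     */
    static SketchAssignmentsQueue pendingQueue(Set<String> stable, Set<String> target) {
        return new SketchAssignmentsQueue(List.of(target));
    }

    /** Removes and returns the head; an empty queue is considered unexpected. */
    Set<String> poll() {
        Set<String> head = queue.poll();
        if (head == null) {
            throw new IllegalStateException("Pending assignments queue is unexpectedly empty");
        }
        return head;
    }

    /** Returns the last element (the target assignments) without removing it, or null if empty. */
    Set<String> peekLast() {
        return queue.peekLast();
    }
}
```

With a helper of this shape, call sites would ask for the queue and compare its last element with the stable assignments, so they would not need to change if the calculation later produces intermediate steps.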