sashapolo commented on code in PR #5231:
URL: https://github.com/apache/ignite-3/pull/5231#discussion_r1955904885


##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java:
##########
@@ -204,6 +204,9 @@ private void startNodes(
         node0.cmgManager.initCluster(List.of(node0.name), List.of(node0.name), 
"cluster");
 
         
placementDriver.setPrimary(node0.clusterService.topologyService().localMember());
+        // The exception for default zone: the zone initially will hadnle 
pending assignments equals to 0th node and while it's a primary

Review Comment:
   ```suggestion
           // An exception for the default zone: the zone initially will handle 
pending assignments equal to 0th node and while it's a primary
   ```



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -1257,6 +1219,58 @@ private CompletableFuture<Boolean> 
isLocalNodeIsPrimary(ReplicationGroupId repli
         }
     }
 
+    private void executeIfLocalNodeIsPrimaryForGroup(
+            ReplicationGroupId groupId,
+            Consumer<ReplicaMeta> toExecute
+    ) {
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
getPrimaryReplica(groupId);
+
+        isLocalNodeIsPrimary(primaryReplicaFuture).thenAccept(isPrimary -> {
+            if (isPrimary) {
+                primaryReplicaFuture.thenAccept(toExecute);
+            }
+        });
+    }
+
+    private CompletableFuture<Boolean> isLocalNodeIsPrimary(ReplicationGroupId 
replicationGroupId) {
+        return isLocalNodeIsPrimary(getPrimaryReplica(replicationGroupId));
+    }
+
+    private CompletableFuture<Boolean> 
isLocalNodeIsPrimary(CompletableFuture<ReplicaMeta> primaryReplicaFuture) {
+        return primaryReplicaFuture.thenApply(primaryReplicaMeta -> 
primaryReplicaMeta != null
+                && primaryReplicaMeta.getLeaseholder() != null
+                && 
primaryReplicaMeta.getLeaseholder().equals(localNode().name())
+        );
+    }
+
+    private void sendChangePeersAndLearnersRequest(
+            ReplicaMeta replicaMeta,
+            ZonePartitionId replicationGroupId,
+            Assignments pendingAssignments,
+            long currentRevision
+    ) {
+        
metaStorageMgr.get(pendingPartAssignmentsKey(replicationGroupId)).thenAccept(latestPendingAssignmentsEntry
 -> {
+            // Do not change peers of the raft group if this is a stale event.
+            // Note that we start raft node before for the sake of the 
consistency in a

Review Comment:
   > // Note that we start raft node before for the sake of the consistency in a
               // starting and stopping raft nodes.
   
   What does this mean?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java:
##########
@@ -122,9 +271,75 @@ public CompletableFuture<ReplicaResult> 
invoke(ReplicaRequest request, UUID send
         }
     }
 
+    private CompletableFuture<Void> 
processChangePeersAndLearnersReplicaRequest(ChangePeersAndLearnersAsyncReplicaRequest
 request) {

Review Comment:
   Didn't we agree on extracting this handling logic into separate classes?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java:
##########
@@ -122,9 +271,75 @@ public CompletableFuture<ReplicaResult> 
invoke(ReplicaRequest request, UUID send
         }
     }
 
+    private CompletableFuture<Void> 
processChangePeersAndLearnersReplicaRequest(ChangePeersAndLearnersAsyncReplicaRequest
 request) {
+        ZonePartitionId replicaGrpId = (ZonePartitionId) 
request.groupId().asReplicationGroupId();
+
+        RaftGroupService raftClient = raftCommandRunner instanceof 
RaftGroupService
+                ? (RaftGroupService) raftCommandRunner
+                : ((RaftGroupService) ((ExecutorInclinedRaftCommandRunner) 
raftCommandRunner).decoratedCommandRunner());
+
+        return raftClient.refreshAndGetLeaderWithTerm()
+                .exceptionally(throwable -> {
+                    throwable = unwrapCause(throwable);
+
+                    if (throwable instanceof TimeoutException) {
+                        LOG.info(
+                                "Node couldn't get the leader within timeout 
so the changing peers is skipped [grp={}].",
+                                replicaGrpId
+                        );
+
+                        return LeaderWithTerm.NO_LEADER;
+                    }
+
+                    throw new IgniteInternalException(
+                            INTERNAL_ERR,
+                            "Failed to get a leader for the RAFT replication 
group [get=" + replicaGrpId + "].",
+                            throwable
+                    );
+                })
+                .thenCompose(leaderWithTerm -> {
+                    if (leaderWithTerm.isEmpty() || 
!isTokenStillValidPrimary(request.enlistmentConsistencyToken())) {
+                        return nullCompletedFuture();
+                    }
+
+                    // run update of raft configuration if this node is a 
leader
+                    LOG.debug("Current node={} is the leader of partition raft 
group={}. "
+                                    + "Initiate rebalance process for zone={}, 
partition={}",
+                            leaderWithTerm.leader(),
+                            replicaGrpId,
+                            replicaGrpId.zoneId(),
+                            replicaGrpId.partitionId()
+                    );
+
+                    return 
raftClient.changePeersAndLearnersAsync(peersConfigurationFromMessage(request), 
leaderWithTerm.term());
+                });
+    }
+
+    private boolean isTokenStillValidPrimary(long 
suspectedEnlistmentConsistencyToken) {
+        HybridTimestamp currentTime = clockService.current();
+
+        ReplicaMeta meta = 
placementDriver.getCurrentPrimaryReplica(zoneReplicationGroupId, currentTime);
+
+        return meta != null
+                && isLocalPeer(meta.getLeaseholderId())
+                && clockService.before(currentTime, meta.getExpirationTime())
+                && suspectedEnlistmentConsistencyToken == 
meta.getStartTime().longValue();
+    }
+
+    private boolean isLocalPeer(UUID nodeId) {
+        return localNode.id().equals(nodeId);
+    }
+
+

Review Comment:
   ```suggestion
   ```



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java:
##########
@@ -46,13 +48,39 @@ public class TestPlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
 
     private volatile ReplicaMeta primary;
 
+    private volatile ReplicaMeta defaultZonePrimaryReplica;
+
     /**
      * Set the primary replica.
      *
      * @param node Primary replica node.
+     * @param leaseStartTime Time when the node is considered elected.
      */
     public void setPrimary(ClusterNode node, HybridTimestamp leaseStartTime) {
-        primary = new ReplicaMeta() {
+        primary = createReplicaMeta(node, leaseStartTime);
+    }
+
+    /**
+     * Set the primary replica.
+     *
+     * @param node Primary replica node.
+     */
+    public void setPrimary(ClusterNode node) {
+        Loggers.forClass(TestPlacementDriver.class).info("Test Primary is {}", 
node);

Review Comment:
   Please extract this into a field



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -969,7 +989,10 @@ private CompletableFuture<Void> 
stopAndDestroyPartitionAndUpdateClients(
             Set<Assignment> stableAssignments,
             Assignments pendingAssignments,
             boolean isRecovery,
-            long revision
+            long revision,
+            // TODO: Remove after 
https://issues.apache.org/jira/browse/IGNITE-24374
+            // It's a workaround because with TestPlacementDriver we couldn't 
reserve primary replicas that's we we shouldn't stop them now.

Review Comment:
   ```suggestion
               // It's a workaround because with TestPlacementDriver we can't 
reserve primary replicas so we shouldn't stop them now.
   ```



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -1257,6 +1219,58 @@ private CompletableFuture<Boolean> 
isLocalNodeIsPrimary(ReplicationGroupId repli
         }
     }
 
+    private void executeIfLocalNodeIsPrimaryForGroup(
+            ReplicationGroupId groupId,
+            Consumer<ReplicaMeta> toExecute
+    ) {
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
getPrimaryReplica(groupId);
+
+        isLocalNodeIsPrimary(primaryReplicaFuture).thenAccept(isPrimary -> {
+            if (isPrimary) {
+                primaryReplicaFuture.thenAccept(toExecute);
+            }
+        });
+    }
+
+    private CompletableFuture<Boolean> isLocalNodeIsPrimary(ReplicationGroupId 
replicationGroupId) {
+        return isLocalNodeIsPrimary(getPrimaryReplica(replicationGroupId));
+    }
+
+    private CompletableFuture<Boolean> 
isLocalNodeIsPrimary(CompletableFuture<ReplicaMeta> primaryReplicaFuture) {

Review Comment:
   I would propose changing this method into a predicate; that would make 
`executeIfLocalNodeIsPrimaryForGroup` less awkward, for example.



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -1257,6 +1219,58 @@ private CompletableFuture<Boolean> 
isLocalNodeIsPrimary(ReplicationGroupId repli
         }
     }
 
+    private void executeIfLocalNodeIsPrimaryForGroup(
+            ReplicationGroupId groupId,
+            Consumer<ReplicaMeta> toExecute
+    ) {
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
getPrimaryReplica(groupId);
+
+        isLocalNodeIsPrimary(primaryReplicaFuture).thenAccept(isPrimary -> {

Review Comment:
   You don't have any exception handling here: if any of the futures fails for 
some reason, the exception will simply be swallowed.



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -1257,6 +1219,58 @@ private CompletableFuture<Boolean> 
isLocalNodeIsPrimary(ReplicationGroupId repli
         }
     }
 
+    private void executeIfLocalNodeIsPrimaryForGroup(
+            ReplicationGroupId groupId,
+            Consumer<ReplicaMeta> toExecute
+    ) {
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
getPrimaryReplica(groupId);
+
+        isLocalNodeIsPrimary(primaryReplicaFuture).thenAccept(isPrimary -> {
+            if (isPrimary) {
+                primaryReplicaFuture.thenAccept(toExecute);
+            }
+        });
+    }
+
+    private CompletableFuture<Boolean> isLocalNodeIsPrimary(ReplicationGroupId 
replicationGroupId) {
+        return isLocalNodeIsPrimary(getPrimaryReplica(replicationGroupId));
+    }
+
+    private CompletableFuture<Boolean> 
isLocalNodeIsPrimary(CompletableFuture<ReplicaMeta> primaryReplicaFuture) {
+        return primaryReplicaFuture.thenApply(primaryReplicaMeta -> 
primaryReplicaMeta != null
+                && primaryReplicaMeta.getLeaseholder() != null
+                && 
primaryReplicaMeta.getLeaseholder().equals(localNode().name())
+        );
+    }
+
+    private void sendChangePeersAndLearnersRequest(
+            ReplicaMeta replicaMeta,
+            ZonePartitionId replicationGroupId,
+            Assignments pendingAssignments,
+            long currentRevision
+    ) {
+        
metaStorageMgr.get(pendingPartAssignmentsKey(replicationGroupId)).thenAccept(latestPendingAssignmentsEntry
 -> {
+            // Do not change peers of the raft group if this is a stale event.
+            // Note that we start raft node before for the sake of the 
consistency in a
+            // starting and stopping raft nodes.
+            if (currentRevision < latestPendingAssignmentsEntry.revision()) {
+                return;
+            }
+
+            ZonePartitionIdMessage zonePartitionIdMessage = ReplicaMessageUtils
+                    .toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, 
replicationGroupId);
+
+            ChangePeersAndLearnersAsyncReplicaRequest request = 
PARTITION_REPLICATION_MESSAGES_FACTORY
+                    .changePeersAndLearnersAsyncReplicaRequest()
+                    .groupId(zonePartitionIdMessage)
+                    .pendingAssignments(pendingAssignments.toBytes())
+                    
.enlistmentConsistencyToken(replicaMeta.getStartTime().longValue())
+                    .build();
+
+            replicaSvc.invoke(localNode(), request);

Review Comment:
   We are missing exception handling logic here again.



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java:
##########
@@ -369,7 +371,7 @@ void testReplicationOnAllNodes(boolean useExplicitTx) 
throws Exception {
      * Tests that inserted data is replicated to a newly joined replica node.
      */
     @ParameterizedTest(name = "truncateRaftLog={0}")
-    @ValueSource(booleans = {false, true})

Review Comment:
   Why did you do this?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -1257,6 +1219,58 @@ private CompletableFuture<Boolean> 
isLocalNodeIsPrimary(ReplicationGroupId repli
         }
     }
 
+    private void executeIfLocalNodeIsPrimaryForGroup(
+            ReplicationGroupId groupId,
+            Consumer<ReplicaMeta> toExecute
+    ) {
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
getPrimaryReplica(groupId);
+
+        isLocalNodeIsPrimary(primaryReplicaFuture).thenAccept(isPrimary -> {
+            if (isPrimary) {
+                primaryReplicaFuture.thenAccept(toExecute);
+            }
+        });
+    }
+
+    private CompletableFuture<Boolean> isLocalNodeIsPrimary(ReplicationGroupId 
replicationGroupId) {
+        return isLocalNodeIsPrimary(getPrimaryReplica(replicationGroupId));
+    }
+
+    private CompletableFuture<Boolean> 
isLocalNodeIsPrimary(CompletableFuture<ReplicaMeta> primaryReplicaFuture) {
+        return primaryReplicaFuture.thenApply(primaryReplicaMeta -> 
primaryReplicaMeta != null
+                && primaryReplicaMeta.getLeaseholder() != null
+                && 
primaryReplicaMeta.getLeaseholder().equals(localNode().name())
+        );
+    }
+
+    private void sendChangePeersAndLearnersRequest(
+            ReplicaMeta replicaMeta,
+            ZonePartitionId replicationGroupId,
+            Assignments pendingAssignments,
+            long currentRevision
+    ) {
+        
metaStorageMgr.get(pendingPartAssignmentsKey(replicationGroupId)).thenAccept(latestPendingAssignmentsEntry
 -> {
+            // Do not change peers of the raft group if this is a stale event.
+            // Note that we start raft node before for the sake of the 
consistency in a
+            // starting and stopping raft nodes.
+            if (currentRevision < latestPendingAssignmentsEntry.revision()) {
+                return;
+            }
+
+            ZonePartitionIdMessage zonePartitionIdMessage = ReplicaMessageUtils
+                    .toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, 
replicationGroupId);

Review Comment:
   I think it would make sense to move `REPLICA_MESSAGES_FACTORY` into 
`ReplicaMessageUtils`



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java:
##########
@@ -82,23 +130,124 @@ public ZonePartitionReplicaListener(
                 schemaSyncService,
                 catalogService,
                 raftClient,
-                replicationGroupId
+                zoneReplicationGroupId
         );
     }
 
     @Override
     public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, 
UUID senderId) {
+        return ensureReplicaIsPrimary(request)
+                .thenCompose(res -> processRequest(request, res.get1(), 
senderId, res.get2()))
+                .thenApply(res -> {
+                    if (res instanceof ReplicaResult) {
+                        return (ReplicaResult) res;
+                    } else {
+                        return new ReplicaResult(res, null);
+                    }
+                });
+    }
+
+    /**
+     * Ensure that the primary replica was not changed.
+     *
+     * @param request Replica request.
+     * @return Future with {@link IgniteBiTuple} containing {@code boolean} 
(whether the replica is primary) and the start time of current
+     *     lease. The boolean is not {@code null} only for {@link 
ReadOnlyReplicaRequest}. If {@code true}, then replica is primary. The
+     *     lease start time is not {@code null} in case of {@link 
PrimaryReplicaRequest}.
+     */
+    private CompletableFuture<IgniteBiTuple<Boolean, Long>> 
ensureReplicaIsPrimary(ReplicaRequest request) {

Review Comment:
   Please introduce a separate class instead of this `IgniteBiTuple`. You 
literally spend three sentences in the javadoc describing what this tuple means :)



##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java:
##########
@@ -384,6 +384,7 @@ private void startComponents() throws Exception {
                 new PartitionReplicaLifecycleManager(
                         catalogManager,
                         replicaMgr,
+                        null, // pass null there as TableManager

Review Comment:
   This sentence looks incomplete



##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java:
##########
@@ -204,6 +204,9 @@ private void startNodes(
         node0.cmgManager.initCluster(List.of(node0.name), List.of(node0.name), 
"cluster");
 
         
placementDriver.setPrimary(node0.clusterService.topologyService().localMember());
+        // The exception for default zone: the zone initially will hadnle 
pending assignments equals to 0th node and while it's a primary
+        // colocated node, then primary-related requests will work correctly.

Review Comment:
   I don't understand what you are trying to say here =(



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java:
##########
@@ -122,9 +271,75 @@ public CompletableFuture<ReplicaResult> 
invoke(ReplicaRequest request, UUID send
         }
     }
 
+    private CompletableFuture<Void> 
processChangePeersAndLearnersReplicaRequest(ChangePeersAndLearnersAsyncReplicaRequest
 request) {
+        ZonePartitionId replicaGrpId = (ZonePartitionId) 
request.groupId().asReplicationGroupId();
+
+        RaftGroupService raftClient = raftCommandRunner instanceof 
RaftGroupService

Review Comment:
   I understand that this is not your fault as you probably copied this code 
from somewhere, but it's very funny how bad we are at OOP =)



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java:
##########
@@ -53,26 +76,51 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
     // TODO: https://issues.apache.org/jira/browse/IGNITE-22624 await for the 
table replica listener if needed.
     private final Map<TablePartitionId, ReplicaListener> replicas = new 
ConcurrentHashMap<>();
 
-    private final RaftCommandRunner raftClient;
+    private final PlacementDriver placementDriver;
+
+    /** Instance of the local node. */
+    private final ClusterNode localNode;
+
+    /** Clock service. */
+    private final ClockService clockService;
+
+    private final RaftCommandRunner raftCommandRunner;
+
+    private final ZonePartitionId zoneReplicationGroupId;
 
     private final TxFinishReplicaRequestHandler txFinishReplicaRequestHandler;
 
     /**
      * The constructor.
      *
+     * @param txStatePartitionStorage Storage for transactions' states.

Review Comment:
   I would prefer to remove all these `@param` tags instead. They serve no 
purpose and are very cumbersome to keep up to date.



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java:
##########
@@ -82,23 +130,124 @@ public ZonePartitionReplicaListener(
                 schemaSyncService,
                 catalogService,
                 raftClient,
-                replicationGroupId
+                zoneReplicationGroupId
         );
     }
 
     @Override
     public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, 
UUID senderId) {
+        return ensureReplicaIsPrimary(request)
+                .thenCompose(res -> processRequest(request, res.get1(), 
senderId, res.get2()))
+                .thenApply(res -> {
+                    if (res instanceof ReplicaResult) {
+                        return (ReplicaResult) res;
+                    } else {
+                        return new ReplicaResult(res, null);
+                    }
+                });
+    }
+
+    /**
+     * Ensure that the primary replica was not changed.
+     *
+     * @param request Replica request.
+     * @return Future with {@link IgniteBiTuple} containing {@code boolean} 
(whether the replica is primary) and the start time of current
+     *     lease. The boolean is not {@code null} only for {@link 
ReadOnlyReplicaRequest}. If {@code true}, then replica is primary. The
+     *     lease start time is not {@code null} in case of {@link 
PrimaryReplicaRequest}.
+     */
+    private CompletableFuture<IgniteBiTuple<Boolean, Long>> 
ensureReplicaIsPrimary(ReplicaRequest request) {
+        HybridTimestamp current = clockService.current();
+
+        if (request instanceof PrimaryReplicaRequest) {
+            Long enlistmentConsistencyToken = ((PrimaryReplicaRequest) 
request).enlistmentConsistencyToken();
+
+            Function<ReplicaMeta, IgniteBiTuple<Boolean, Long>> validateClo = 
primaryReplicaMeta -> {
+                if (primaryReplicaMeta == null) {
+                    throw new PrimaryReplicaMissException(
+                            localNode.name(),
+                            null,
+                            localNode.id(),
+                            null,
+                            enlistmentConsistencyToken,
+                            null,
+                            null
+                    );
+                }
+
+                long currentEnlistmentConsistencyToken = 
primaryReplicaMeta.getStartTime().longValue();
+
+                if (enlistmentConsistencyToken != 
currentEnlistmentConsistencyToken
+                        || 
clockService.before(primaryReplicaMeta.getExpirationTime(), current)
+                        || !isLocalPeer(primaryReplicaMeta.getLeaseholderId())
+                ) {
+                    throw new PrimaryReplicaMissException(
+                            localNode.name(),
+                            primaryReplicaMeta.getLeaseholder(),
+                            localNode.id(),
+                            primaryReplicaMeta.getLeaseholderId(),
+                            enlistmentConsistencyToken,
+                            currentEnlistmentConsistencyToken,
+                            null);
+                }
+
+                return new IgniteBiTuple<>(null, 
primaryReplicaMeta.getStartTime().longValue());
+            };
+
+            ReplicaMeta meta = 
placementDriver.getCurrentPrimaryReplica(zoneReplicationGroupId, current);
+
+            if (meta != null) {
+                try {
+                    return completedFuture(validateClo.apply(meta));
+                } catch (Exception e) {
+                    return failedFuture(e);
+                }
+            }
+
+            return placementDriver.getPrimaryReplica(zoneReplicationGroupId, 
current).thenApply(validateClo);
+        } else if (request instanceof ReadOnlyReplicaRequest) {
+            return isLocalNodePrimaryReplicaAt(current);
+        } else if (request instanceof ReplicaSafeTimeSyncRequest) {
+            return isLocalNodePrimaryReplicaAt(current);
+        } else {
+            return completedFuture(new IgniteBiTuple<>(null, null));
+        }
+    }
+

Review Comment:
   ```suggestion
   ```



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java:
##########
@@ -82,23 +130,124 @@ public ZonePartitionReplicaListener(
                 schemaSyncService,
                 catalogService,
                 raftClient,
-                replicationGroupId
+                zoneReplicationGroupId
         );
     }
 
     @Override
     public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, 
UUID senderId) {
+        return ensureReplicaIsPrimary(request)
+                .thenCompose(res -> processRequest(request, res.get1(), 
senderId, res.get2()))
+                .thenApply(res -> {
+                    if (res instanceof ReplicaResult) {
+                        return (ReplicaResult) res;
+                    } else {
+                        return new ReplicaResult(res, null);
+                    }
+                });
+    }
+
+    /**
+     * Ensure that the primary replica was not changed.
+     *
+     * @param request Replica request.
+     * @return Future with {@link IgniteBiTuple} containing {@code boolean} 
(whether the replica is primary) and the start time of current
+     *     lease. The boolean is not {@code null} only for {@link 
ReadOnlyReplicaRequest}. If {@code true}, then replica is primary. The
+     *     lease start time is not {@code null} in case of {@link 
PrimaryReplicaRequest}.
+     */
+    private CompletableFuture<IgniteBiTuple<Boolean, Long>> 
ensureReplicaIsPrimary(ReplicaRequest request) {
+        HybridTimestamp current = clockService.current();
+
+        if (request instanceof PrimaryReplicaRequest) {
+            Long enlistmentConsistencyToken = ((PrimaryReplicaRequest) 
request).enlistmentConsistencyToken();
+
+            Function<ReplicaMeta, IgniteBiTuple<Boolean, Long>> validateClo = 
primaryReplicaMeta -> {

Review Comment:
   Can this be a separate method?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java:
##########
@@ -82,23 +130,124 @@ public ZonePartitionReplicaListener(
                 schemaSyncService,
                 catalogService,
                 raftClient,
-                replicationGroupId
+                zoneReplicationGroupId
         );
     }
 
     @Override
     public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, 
UUID senderId) {
+        return ensureReplicaIsPrimary(request)
+                .thenCompose(res -> processRequest(request, res.get1(), 
senderId, res.get2()))
+                .thenApply(res -> {
+                    if (res instanceof ReplicaResult) {
+                        return (ReplicaResult) res;
+                    } else {
+                        return new ReplicaResult(res, null);
+                    }
+                });
+    }
+
+    /**
+     * Ensure that the primary replica was not changed.
+     *
+     * @param request Replica request.
+     * @return Future with {@link IgniteBiTuple} containing {@code boolean} 
(whether the replica is primary) and the start time of current
+     *     lease. The boolean is not {@code null} only for {@link 
ReadOnlyReplicaRequest}. If {@code true}, then replica is primary. The
+     *     lease start time is not {@code null} in case of {@link 
PrimaryReplicaRequest}.
+     */
+    private CompletableFuture<IgniteBiTuple<Boolean, Long>> 
ensureReplicaIsPrimary(ReplicaRequest request) {
+        HybridTimestamp current = clockService.current();
+
+        if (request instanceof PrimaryReplicaRequest) {
+            Long enlistmentConsistencyToken = ((PrimaryReplicaRequest) 
request).enlistmentConsistencyToken();
+
+            Function<ReplicaMeta, IgniteBiTuple<Boolean, Long>> validateClo = 
primaryReplicaMeta -> {
+                if (primaryReplicaMeta == null) {
+                    throw new PrimaryReplicaMissException(
+                            localNode.name(),
+                            null,
+                            localNode.id(),
+                            null,
+                            enlistmentConsistencyToken,
+                            null,
+                            null
+                    );
+                }
+
+                long currentEnlistmentConsistencyToken = 
primaryReplicaMeta.getStartTime().longValue();
+
+                if (enlistmentConsistencyToken != 
currentEnlistmentConsistencyToken
+                        || 
clockService.before(primaryReplicaMeta.getExpirationTime(), current)
+                        || !isLocalPeer(primaryReplicaMeta.getLeaseholderId())
+                ) {
+                    throw new PrimaryReplicaMissException(
+                            localNode.name(),
+                            primaryReplicaMeta.getLeaseholder(),
+                            localNode.id(),
+                            primaryReplicaMeta.getLeaseholderId(),
+                            enlistmentConsistencyToken,
+                            currentEnlistmentConsistencyToken,
+                            null);
+                }
+
+                return new IgniteBiTuple<>(null, 
primaryReplicaMeta.getStartTime().longValue());
+            };
+
+            ReplicaMeta meta = 
placementDriver.getCurrentPrimaryReplica(zoneReplicationGroupId, current);
+
+            if (meta != null) {
+                try {
+                    return completedFuture(validateClo.apply(meta));
+                } catch (Exception e) {
+                    return failedFuture(e);
+                }
+            }
+
+            return placementDriver.getPrimaryReplica(zoneReplicationGroupId, 
current).thenApply(validateClo);
+        } else if (request instanceof ReadOnlyReplicaRequest) {
+            return isLocalNodePrimaryReplicaAt(current);
+        } else if (request instanceof ReplicaSafeTimeSyncRequest) {
+            return isLocalNodePrimaryReplicaAt(current);
+        } else {
+            return completedFuture(new IgniteBiTuple<>(null, null));
+        }
+    }
+
+
+    private CompletableFuture<IgniteBiTuple<Boolean, Long>> 
isLocalNodePrimaryReplicaAt(HybridTimestamp timestamp) {
+        return placementDriver.getPrimaryReplica(zoneReplicationGroupId, 
timestamp)
+                .thenApply(primaryReplica -> new IgniteBiTuple<>(
+                        primaryReplica != null && 
isLocalPeer(primaryReplica.getLeaseholderId()),
+                        null
+                ));
+    }
+
+    private CompletableFuture<?> processRequest(
+            ReplicaRequest request,
+            @Nullable Boolean isPrimary,
+            UUID senderId,
+            @Nullable Long leaseStartTime
+    ) {
         if (!(request instanceof TableAware)) {
             // TODO: https://issues.apache.org/jira/browse/IGNITE-22620 
implement ReplicaSafeTimeSyncRequest processing.
             if (request instanceof TxFinishReplicaRequest) {
                 return 
txFinishReplicaRequestHandler.handle((TxFinishReplicaRequest) request)
                         .thenApply(res -> new ReplicaResult(res, null));
-            } else {
-                if (request instanceof ReplicaSafeTimeSyncRequest) {
-                    LOG.debug("Non table request is not supported by the zone 
partition yet " + request);
+            } else if (request instanceof 
ChangePeersAndLearnersAsyncReplicaRequest) {
+                ReplicationGroupId replicationGroupId = 
request.groupId().asReplicationGroupId();
+
+                if (replicationGroupId instanceof TablePartitionId) {
+                    replicas.get(replicationGroupId).invoke(request, senderId);
+                } else if (replicationGroupId instanceof ZonePartitionId) {
+                    return 
processChangePeersAndLearnersReplicaRequest((ChangePeersAndLearnersAsyncReplicaRequest)
 request);
                 } else {
-                    LOG.warn("Non table request is not supported by the zone 
partition yet " + request);
+                    throw new IllegalArgumentException("Requests with 
replication group type "
+                            + request.groupId().getClass() + " is not 
supported");
                 }
+            } else if (request instanceof ReplicaSafeTimeSyncRequest) {

Review Comment:
   Why do we need a separate clause?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to