sashapolo commented on code in PR #5231: URL: https://github.com/apache/ignite-3/pull/5231#discussion_r1955904885
########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java: ########## @@ -204,6 +204,9 @@ private void startNodes( node0.cmgManager.initCluster(List.of(node0.name), List.of(node0.name), "cluster"); placementDriver.setPrimary(node0.clusterService.topologyService().localMember()); + // The exception for default zone: the zone initially will hadnle pending assignments equals to 0th node and while it's a primary Review Comment: ```suggestion // An exception for the default zone: the zone initially will handle pending assignments equal to 0th node and while it's a primary ``` ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java: ########## @@ -1257,6 +1219,58 @@ private CompletableFuture<Boolean> isLocalNodeIsPrimary(ReplicationGroupId repli } } + private void executeIfLocalNodeIsPrimaryForGroup( + ReplicationGroupId groupId, + Consumer<ReplicaMeta> toExecute + ) { + CompletableFuture<ReplicaMeta> primaryReplicaFuture = getPrimaryReplica(groupId); + + isLocalNodeIsPrimary(primaryReplicaFuture).thenAccept(isPrimary -> { + if (isPrimary) { + primaryReplicaFuture.thenAccept(toExecute); + } + }); + } + + private CompletableFuture<Boolean> isLocalNodeIsPrimary(ReplicationGroupId replicationGroupId) { + return isLocalNodeIsPrimary(getPrimaryReplica(replicationGroupId)); + } + + private CompletableFuture<Boolean> isLocalNodeIsPrimary(CompletableFuture<ReplicaMeta> primaryReplicaFuture) { + return primaryReplicaFuture.thenApply(primaryReplicaMeta -> primaryReplicaMeta != null + && primaryReplicaMeta.getLeaseholder() != null + && primaryReplicaMeta.getLeaseholder().equals(localNode().name()) + ); + } + + private void sendChangePeersAndLearnersRequest( + ReplicaMeta replicaMeta, + ZonePartitionId replicationGroupId, + Assignments pendingAssignments, + long currentRevision + ) { + 
metaStorageMgr.get(pendingPartAssignmentsKey(replicationGroupId)).thenAccept(latestPendingAssignmentsEntry -> { + // Do not change peers of the raft group if this is a stale event. + // Note that we start raft node before for the sake of the consistency in a Review Comment: > // Note that we start raft node before for the sake of the consistency in a // starting and stopping raft nodes. What does this mean? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java: ########## @@ -122,9 +271,75 @@ public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, UUID send } } + private CompletableFuture<Void> processChangePeersAndLearnersReplicaRequest(ChangePeersAndLearnersAsyncReplicaRequest request) { Review Comment: Didn't we agree on extracting this handling logic into separate classes? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java: ########## @@ -122,9 +271,75 @@ public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, UUID send } } + private CompletableFuture<Void> processChangePeersAndLearnersReplicaRequest(ChangePeersAndLearnersAsyncReplicaRequest request) { + ZonePartitionId replicaGrpId = (ZonePartitionId) request.groupId().asReplicationGroupId(); + + RaftGroupService raftClient = raftCommandRunner instanceof RaftGroupService + ? 
(RaftGroupService) raftCommandRunner + : ((RaftGroupService) ((ExecutorInclinedRaftCommandRunner) raftCommandRunner).decoratedCommandRunner()); + + return raftClient.refreshAndGetLeaderWithTerm() + .exceptionally(throwable -> { + throwable = unwrapCause(throwable); + + if (throwable instanceof TimeoutException) { + LOG.info( + "Node couldn't get the leader within timeout so the changing peers is skipped [grp={}].", + replicaGrpId + ); + + return LeaderWithTerm.NO_LEADER; + } + + throw new IgniteInternalException( + INTERNAL_ERR, + "Failed to get a leader for the RAFT replication group [get=" + replicaGrpId + "].", + throwable + ); + }) + .thenCompose(leaderWithTerm -> { + if (leaderWithTerm.isEmpty() || !isTokenStillValidPrimary(request.enlistmentConsistencyToken())) { + return nullCompletedFuture(); + } + + // run update of raft configuration if this node is a leader + LOG.debug("Current node={} is the leader of partition raft group={}. " + + "Initiate rebalance process for zone={}, partition={}", + leaderWithTerm.leader(), + replicaGrpId, + replicaGrpId.zoneId(), + replicaGrpId.partitionId() + ); + + return raftClient.changePeersAndLearnersAsync(peersConfigurationFromMessage(request), leaderWithTerm.term()); + }); + } + + private boolean isTokenStillValidPrimary(long suspectedEnlistmentConsistencyToken) { + HybridTimestamp currentTime = clockService.current(); + + ReplicaMeta meta = placementDriver.getCurrentPrimaryReplica(zoneReplicationGroupId, currentTime); + + return meta != null + && isLocalPeer(meta.getLeaseholderId()) + && clockService.before(currentTime, meta.getExpirationTime()) + && suspectedEnlistmentConsistencyToken == meta.getStartTime().longValue(); + } + + private boolean isLocalPeer(UUID nodeId) { + return localNode.id().equals(nodeId); + } + + Review Comment: ```suggestion ``` ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java: ########## @@ -46,13 
+48,39 @@ public class TestPlacementDriver extends AbstractEventProducer<PrimaryReplicaEve private volatile ReplicaMeta primary; + private volatile ReplicaMeta defaultZonePrimaryReplica; + /** * Set the primary replica. * * @param node Primary replica node. + * @param leaseStartTime Time when the node is considered elected. */ public void setPrimary(ClusterNode node, HybridTimestamp leaseStartTime) { - primary = new ReplicaMeta() { + primary = createReplicaMeta(node, leaseStartTime); + } + + /** + * Set the primary replica. + * + * @param node Primary replica node. + */ + public void setPrimary(ClusterNode node) { + Loggers.forClass(TestPlacementDriver.class).info("Test Primary is {}", node); Review Comment: Please extract this into a field ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java: ########## @@ -969,7 +989,10 @@ private CompletableFuture<Void> stopAndDestroyPartitionAndUpdateClients( Set<Assignment> stableAssignments, Assignments pendingAssignments, boolean isRecovery, - long revision + long revision, + // TODO: Remove after https://issues.apache.org/jira/browse/IGNITE-24374 + // It's a workaround because with TestPlacementDriver we couldn't reserve primary replicas that's we we shouldn't stop them now. Review Comment: ```suggestion // It's a workaround because with TestPlacementDriver we can't reserve primary replicas so we shouldn't stop them now. 
``` ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java: ########## @@ -1257,6 +1219,58 @@ private CompletableFuture<Boolean> isLocalNodeIsPrimary(ReplicationGroupId repli } } + private void executeIfLocalNodeIsPrimaryForGroup( + ReplicationGroupId groupId, + Consumer<ReplicaMeta> toExecute + ) { + CompletableFuture<ReplicaMeta> primaryReplicaFuture = getPrimaryReplica(groupId); + + isLocalNodeIsPrimary(primaryReplicaFuture).thenAccept(isPrimary -> { + if (isPrimary) { + primaryReplicaFuture.thenAccept(toExecute); + } + }); + } + + private CompletableFuture<Boolean> isLocalNodeIsPrimary(ReplicationGroupId replicationGroupId) { + return isLocalNodeIsPrimary(getPrimaryReplica(replicationGroupId)); + } + + private CompletableFuture<Boolean> isLocalNodeIsPrimary(CompletableFuture<ReplicaMeta> primaryReplicaFuture) { Review Comment: I would propose changing this method into a predicate; that would make `executeIfLocalNodeIsPrimaryForGroup` less awkward, for example.
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java: ########## @@ -1257,6 +1219,58 @@ private CompletableFuture<Boolean> isLocalNodeIsPrimary(ReplicationGroupId repli } } + private void executeIfLocalNodeIsPrimaryForGroup( + ReplicationGroupId groupId, + Consumer<ReplicaMeta> toExecute + ) { + CompletableFuture<ReplicaMeta> primaryReplicaFuture = getPrimaryReplica(groupId); + + isLocalNodeIsPrimary(primaryReplicaFuture).thenAccept(isPrimary -> { Review Comment: You don't have any exception handling here, you will simply swallow them if any of the futures fail for some reason ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java: ########## @@ -1257,6 +1219,58 @@ private CompletableFuture<Boolean> isLocalNodeIsPrimary(ReplicationGroupId repli } } + private void executeIfLocalNodeIsPrimaryForGroup( + ReplicationGroupId groupId, + Consumer<ReplicaMeta> toExecute + ) { + CompletableFuture<ReplicaMeta> primaryReplicaFuture = getPrimaryReplica(groupId); + + isLocalNodeIsPrimary(primaryReplicaFuture).thenAccept(isPrimary -> { + if (isPrimary) { + primaryReplicaFuture.thenAccept(toExecute); + } + }); + } + + private CompletableFuture<Boolean> isLocalNodeIsPrimary(ReplicationGroupId replicationGroupId) { + return isLocalNodeIsPrimary(getPrimaryReplica(replicationGroupId)); + } + + private CompletableFuture<Boolean> isLocalNodeIsPrimary(CompletableFuture<ReplicaMeta> primaryReplicaFuture) { + return primaryReplicaFuture.thenApply(primaryReplicaMeta -> primaryReplicaMeta != null + && primaryReplicaMeta.getLeaseholder() != null + && primaryReplicaMeta.getLeaseholder().equals(localNode().name()) + ); + } + + private void sendChangePeersAndLearnersRequest( + ReplicaMeta replicaMeta, + ZonePartitionId replicationGroupId, + Assignments pendingAssignments, + long currentRevision + ) { + 
metaStorageMgr.get(pendingPartAssignmentsKey(replicationGroupId)).thenAccept(latestPendingAssignmentsEntry -> { + // Do not change peers of the raft group if this is a stale event. + // Note that we start raft node before for the sake of the consistency in a + // starting and stopping raft nodes. + if (currentRevision < latestPendingAssignmentsEntry.revision()) { + return; + } + + ZonePartitionIdMessage zonePartitionIdMessage = ReplicaMessageUtils + .toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, replicationGroupId); + + ChangePeersAndLearnersAsyncReplicaRequest request = PARTITION_REPLICATION_MESSAGES_FACTORY + .changePeersAndLearnersAsyncReplicaRequest() + .groupId(zonePartitionIdMessage) + .pendingAssignments(pendingAssignments.toBytes()) + .enlistmentConsistencyToken(replicaMeta.getStartTime().longValue()) + .build(); + + replicaSvc.invoke(localNode(), request); Review Comment: We are missing exception handling logic again ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java: ########## @@ -369,7 +371,7 @@ void testReplicationOnAllNodes(boolean useExplicitTx) throws Exception { * Tests that inserted data is replicated to a newly joined replica node. */ @ParameterizedTest(name = "truncateRaftLog={0}") - @ValueSource(booleans = {false, true}) Review Comment: Why did you do this? 
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java: ########## @@ -1257,6 +1219,58 @@ private CompletableFuture<Boolean> isLocalNodeIsPrimary(ReplicationGroupId repli } } + private void executeIfLocalNodeIsPrimaryForGroup( + ReplicationGroupId groupId, + Consumer<ReplicaMeta> toExecute + ) { + CompletableFuture<ReplicaMeta> primaryReplicaFuture = getPrimaryReplica(groupId); + + isLocalNodeIsPrimary(primaryReplicaFuture).thenAccept(isPrimary -> { + if (isPrimary) { + primaryReplicaFuture.thenAccept(toExecute); + } + }); + } + + private CompletableFuture<Boolean> isLocalNodeIsPrimary(ReplicationGroupId replicationGroupId) { + return isLocalNodeIsPrimary(getPrimaryReplica(replicationGroupId)); + } + + private CompletableFuture<Boolean> isLocalNodeIsPrimary(CompletableFuture<ReplicaMeta> primaryReplicaFuture) { + return primaryReplicaFuture.thenApply(primaryReplicaMeta -> primaryReplicaMeta != null + && primaryReplicaMeta.getLeaseholder() != null + && primaryReplicaMeta.getLeaseholder().equals(localNode().name()) + ); + } + + private void sendChangePeersAndLearnersRequest( + ReplicaMeta replicaMeta, + ZonePartitionId replicationGroupId, + Assignments pendingAssignments, + long currentRevision + ) { + metaStorageMgr.get(pendingPartAssignmentsKey(replicationGroupId)).thenAccept(latestPendingAssignmentsEntry -> { + // Do not change peers of the raft group if this is a stale event. + // Note that we start raft node before for the sake of the consistency in a + // starting and stopping raft nodes. 
+ if (currentRevision < latestPendingAssignmentsEntry.revision()) { + return; + } + + ZonePartitionIdMessage zonePartitionIdMessage = ReplicaMessageUtils + .toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, replicationGroupId); Review Comment: I think it would make sense to move `REPLICA_MESSAGES_FACTORY` into `ReplicaMessageUtils` ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java: ########## @@ -82,23 +130,124 @@ public ZonePartitionReplicaListener( schemaSyncService, catalogService, raftClient, - replicationGroupId + zoneReplicationGroupId ); } @Override public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, UUID senderId) { + return ensureReplicaIsPrimary(request) + .thenCompose(res -> processRequest(request, res.get1(), senderId, res.get2())) + .thenApply(res -> { + if (res instanceof ReplicaResult) { + return (ReplicaResult) res; + } else { + return new ReplicaResult(res, null); + } + }); + } + + /** + * Ensure that the primary replica was not changed. + * + * @param request Replica request. + * @return Future with {@link IgniteBiTuple} containing {@code boolean} (whether the replica is primary) and the start time of current + * lease. The boolean is not {@code null} only for {@link ReadOnlyReplicaRequest}. If {@code true}, then replica is primary. The + * lease start time is not {@code null} in case of {@link PrimaryReplicaRequest}. + */ + private CompletableFuture<IgniteBiTuple<Boolean, Long>> ensureReplicaIsPrimary(ReplicaRequest request) { Review Comment: Please introduce a separate class instead of this `IgniteBiTuple`. 
You literally spend 3 sentences in the javadoc describing what this tuple means) ########## modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java: ########## @@ -384,6 +384,7 @@ private void startComponents() throws Exception { new PartitionReplicaLifecycleManager( catalogManager, replicaMgr, + null, // pass null there as TableManager Review Comment: This sentence looks incomplete ########## modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java: ########## @@ -204,6 +204,9 @@ private void startNodes( node0.cmgManager.initCluster(List.of(node0.name), List.of(node0.name), "cluster"); placementDriver.setPrimary(node0.clusterService.topologyService().localMember()); + // The exception for default zone: the zone initially will hadnle pending assignments equals to 0th node and while it's a primary + // colocated node, then primary-related requests will work correctly. Review Comment: I don't understand what you are trying to say here =( ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java: ########## @@ -122,9 +271,75 @@ public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, UUID send } } + private CompletableFuture<Void> processChangePeersAndLearnersReplicaRequest(ChangePeersAndLearnersAsyncReplicaRequest request) { + ZonePartitionId replicaGrpId = (ZonePartitionId) request.groupId().asReplicationGroupId(); + + RaftGroupService raftClient = raftCommandRunner instanceof RaftGroupService Review Comment: I understand that this is not your fault as you probably copied this code from somewhere, but it's very funny how bad we are at OOP =) ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java: ########## @@ -53,26 +76,51 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener { // TODO: https://issues.apache.org/jira/browse/IGNITE-22624 await for the table replica listener if needed. private final Map<TablePartitionId, ReplicaListener> replicas = new ConcurrentHashMap<>(); - private final RaftCommandRunner raftClient; + private final PlacementDriver placementDriver; + + /** Instance of the local node. */ + private final ClusterNode localNode; + + /** Clock service. */ + private final ClockService clockService; + + private final RaftCommandRunner raftCommandRunner; + + private final ZonePartitionId zoneReplicationGroupId; private final TxFinishReplicaRequestHandler txFinishReplicaRequestHandler; /** * The constructor. * + * @param txStatePartitionStorage Storage for transactions' states. Review Comment: I would prefer to remove all these `@param` tags instead. They serve no purpose and are very cumbersome to support in actual state. ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java: ########## @@ -82,23 +130,124 @@ public ZonePartitionReplicaListener( schemaSyncService, catalogService, raftClient, - replicationGroupId + zoneReplicationGroupId ); } @Override public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, UUID senderId) { + return ensureReplicaIsPrimary(request) + .thenCompose(res -> processRequest(request, res.get1(), senderId, res.get2())) + .thenApply(res -> { + if (res instanceof ReplicaResult) { + return (ReplicaResult) res; + } else { + return new ReplicaResult(res, null); + } + }); + } + + /** + * Ensure that the primary replica was not changed. + * + * @param request Replica request. + * @return Future with {@link IgniteBiTuple} containing {@code boolean} (whether the replica is primary) and the start time of current + * lease. The boolean is not {@code null} only for {@link ReadOnlyReplicaRequest}. If {@code true}, then replica is primary. 
The + * lease start time is not {@code null} in case of {@link PrimaryReplicaRequest}. + */ + private CompletableFuture<IgniteBiTuple<Boolean, Long>> ensureReplicaIsPrimary(ReplicaRequest request) { + HybridTimestamp current = clockService.current(); + + if (request instanceof PrimaryReplicaRequest) { + Long enlistmentConsistencyToken = ((PrimaryReplicaRequest) request).enlistmentConsistencyToken(); + + Function<ReplicaMeta, IgniteBiTuple<Boolean, Long>> validateClo = primaryReplicaMeta -> { + if (primaryReplicaMeta == null) { + throw new PrimaryReplicaMissException( + localNode.name(), + null, + localNode.id(), + null, + enlistmentConsistencyToken, + null, + null + ); + } + + long currentEnlistmentConsistencyToken = primaryReplicaMeta.getStartTime().longValue(); + + if (enlistmentConsistencyToken != currentEnlistmentConsistencyToken + || clockService.before(primaryReplicaMeta.getExpirationTime(), current) + || !isLocalPeer(primaryReplicaMeta.getLeaseholderId()) + ) { + throw new PrimaryReplicaMissException( + localNode.name(), + primaryReplicaMeta.getLeaseholder(), + localNode.id(), + primaryReplicaMeta.getLeaseholderId(), + enlistmentConsistencyToken, + currentEnlistmentConsistencyToken, + null); + } + + return new IgniteBiTuple<>(null, primaryReplicaMeta.getStartTime().longValue()); + }; + + ReplicaMeta meta = placementDriver.getCurrentPrimaryReplica(zoneReplicationGroupId, current); + + if (meta != null) { + try { + return completedFuture(validateClo.apply(meta)); + } catch (Exception e) { + return failedFuture(e); + } + } + + return placementDriver.getPrimaryReplica(zoneReplicationGroupId, current).thenApply(validateClo); + } else if (request instanceof ReadOnlyReplicaRequest) { + return isLocalNodePrimaryReplicaAt(current); + } else if (request instanceof ReplicaSafeTimeSyncRequest) { + return isLocalNodePrimaryReplicaAt(current); + } else { + return completedFuture(new IgniteBiTuple<>(null, null)); + } + } + Review Comment: ```suggestion ``` ########## 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java: ########## @@ -82,23 +130,124 @@ public ZonePartitionReplicaListener( schemaSyncService, catalogService, raftClient, - replicationGroupId + zoneReplicationGroupId ); } @Override public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, UUID senderId) { + return ensureReplicaIsPrimary(request) + .thenCompose(res -> processRequest(request, res.get1(), senderId, res.get2())) + .thenApply(res -> { + if (res instanceof ReplicaResult) { + return (ReplicaResult) res; + } else { + return new ReplicaResult(res, null); + } + }); + } + + /** + * Ensure that the primary replica was not changed. + * + * @param request Replica request. + * @return Future with {@link IgniteBiTuple} containing {@code boolean} (whether the replica is primary) and the start time of current + * lease. The boolean is not {@code null} only for {@link ReadOnlyReplicaRequest}. If {@code true}, then replica is primary. The + * lease start time is not {@code null} in case of {@link PrimaryReplicaRequest}. + */ + private CompletableFuture<IgniteBiTuple<Boolean, Long>> ensureReplicaIsPrimary(ReplicaRequest request) { + HybridTimestamp current = clockService.current(); + + if (request instanceof PrimaryReplicaRequest) { + Long enlistmentConsistencyToken = ((PrimaryReplicaRequest) request).enlistmentConsistencyToken(); + + Function<ReplicaMeta, IgniteBiTuple<Boolean, Long>> validateClo = primaryReplicaMeta -> { Review Comment: Can this be a separate method? 
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java: ########## @@ -82,23 +130,124 @@ public ZonePartitionReplicaListener( schemaSyncService, catalogService, raftClient, - replicationGroupId + zoneReplicationGroupId ); } @Override public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, UUID senderId) { + return ensureReplicaIsPrimary(request) + .thenCompose(res -> processRequest(request, res.get1(), senderId, res.get2())) + .thenApply(res -> { + if (res instanceof ReplicaResult) { + return (ReplicaResult) res; + } else { + return new ReplicaResult(res, null); + } + }); + } + + /** + * Ensure that the primary replica was not changed. + * + * @param request Replica request. + * @return Future with {@link IgniteBiTuple} containing {@code boolean} (whether the replica is primary) and the start time of current + * lease. The boolean is not {@code null} only for {@link ReadOnlyReplicaRequest}. If {@code true}, then replica is primary. The + * lease start time is not {@code null} in case of {@link PrimaryReplicaRequest}. 
+ */ + private CompletableFuture<IgniteBiTuple<Boolean, Long>> ensureReplicaIsPrimary(ReplicaRequest request) { + HybridTimestamp current = clockService.current(); + + if (request instanceof PrimaryReplicaRequest) { + Long enlistmentConsistencyToken = ((PrimaryReplicaRequest) request).enlistmentConsistencyToken(); + + Function<ReplicaMeta, IgniteBiTuple<Boolean, Long>> validateClo = primaryReplicaMeta -> { + if (primaryReplicaMeta == null) { + throw new PrimaryReplicaMissException( + localNode.name(), + null, + localNode.id(), + null, + enlistmentConsistencyToken, + null, + null + ); + } + + long currentEnlistmentConsistencyToken = primaryReplicaMeta.getStartTime().longValue(); + + if (enlistmentConsistencyToken != currentEnlistmentConsistencyToken + || clockService.before(primaryReplicaMeta.getExpirationTime(), current) + || !isLocalPeer(primaryReplicaMeta.getLeaseholderId()) + ) { + throw new PrimaryReplicaMissException( + localNode.name(), + primaryReplicaMeta.getLeaseholder(), + localNode.id(), + primaryReplicaMeta.getLeaseholderId(), + enlistmentConsistencyToken, + currentEnlistmentConsistencyToken, + null); + } + + return new IgniteBiTuple<>(null, primaryReplicaMeta.getStartTime().longValue()); + }; + + ReplicaMeta meta = placementDriver.getCurrentPrimaryReplica(zoneReplicationGroupId, current); + + if (meta != null) { + try { + return completedFuture(validateClo.apply(meta)); + } catch (Exception e) { + return failedFuture(e); + } + } + + return placementDriver.getPrimaryReplica(zoneReplicationGroupId, current).thenApply(validateClo); + } else if (request instanceof ReadOnlyReplicaRequest) { + return isLocalNodePrimaryReplicaAt(current); + } else if (request instanceof ReplicaSafeTimeSyncRequest) { + return isLocalNodePrimaryReplicaAt(current); + } else { + return completedFuture(new IgniteBiTuple<>(null, null)); + } + } + + + private CompletableFuture<IgniteBiTuple<Boolean, Long>> isLocalNodePrimaryReplicaAt(HybridTimestamp timestamp) { + return 
placementDriver.getPrimaryReplica(zoneReplicationGroupId, timestamp) + .thenApply(primaryReplica -> new IgniteBiTuple<>( + primaryReplica != null && isLocalPeer(primaryReplica.getLeaseholderId()), + null + )); + } + + private CompletableFuture<?> processRequest( + ReplicaRequest request, + @Nullable Boolean isPrimary, + UUID senderId, + @Nullable Long leaseStartTime + ) { if (!(request instanceof TableAware)) { // TODO: https://issues.apache.org/jira/browse/IGNITE-22620 implement ReplicaSafeTimeSyncRequest processing. if (request instanceof TxFinishReplicaRequest) { return txFinishReplicaRequestHandler.handle((TxFinishReplicaRequest) request) .thenApply(res -> new ReplicaResult(res, null)); - } else { - if (request instanceof ReplicaSafeTimeSyncRequest) { - LOG.debug("Non table request is not supported by the zone partition yet " + request); + } else if (request instanceof ChangePeersAndLearnersAsyncReplicaRequest) { + ReplicationGroupId replicationGroupId = request.groupId().asReplicationGroupId(); + + if (replicationGroupId instanceof TablePartitionId) { + replicas.get(replicationGroupId).invoke(request, senderId); + } else if (replicationGroupId instanceof ZonePartitionId) { + return processChangePeersAndLearnersReplicaRequest((ChangePeersAndLearnersAsyncReplicaRequest) request); } else { - LOG.warn("Non table request is not supported by the zone partition yet " + request); + throw new IllegalArgumentException("Requests with replication group type " + + request.groupId().getClass() + " is not supported"); } + } else if (request instanceof ReplicaSafeTimeSyncRequest) { Review Comment: Why do we need a separate clause? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org