sashapolo commented on code in PR #5352:
URL: https://github.com/apache/ignite-3/pull/5352#discussion_r1982931007


##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacy.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static java.util.Objects.requireNonNull;
+
+import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+import 
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents replica primacy info. Contains the following information:
+ *
+ * <ul>
+ *     <li>{@code leaseStartTime} - the moment when the replica became primary 
(only filled for {@link PrimaryReplicaRequest}s)</li>
+ *     <li>{@code isPrimary} - whether this node currently hosts the primary 
(only filled for {@link ReadOnlyReplicaRequest}s
+ *     and {@link ReplicaSafeTimeSyncRequest}s)</li>
+ * </ul>
+ */
+public class ReplicaPrimacy {
+    private final @Nullable Long leaseStartTime;
+    private final @Nullable Boolean isPrimary;
+
+    private ReplicaPrimacy(@Nullable Long leaseStartTime, @Nullable Boolean 
isPrimary) {
+        this.leaseStartTime = leaseStartTime;
+        this.isPrimary = isPrimary;
+    }
+
+    /**
+     * Creates an instance representing no primacy information.
+     */
+    public static ReplicaPrimacy empty() {
+        return new ReplicaPrimacy(null, null);

Review Comment:
   Shall we introduce a constant for this?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacyEngine.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest;
+import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import 
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Logic related to replica primacy checks.
+ */
+public class ReplicaPrimacyEngine {
+    private final LeasePlacementDriver placementDriver;
+    private final ClockService clockService;
+    private final ReplicationGroupId replicationGroupId;
+    private final ClusterNode localNode;
+
+    /** Constructor. */
+    public ReplicaPrimacyEngine(
+            LeasePlacementDriver placementDriver,
+            ClockService clockService,
+            ReplicationGroupId replicationGroupId,
+            ClusterNode localNode
+    ) {
+        this.placementDriver = placementDriver;
+        this.clockService = clockService;
+        this.replicationGroupId = replicationGroupId;
+        this.localNode = localNode;
+    }
+
+    /**
+     * Validates replica primacy.
+     *
+     * <ul>
+     *     <li>For {@link PrimaryReplicaRequest}s, ensures that the primary 
replica is known, that it is hosted to this node,

Review Comment:
   ```suggestion
        *     <li>For {@link PrimaryReplicaRequest}s, ensures that the primary 
replica is known, that it is hosted on this node,
   ```



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacyEngine.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest;
+import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import 
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Logic related to replica primacy checks.
+ */
+public class ReplicaPrimacyEngine {
+    private final LeasePlacementDriver placementDriver;
+    private final ClockService clockService;
+    private final ReplicationGroupId replicationGroupId;
+    private final ClusterNode localNode;
+
+    /** Constructor. */
+    public ReplicaPrimacyEngine(
+            LeasePlacementDriver placementDriver,
+            ClockService clockService,
+            ReplicationGroupId replicationGroupId,
+            ClusterNode localNode
+    ) {
+        this.placementDriver = placementDriver;
+        this.clockService = clockService;
+        this.replicationGroupId = replicationGroupId;
+        this.localNode = localNode;
+    }
+
+    /**
+     * Validates replica primacy.
+     *
+     * <ul>
+     *     <li>For {@link PrimaryReplicaRequest}s, ensures that the primary 
replica is known, that it is hosted to this node,
+     *     that it has not expired and corresponds to the request (i.e. it has 
not changed). If something of the above is violated, the

Review Comment:
   ```suggestion
        *     that it has not expired and corresponds to the request (i.e. it 
has not changed). If anything of the above is violated, the
   ```



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacyEngine.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest;
+import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import 
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Logic related to replica primacy checks.
+ */
+public class ReplicaPrimacyEngine {
+    private final LeasePlacementDriver placementDriver;
+    private final ClockService clockService;
+    private final ReplicationGroupId replicationGroupId;
+    private final ClusterNode localNode;
+
+    /** Constructor. */
+    public ReplicaPrimacyEngine(
+            LeasePlacementDriver placementDriver,
+            ClockService clockService,
+            ReplicationGroupId replicationGroupId,
+            ClusterNode localNode
+    ) {
+        this.placementDriver = placementDriver;
+        this.clockService = clockService;
+        this.replicationGroupId = replicationGroupId;
+        this.localNode = localNode;
+    }
+
+    /**
+     * Validates replica primacy.
+     *
+     * <ul>
+     *     <li>For {@link PrimaryReplicaRequest}s, ensures that the primary 
replica is known, that it is hosted to this node,
+     *     that it has not expired and corresponds to the request (i.e. it has 
not changed). If something of the above is violated, the
+     *     future is completed with a {@link PrimaryReplicaMissException}.</li>
+     *     <li>For {@link ReadOnlyReplicaRequest}s and {@link 
ReplicaSafeTimeSyncRequest}s, detects whether this node is/was a primary
+     *     at the corresponding timestamp.</li>
+     * </ul>
+     *
+     * @param request Replica request.
+     */
+    public CompletableFuture<ReplicaPrimacy> validatePrimacy(ReplicaRequest 
request) {
+        HybridTimestamp now = clockService.current();
+
+        if (request instanceof PrimaryReplicaRequest) {
+            PrimaryReplicaRequest primaryReplicaRequest = 
(PrimaryReplicaRequest) request;
+            return ensureReplicaIsPrimary(primaryReplicaRequest, now);
+        } else if (request instanceof ReadOnlyReplicaRequest) {
+            return isLocalNodePrimaryReplicaAt(now);
+        } else if (request instanceof ReplicaSafeTimeSyncRequest) {
+            return isLocalNodePrimaryReplicaAt(now);
+        } else {
+            return completedFuture(ReplicaPrimacy.empty());
+        }
+    }
+
+    private CompletableFuture<ReplicaPrimacy> ensureReplicaIsPrimary(
+            PrimaryReplicaRequest primaryReplicaRequest,
+            HybridTimestamp now
+    ) {
+        Long enlistmentConsistencyToken = 
primaryReplicaRequest.enlistmentConsistencyToken();
+
+        Function<ReplicaMeta, ReplicaPrimacy> validateClo = primaryReplicaMeta 
-> {
+            if (primaryReplicaMeta == null) {
+                throw new PrimaryReplicaMissException(
+                        localNode.name(),
+                        null,
+                        localNode.id(),
+                        null,
+                        enlistmentConsistencyToken,
+                        null,
+                        null
+                );
+            }
+
+            long currentEnlistmentConsistencyToken = 
primaryReplicaMeta.getStartTime().longValue();
+
+            if (enlistmentConsistencyToken != currentEnlistmentConsistencyToken
+                    || 
clockService.before(primaryReplicaMeta.getExpirationTime(), now)
+                    || !isLocalPeer(primaryReplicaMeta.getLeaseholderId())
+            ) {
+                throw new PrimaryReplicaMissException(
+                        localNode.name(),
+                        primaryReplicaMeta.getLeaseholder(),
+                        localNode.id(),
+                        primaryReplicaMeta.getLeaseholderId(),
+                        enlistmentConsistencyToken,
+                        currentEnlistmentConsistencyToken,
+                        null);
+            }
+
+            return 
ReplicaPrimacy.forPrimaryReplicaRequest(primaryReplicaMeta.getStartTime().longValue());
+        };
+
+        ReplicaMeta meta = 
placementDriver.getCurrentPrimaryReplica(replicationGroupId, now);
+
+        if (meta != null) {
+            try {
+                return completedFuture(validateClo.apply(meta));
+            } catch (Exception e) {
+                return failedFuture(e);
+            }
+        }
+
+        return placementDriver.getPrimaryReplica(replicationGroupId, 
now).thenApply(validateClo);
+    }
+
+    private CompletableFuture<ReplicaPrimacy> 
isLocalNodePrimaryReplicaAt(HybridTimestamp timestamp) {
+        return placementDriver.getPrimaryReplica(replicationGroupId, timestamp)
+                .thenApply(primaryReplica -> ReplicaPrimacy.forIsPrimary(
+                        primaryReplica != null && 
isLocalPeer(primaryReplica.getLeaseholderId())
+                ));
+    }
+
+    private boolean isLocalPeer(UUID nodeId) {
+        return localNode.id().equals(nodeId);
+    }
+
+    /**
+     * Checks whether the token is still valid primary.

Review Comment:
   How can a token be primary?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacyEngine.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest;
+import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import 
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Logic related to replica primacy checks.
+ */
+public class ReplicaPrimacyEngine {
+    private final LeasePlacementDriver placementDriver;
+    private final ClockService clockService;
+    private final ReplicationGroupId replicationGroupId;
+    private final ClusterNode localNode;
+
+    /** Constructor. */
+    public ReplicaPrimacyEngine(
+            LeasePlacementDriver placementDriver,
+            ClockService clockService,
+            ReplicationGroupId replicationGroupId,
+            ClusterNode localNode
+    ) {
+        this.placementDriver = placementDriver;
+        this.clockService = clockService;
+        this.replicationGroupId = replicationGroupId;
+        this.localNode = localNode;
+    }
+
+    /**
+     * Validates replica primacy.
+     *
+     * <ul>
+     *     <li>For {@link PrimaryReplicaRequest}s, ensures that the primary 
replica is known, that it is hosted to this node,
+     *     that it has not expired and corresponds to the request (i.e. it has 
not changed). If something of the above is violated, the

Review Comment:
   > corresponds to the request
   
   I don't understand what you mean here



##########
modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListenerTest.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.network.ClusterNodeResolver;
+import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
+import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+import org.apache.ignite.internal.schema.SchemaSyncService;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
+import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import 
org.apache.ignite.internal.tx.storage.state.test.TestTxStatePartitionStorage;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class ZonePartitionReplicaListenerTest extends BaseIgniteAbstractTest {
+    private final ClusterNode localNode = new ClusterNodeImpl(nodeId(1), 
"node1", NetworkAddress.from("127.0.0.1:127"));
+
+    private final ClusterNode anotherNode = new ClusterNodeImpl(nodeId(2), 
"node2", NetworkAddress.from("127.0.0.2:127"));
+
+    @Spy
+    private final TxStatePartitionStorage txStateStorage = new 
TestTxStatePartitionStorage();
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    private final ClockService clockService = new TestClockService(clock);
+
+    @Mock
+    private TxManager txManager;

Review Comment:
   Such mocks can be injected directly into the `setUp` method, I think it will 
make the test cleaner



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacy.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static java.util.Objects.requireNonNull;
+
+import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+import 
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents replica primacy info. Contains the following information:
+ *
+ * <ul>
+ *     <li>{@code leaseStartTime} - the moment when the replica became primary 
(only filled for {@link PrimaryReplicaRequest}s)</li>
+ *     <li>{@code isPrimary} - whether this node currently hosts the primary 
(only filled for {@link ReadOnlyReplicaRequest}s
+ *     and {@link ReplicaSafeTimeSyncRequest}s)</li>
+ * </ul>
+ */
+public class ReplicaPrimacy {
+    private final @Nullable Long leaseStartTime;
+    private final @Nullable Boolean isPrimary;
+
+    private ReplicaPrimacy(@Nullable Long leaseStartTime, @Nullable Boolean 
isPrimary) {
+        this.leaseStartTime = leaseStartTime;
+        this.isPrimary = isPrimary;
+    }
+
+    /**
+     * Creates an instance representing no primacy information.
+     */
+    public static ReplicaPrimacy empty() {
+        return new ReplicaPrimacy(null, null);
+    }
+
+    /**
+     * Creates an instance representing information about the primary replica 
held by this node.
+     */
+    static ReplicaPrimacy forPrimaryReplicaRequest(long leaseStartTime) {

Review Comment:
   I understand that some requests need only the lease start time, while others 
only need to know if the current replica is primary. But why can't we unify 
these two situations? I mean, we can only have the `leaseStartTime`, which will 
be `null` if the current node is not the primary replica, and remove the 
`isPrimary` flag.



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacyEngine.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest;
+import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import 
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Logic related to replica primacy checks.
+ */
+public class ReplicaPrimacyEngine {
+    private final LeasePlacementDriver placementDriver;
+    private final ClockService clockService;
+    private final ReplicationGroupId replicationGroupId;
+    private final ClusterNode localNode;
+
+    /** Constructor. */
+    public ReplicaPrimacyEngine(
+            LeasePlacementDriver placementDriver,
+            ClockService clockService,
+            ReplicationGroupId replicationGroupId,
+            ClusterNode localNode
+    ) {
+        this.placementDriver = placementDriver;
+        this.clockService = clockService;
+        this.replicationGroupId = replicationGroupId;
+        this.localNode = localNode;
+    }
+
+    /**
+     * Validates replica primacy.
+     *
+     * <ul>
+     *     <li>For {@link PrimaryReplicaRequest}s, ensures that the primary 
replica is known, that it is hosted to this node,
+     *     that it has not expired and corresponds to the request (i.e. it has 
not changed). If something of the above is violated, the
+     *     future is completed with a {@link PrimaryReplicaMissException}.</li>
+     *     <li>For {@link ReadOnlyReplicaRequest}s and {@link 
ReplicaSafeTimeSyncRequest}s, detects whether this node is/was a primary
+     *     at the corresponding timestamp.</li>
+     * </ul>
+     *
+     * @param request Replica request.
+     */
+    public CompletableFuture<ReplicaPrimacy> validatePrimacy(ReplicaRequest 
request) {
+        HybridTimestamp now = clockService.current();
+
+        if (request instanceof PrimaryReplicaRequest) {
+            PrimaryReplicaRequest primaryReplicaRequest = 
(PrimaryReplicaRequest) request;
+            return ensureReplicaIsPrimary(primaryReplicaRequest, now);
+        } else if (request instanceof ReadOnlyReplicaRequest) {
+            return isLocalNodePrimaryReplicaAt(now);
+        } else if (request instanceof ReplicaSafeTimeSyncRequest) {
+            return isLocalNodePrimaryReplicaAt(now);
+        } else {
+            return completedFuture(ReplicaPrimacy.empty());
+        }
+    }
+
+    private CompletableFuture<ReplicaPrimacy> ensureReplicaIsPrimary(
+            PrimaryReplicaRequest primaryReplicaRequest,
+            HybridTimestamp now
+    ) {
+        Long enlistmentConsistencyToken = 
primaryReplicaRequest.enlistmentConsistencyToken();
+
+        Function<ReplicaMeta, ReplicaPrimacy> validateClo = primaryReplicaMeta 
-> {

Review Comment:
   can we make this a method?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacyEngine.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest;
+import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import 
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Logic related to replica primacy checks.
+ */
+public class ReplicaPrimacyEngine {
+    private final LeasePlacementDriver placementDriver;
+    private final ClockService clockService;
+    private final ReplicationGroupId replicationGroupId;
+    private final ClusterNode localNode;
+
+    /** Constructor. */
+    public ReplicaPrimacyEngine(
+            LeasePlacementDriver placementDriver,
+            ClockService clockService,
+            ReplicationGroupId replicationGroupId,
+            ClusterNode localNode
+    ) {
+        this.placementDriver = placementDriver;
+        this.clockService = clockService;
+        this.replicationGroupId = replicationGroupId;
+        this.localNode = localNode;
+    }
+
+    /**
+     * Validates replica primacy.
+     *
+     * <ul>
+     *     <li>For {@link PrimaryReplicaRequest}s, ensures that the primary 
replica is known, that it is hosted to this node,
+     *     that it has not expired and corresponds to the request (i.e. it has 
not changed). If something of the above is violated, the
+     *     future is completed with a {@link PrimaryReplicaMissException}.</li>
+     *     <li>For {@link ReadOnlyReplicaRequest}s and {@link 
ReplicaSafeTimeSyncRequest}s, detects whether this node is/was a primary
+     *     at the corresponding timestamp.</li>
+     * </ul>
+     *
+     * @param request Replica request.
+     */
+    public CompletableFuture<ReplicaPrimacy> validatePrimacy(ReplicaRequest 
request) {
+        HybridTimestamp now = clockService.current();
+
+        if (request instanceof PrimaryReplicaRequest) {
+            PrimaryReplicaRequest primaryReplicaRequest = 
(PrimaryReplicaRequest) request;
+            return ensureReplicaIsPrimary(primaryReplicaRequest, now);
+        } else if (request instanceof ReadOnlyReplicaRequest) {
+            return isLocalNodePrimaryReplicaAt(now);
+        } else if (request instanceof ReplicaSafeTimeSyncRequest) {
+            return isLocalNodePrimaryReplicaAt(now);
+        } else {
+            return completedFuture(ReplicaPrimacy.empty());
+        }
+    }
+
+    private CompletableFuture<ReplicaPrimacy> ensureReplicaIsPrimary(
+            PrimaryReplicaRequest primaryReplicaRequest,
+            HybridTimestamp now
+    ) {
+        Long enlistmentConsistencyToken = 
primaryReplicaRequest.enlistmentConsistencyToken();
+
+        Function<ReplicaMeta, ReplicaPrimacy> validateClo = primaryReplicaMeta 
-> {
+            if (primaryReplicaMeta == null) {
+                throw new PrimaryReplicaMissException(
+                        localNode.name(),
+                        null,
+                        localNode.id(),
+                        null,
+                        enlistmentConsistencyToken,
+                        null,
+                        null
+                );
+            }
+
+            long currentEnlistmentConsistencyToken = 
primaryReplicaMeta.getStartTime().longValue();
+
+            if (enlistmentConsistencyToken != currentEnlistmentConsistencyToken
+                    || 
clockService.before(primaryReplicaMeta.getExpirationTime(), now)
+                    || !isLocalPeer(primaryReplicaMeta.getLeaseholderId())

Review Comment:
   `getLeaseholderId` can return `null` and Idea doesn't like that, can we do 
something with it?



##########
modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListenerTest.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.network.ClusterNodeResolver;
+import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
+import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+import org.apache.ignite.internal.schema.SchemaSyncService;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
+import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import 
org.apache.ignite.internal.tx.storage.state.test.TestTxStatePartitionStorage;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class ZonePartitionReplicaListenerTest extends BaseIgniteAbstractTest {
+    private final ClusterNode localNode = new ClusterNodeImpl(nodeId(1), 
"node1", NetworkAddress.from("127.0.0.1:127"));
+
+    private final ClusterNode anotherNode = new ClusterNodeImpl(nodeId(2), 
"node2", NetworkAddress.from("127.0.0.2:127"));
+
+    @Spy
+    private final TxStatePartitionStorage txStateStorage = new 
TestTxStatePartitionStorage();
+
+    private final HybridClock clock = new HybridClockImpl();
+
+    private final ClockService clockService = new TestClockService(clock);
+
+    @Mock
+    private TxManager txManager;
+
+    @Mock
+    private ValidationSchemasSource validationSchemasSource;
+
+    @Mock
+    private SchemaSyncService schemaSyncService;
+
+    @Mock
+    private CatalogService catalogService;
+
+    @Spy
+    private final TestPlacementDriver placementDriver = new 
TestPlacementDriver(localNode);
+
+    @Mock
+    private ClusterNodeResolver clusterNodeResolver;
+
+    @Mock
+    private RaftCommandRunner raftClient;
+
+    private final ZonePartitionId groupId = new ZonePartitionId(1, 2);
+
+    private ZonePartitionReplicaListener partitionReplicaListener;
+
+    private static UUID nodeId(int id) {
+        return new UUID(0, id);
+    }
+
+    @BeforeEach
+    void setUp() {
+        partitionReplicaListener = new ZonePartitionReplicaListener(
+                txStateStorage,
+                clockService,
+                txManager,
+                validationSchemasSource,
+                schemaSyncService,
+                catalogService,
+                placementDriver,
+                clusterNodeResolver,
+                raftClient,
+                localNode,
+                groupId
+        );
+    }
+
+    @ParameterizedTest
+    @ValueSource(classes = {PrimaryReplicaRequest.class, 
TxStateCommitPartitionRequest.class})
+    void primaryReplicaRequestsAreRejectedWhenPrimaryIsNotKnown(Class<? 
extends PrimaryReplicaRequest> requestClass) {
+        doReturn(null).when(placementDriver).getCurrentPrimaryReplica(any(), 
any());
+        
doReturn(nullCompletedFuture()).when(placementDriver).getPrimaryReplica(any(), 
any());
+
+        PrimaryReplicaRequest request = mock(requestClass);
+
+        assertThat(partitionReplicaListener.invoke(request, localNode.id()), 
willThrow(PrimaryReplicaMissException.class));
+    }
+
+    @ParameterizedTest
+    @ValueSource(classes = {PrimaryReplicaRequest.class, 
TxStateCommitPartitionRequest.class})
+    void 
primaryReplicaRequestsAreRejectedWhenPrimaryDoesNotMatchLeaseStartTime(Class<? 
extends PrimaryReplicaRequest> requestClass) {
+        long leaseStartTime = clock.nowLong();
+        placementDriver.setPrimaryReplicaSupplier(
+                () -> new TestReplicaMetaImpl(localNode, 
hybridTimestamp(leaseStartTime), HybridTimestamp.MAX_VALUE)
+        );
+
+        PrimaryReplicaRequest request = mock(requestClass);
+        when(request.enlistmentConsistencyToken()).thenReturn(leaseStartTime - 
1000);
+
+        assertThat(partitionReplicaListener.invoke(request, localNode.id()), 
willThrow(PrimaryReplicaMissException.class));
+    }
+
+    @ParameterizedTest
+    @ValueSource(classes = {PrimaryReplicaRequest.class, 
TxStateCommitPartitionRequest.class})
+    void primaryReplicaRequestsAreRejectedWhenLeaseIsExpired(Class<? extends 
PrimaryReplicaRequest> requestClass) {
+        long leaseStartTime = clock.nowLong();
+        placementDriver.setPrimaryReplicaSupplier(
+                () -> new TestReplicaMetaImpl(localNode, 
hybridTimestamp(leaseStartTime), HybridTimestamp.MIN_VALUE)
+        );
+
+        PrimaryReplicaRequest request = mock(requestClass);
+        when(request.enlistmentConsistencyToken()).thenReturn(leaseStartTime);
+
+        assertThat(partitionReplicaListener.invoke(request, localNode.id()), 
willThrow(PrimaryReplicaMissException.class));
+    }
+
+    @ParameterizedTest
+    @ValueSource(classes = {PrimaryReplicaRequest.class, 
TxStateCommitPartitionRequest.class})
+    void primaryReplicaRequestsAreRejectedWhenLeaseholderIsDifferent(Class<? 
extends PrimaryReplicaRequest> requestClass) {
+        long leaseStartTime = clock.nowLong();
+        placementDriver.setPrimaryReplicaSupplier(
+                () -> new TestReplicaMetaImpl(anotherNode, 
hybridTimestamp(leaseStartTime), HybridTimestamp.MAX_VALUE)
+        );
+
+        PrimaryReplicaRequest request = mock(requestClass);
+        
lenient().when(request.enlistmentConsistencyToken()).thenReturn(leaseStartTime);

Review Comment:
   why is this `lenient`?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaPrimacyEngine.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyReplicaRequest;
+import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import 
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Logic related to replica primacy checks.
+ */
+public class ReplicaPrimacyEngine {
+    private final LeasePlacementDriver placementDriver;
+    private final ClockService clockService;
+    private final ReplicationGroupId replicationGroupId;
+    private final ClusterNode localNode;
+
+    /** Constructor. */
+    public ReplicaPrimacyEngine(
+            LeasePlacementDriver placementDriver,
+            ClockService clockService,
+            ReplicationGroupId replicationGroupId,
+            ClusterNode localNode
+    ) {
+        this.placementDriver = placementDriver;
+        this.clockService = clockService;
+        this.replicationGroupId = replicationGroupId;
+        this.localNode = localNode;
+    }
+
+    /**
+     * Validates replica primacy.
+     *
+     * <ul>
+     *     <li>For {@link PrimaryReplicaRequest}s, ensures that the primary 
replica is known, that it is hosted to this node,
+     *     that it has not expired and corresponds to the request (i.e. it has 
not changed). If something of the above is violated, the
+     *     future is completed with a {@link PrimaryReplicaMissException}.</li>
+     *     <li>For {@link ReadOnlyReplicaRequest}s and {@link 
ReplicaSafeTimeSyncRequest}s, detects whether this node is/was a primary
+     *     at the corresponding timestamp.</li>
+     * </ul>
+     *
+     * @param request Replica request.
+     */
+    public CompletableFuture<ReplicaPrimacy> validatePrimacy(ReplicaRequest 
request) {
+        HybridTimestamp now = clockService.current();
+
+        if (request instanceof PrimaryReplicaRequest) {
+            PrimaryReplicaRequest primaryReplicaRequest = 
(PrimaryReplicaRequest) request;
+            return ensureReplicaIsPrimary(primaryReplicaRequest, now);
+        } else if (request instanceof ReadOnlyReplicaRequest) {
+            return isLocalNodePrimaryReplicaAt(now);
+        } else if (request instanceof ReplicaSafeTimeSyncRequest) {
+            return isLocalNodePrimaryReplicaAt(now);
+        } else {
+            return completedFuture(ReplicaPrimacy.empty());
+        }
+    }
+
+    private CompletableFuture<ReplicaPrimacy> ensureReplicaIsPrimary(
+            PrimaryReplicaRequest primaryReplicaRequest,
+            HybridTimestamp now
+    ) {
+        Long enlistmentConsistencyToken = 
primaryReplicaRequest.enlistmentConsistencyToken();
+
+        Function<ReplicaMeta, ReplicaPrimacy> validateClo = primaryReplicaMeta 
-> {
+            if (primaryReplicaMeta == null) {
+                throw new PrimaryReplicaMissException(
+                        localNode.name(),
+                        null,
+                        localNode.id(),
+                        null,
+                        enlistmentConsistencyToken,
+                        null,
+                        null
+                );
+            }
+
+            long currentEnlistmentConsistencyToken = 
primaryReplicaMeta.getStartTime().longValue();
+
+            if (enlistmentConsistencyToken != currentEnlistmentConsistencyToken
+                    || 
clockService.before(primaryReplicaMeta.getExpirationTime(), now)
+                    || !isLocalPeer(primaryReplicaMeta.getLeaseholderId())
+            ) {
+                throw new PrimaryReplicaMissException(
+                        localNode.name(),
+                        primaryReplicaMeta.getLeaseholder(),
+                        localNode.id(),
+                        primaryReplicaMeta.getLeaseholderId(),
+                        enlistmentConsistencyToken,
+                        currentEnlistmentConsistencyToken,
+                        null);
+            }
+
+            return 
ReplicaPrimacy.forPrimaryReplicaRequest(primaryReplicaMeta.getStartTime().longValue());
+        };
+
+        ReplicaMeta meta = 
placementDriver.getCurrentPrimaryReplica(replicationGroupId, now);
+
+        if (meta != null) {
+            try {
+                return completedFuture(validateClo.apply(meta));
+            } catch (Exception e) {
+                return failedFuture(e);
+            }
+        }
+
+        return placementDriver.getPrimaryReplica(replicationGroupId, 
now).thenApply(validateClo);
+    }
+
+    private CompletableFuture<ReplicaPrimacy> 
isLocalNodePrimaryReplicaAt(HybridTimestamp timestamp) {
+        return placementDriver.getPrimaryReplica(replicationGroupId, timestamp)
+                .thenApply(primaryReplica -> ReplicaPrimacy.forIsPrimary(
+                        primaryReplica != null && 
isLocalPeer(primaryReplica.getLeaseholderId())
+                ));
+    }
+
+    private boolean isLocalPeer(UUID nodeId) {
+        return localNode.id().equals(nodeId);
+    }
+
+    /**
+     * Checks whether the token is still valid primary.
+     *
+     * @param suspectedEnlistmentConsistencyToken Enlistment consistency token.
+     */
+    public boolean isTokenStillValidPrimary(long 
suspectedEnlistmentConsistencyToken) {

Review Comment:
   Why is it `suspected`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to