JAkutenshi commented on code in PR #5197:
URL: https://github.com/apache/ignite-3/pull/5197#discussion_r1955358749


##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java:
##########
@@ -82,44 +108,130 @@ public ZonePartitionReplicaListener(
                 schemaSyncService,
                 catalogService,
                 raftClient,
-                replicationGroupId
-        );
+                replicationGroupId);
+
+        minimumActiveTxTimeReplicaRequestHandler = new 
MinimumActiveTxTimeReplicaRequestHandler(
+                clockService,
+                raftCommandApplicator);
     }
 
     @Override
     public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, 
UUID senderId) {
-        if (!(request instanceof TableAware)) {
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-22620 
implement ReplicaSafeTimeSyncRequest processing.
-            if (request instanceof TxFinishReplicaRequest) {
-                return 
txFinishReplicaRequestHandler.handle((TxFinishReplicaRequest) request)
-                        .thenApply(res -> new ReplicaResult(res, null));
-            } else {
-                if (request instanceof ReplicaSafeTimeSyncRequest) {
-                    LOG.debug("Non table request is not supported by the zone 
partition yet " + request);
-                } else {
-                    LOG.warn("Non table request is not supported by the zone 
partition yet " + request);
-                }
-            }
+        return ensureReplicaIsPrimary(request)
+                .thenCompose(res -> processRequest(request, res.get1(), 
senderId, res.get2()))
+                .thenApply(res -> {
+                    if (res instanceof ReplicaResult) {
+                        return (ReplicaResult) res;
+                    } else {
+                        return new ReplicaResult(res, null);
+                    }
+                });
+    }
 
-            return completedFuture(new ReplicaResult(null, null));
+    private CompletableFuture<?> processRequest(
+            ReplicaRequest request,
+            @Nullable Boolean isPrimary,
+            UUID senderId,
+            @Nullable Long leaseStartTime
+    ) {
+        if (request instanceof TableAware) {
+            // This type of request propagates to the table processor directly.
+            return processTableAwareRequest(request, senderId);
+        }
+
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-22620 implement 
ReplicaSafeTimeSyncRequest processing.
+        if (request instanceof TxFinishReplicaRequest) {
+            return 
txFinishReplicaRequestHandler.handle((TxFinishReplicaRequest) request)
+                    .thenApply(res -> new ReplicaResult(res, null));
+        }
+
+        return processZoneReplicaRequest(request, isPrimary, senderId, 
leaseStartTime);
+    }
+
+    /**
+     * Ensure that the primary replica was not changed.
+     *
+     * @param request Replica request.
+     * @return Future with {@link IgniteBiTuple} containing {@code boolean} 
(whether the replica is primary) and the start time of current
+     *     lease. The boolean is not {@code null} only for {@link 
ReadOnlyReplicaRequest}. If {@code true}, then replica is primary. The
+     *     lease start time is not {@code null} in case of {@link 
PrimaryReplicaRequest}.
+     */
+    private CompletableFuture<IgniteBiTuple<Boolean, Long>> 
ensureReplicaIsPrimary(ReplicaRequest request) {
+        // TODO https://issues.apache.org/jira/browse/IGNITE-24380
+        // Move PartitionReplicaListener#ensureReplicaIsPrimary to 
ZonePartitionReplicaListener.
+        return completedFuture(new IgniteBiTuple<>(null, null));
+    }
+
+    /**
+     * Processes {@link TableAware} request.
+     *
+     * @param request Request to be processed.
+     * @param senderId Node sender id.
+     * @return Future with the result of the request.
+     */
+    private CompletableFuture<ReplicaResult> 
processTableAwareRequest(ReplicaRequest request, UUID senderId) {
+        assert request instanceof TableAware : "Request should be TableAware 
[request=" + request.getClass().getSimpleName() + ']';
+
+        int partitionId;
+
+        ReplicationGroupId replicationGroupId = 
request.groupId().asReplicationGroupId();
+
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 Refine 
this code when the zone based replication will be done.
+        if (replicationGroupId instanceof  TablePartitionId) {
+            partitionId = ((TablePartitionId) 
replicationGroupId).partitionId();
+        } else if (replicationGroupId instanceof ZonePartitionId) {
+            partitionId = ((ZonePartitionId) replicationGroupId).partitionId();
         } else {
-            int partitionId;
+            throw new IllegalArgumentException("Requests with replication 
group type "
+                    + request.groupId().getClass() + " is not supported");
+        }
 
-            ReplicationGroupId replicationGroupId = 
request.groupId().asReplicationGroupId();
+        return replicas.get(new TablePartitionId(((TableAware) 
request).tableId(), partitionId))
+                .invoke(request, senderId);
+    }
 
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 Refine 
this code when the zone based replication will done.
-            if (replicationGroupId instanceof  TablePartitionId) {
-                partitionId = ((TablePartitionId) 
replicationGroupId).partitionId();
-            } else if (replicationGroupId instanceof ZonePartitionId) {
-                partitionId = ((ZonePartitionId) 
replicationGroupId).partitionId();
-            } else {
-                throw new IllegalArgumentException("Requests with replication 
group type "
-                        + request.groupId().getClass() + " is not supported");
-            }
+    /**
+     * Processes zone replica request.
+     *
+     * @param request Request to be processed.
+     * @param isPrimary {@code true} if the current node is the primary for 
the partition, {@code false} otherwise.
+     * @param senderId Node sender id.
+     * @param leaseStartTime Lease start time.
+     * @return Future with the result of the processing.
+     */
+    private CompletableFuture<?> processZoneReplicaRequest(

Review Comment:
   So all non-table-related requests should be placed here, then?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/MinimumActiveTxTimeReplicaRequestHandler.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.handlers;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.ClockService;
+import 
org.apache.ignite.internal.partition.replicator.ReplicationRaftCommandApplicator;
+import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.UpdateMinimumActiveTxBeginTimeReplicaRequest;
+import org.apache.ignite.internal.raft.Command;
+
+/**
+ * Handler for {@link UpdateMinimumActiveTxBeginTimeReplicaRequest}.
+ */
+public class MinimumActiveTxTimeReplicaRequestHandler {

Review Comment:
   There are several handlers (for replication requests and RAFT commands), but
there is no common interface -- were we unable to agree on one before?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java:
##########
@@ -82,44 +108,130 @@ public ZonePartitionReplicaListener(
                 schemaSyncService,
                 catalogService,
                 raftClient,
-                replicationGroupId
-        );
+                replicationGroupId);
+
+        minimumActiveTxTimeReplicaRequestHandler = new 
MinimumActiveTxTimeReplicaRequestHandler(
+                clockService,
+                raftCommandApplicator);
     }
 
     @Override
     public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, 
UUID senderId) {
-        if (!(request instanceof TableAware)) {
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-22620 
implement ReplicaSafeTimeSyncRequest processing.
-            if (request instanceof TxFinishReplicaRequest) {
-                return 
txFinishReplicaRequestHandler.handle((TxFinishReplicaRequest) request)
-                        .thenApply(res -> new ReplicaResult(res, null));
-            } else {
-                if (request instanceof ReplicaSafeTimeSyncRequest) {
-                    LOG.debug("Non table request is not supported by the zone 
partition yet " + request);
-                } else {
-                    LOG.warn("Non table request is not supported by the zone 
partition yet " + request);
-                }
-            }
+        return ensureReplicaIsPrimary(request)
+                .thenCompose(res -> processRequest(request, res.get1(), 
senderId, res.get2()))
+                .thenApply(res -> {
+                    if (res instanceof ReplicaResult) {
+                        return (ReplicaResult) res;
+                    } else {
+                        return new ReplicaResult(res, null);
+                    }
+                });
+    }
 
-            return completedFuture(new ReplicaResult(null, null));
+    private CompletableFuture<?> processRequest(
+            ReplicaRequest request,
+            @Nullable Boolean isPrimary,
+            UUID senderId,
+            @Nullable Long leaseStartTime
+    ) {
+        if (request instanceof TableAware) {

Review Comment:
   Good solution 👍🏻



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to