sk0x50 commented on code in PR #5197:
URL: https://github.com/apache/ignite-3/pull/5197#discussion_r1956186675


##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java:
##########
@@ -82,44 +99,113 @@ public ZonePartitionReplicaListener(
                 schemaSyncService,
                 catalogService,
                 raftClient,
-                replicationGroupId
-        );
+                replicationGroupId);
+
+        minimumActiveTxTimeReplicaRequestHandler = new 
MinimumActiveTxTimeReplicaRequestHandler(
+                clockService,
+                raftCommandApplicator);
     }
 
     @Override
     public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, 
UUID senderId) {
-        if (!(request instanceof TableAware)) {
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-22620 
implement ReplicaSafeTimeSyncRequest processing.
-            if (request instanceof TxFinishReplicaRequest) {
-                return 
txFinishReplicaRequestHandler.handle((TxFinishReplicaRequest) request)
-                        .thenApply(res -> new ReplicaResult(res, null));
-            } else {
-                if (request instanceof ReplicaSafeTimeSyncRequest) {
-                    LOG.debug("Non table request is not supported by the zone 
partition yet " + request);
-                } else {
-                    LOG.warn("Non table request is not supported by the zone 
partition yet " + request);
-                }
-            }
+        return ensureReplicaIsPrimary(request)
+                .thenCompose(res -> processRequest(request, res.get1(), 
senderId, res.get2()))
+                .thenApply(res -> {
+                    if (res instanceof ReplicaResult) {
+                        return (ReplicaResult) res;
+                    } else {
+                        return new ReplicaResult(res, null);
+                    }
+                });
+    }
+
+    private CompletableFuture<?> processRequest(
+            ReplicaRequest request,
+            @Nullable Boolean isPrimary,
+            UUID senderId,
+            @Nullable Long leaseStartTime
+    ) {
+        if (request instanceof TableAware) {
+            // This type of request propagates to the table processor directly.
+            return processTableAwareRequest(request, senderId);
+        }
 
-            return completedFuture(new ReplicaResult(null, null));
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-22620 implement 
ReplicaSafeTimeSyncRequest processing.
+        if (request instanceof TxFinishReplicaRequest) {
+            return 
txFinishReplicaRequestHandler.handle((TxFinishReplicaRequest) request)
+                    .thenApply(res -> new ReplicaResult(res, null));
+        }
+
+        return processZoneReplicaRequest(request, isPrimary, senderId, 
leaseStartTime);
+    }
+
+    /**
+     * Ensure that the primary replica was not changed.
+     *
+     * @param request Replica request.
+     * @return Future with {@link IgniteBiTuple} containing {@code boolean} 
(whether the replica is primary) and the start time of current
+     *     lease. The boolean is not {@code null} only for {@link 
ReadOnlyReplicaRequest}. If {@code true}, then replica is primary. The
+     *     lease start time is not {@code null} in case of {@link 
PrimaryReplicaRequest}.
+     */
+    private CompletableFuture<IgniteBiTuple<Boolean, Long>> 
ensureReplicaIsPrimary(ReplicaRequest request) {
+        // TODO https://issues.apache.org/jira/browse/IGNITE-24380
+        // Move PartitionReplicaListener#ensureReplicaIsPrimary to 
ZonePartitionReplicaListener.
+        return completedFuture(new IgniteBiTuple<>(null, null));
+    }
+
+    /**
+     * Processes {@link TableAware} request.
+     *
+     * @param request Request to be processed.
+     * @param senderId Node sender id.
+     * @return Future with the result of the request.
+     */
+    private CompletableFuture<ReplicaResult> 
processTableAwareRequest(ReplicaRequest request, UUID senderId) {
+        assert request instanceof TableAware : "Request should be TableAware 
[request=" + request.getClass().getSimpleName() + ']';
+
+        int partitionId;
+
+        ReplicationGroupId replicationGroupId = 
request.groupId().asReplicationGroupId();
+
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 Refine 
this code when the zone based replication will be done.
+        if (replicationGroupId instanceof  TablePartitionId) {
+            partitionId = ((TablePartitionId) 
replicationGroupId).partitionId();
+        } else if (replicationGroupId instanceof ZonePartitionId) {
+            partitionId = ((ZonePartitionId) replicationGroupId).partitionId();
+        } else {
+            throw new IllegalArgumentException("Requests with replication 
group type "
+                    + request.groupId().getClass() + " is not supported");
+        }
+
+        return replicas.get(new TablePartitionId(((TableAware) 
request).tableId(), partitionId))
+                .invoke(request, senderId);
+    }
+
+    /**
+     * Processes zone replica request.
+     *
+     * @param request Request to be processed.
+     * @param isPrimary {@code true} if the current node is the primary for 
the partition, {@code false} otherwise.
+     * @param senderId Node sender id.
+     * @param leaseStartTime Lease start time.
+     * @return Future with the result of the processing.
+     */
+    private CompletableFuture<?> processZoneReplicaRequest(
+            ReplicaRequest request,
+            @Nullable Boolean isPrimary,
+            UUID senderId,
+            @Nullable Long leaseStartTime
+    ) {
+        // TODO https://issues.apache.org/jira/browse/IGNITE-24526

Review Comment:
   :ok_hand: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to