sk0x50 commented on code in PR #5197: URL: https://github.com/apache/ignite-3/pull/5197#discussion_r1956186675
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java: ########## @@ -82,44 +99,113 @@ public ZonePartitionReplicaListener( schemaSyncService, catalogService, raftClient, - replicationGroupId - ); + replicationGroupId); + + minimumActiveTxTimeReplicaRequestHandler = new MinimumActiveTxTimeReplicaRequestHandler( + clockService, + raftCommandApplicator); } @Override public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, UUID senderId) { - if (!(request instanceof TableAware)) { - // TODO: https://issues.apache.org/jira/browse/IGNITE-22620 implement ReplicaSafeTimeSyncRequest processing. - if (request instanceof TxFinishReplicaRequest) { - return txFinishReplicaRequestHandler.handle((TxFinishReplicaRequest) request) - .thenApply(res -> new ReplicaResult(res, null)); - } else { - if (request instanceof ReplicaSafeTimeSyncRequest) { - LOG.debug("Non table request is not supported by the zone partition yet " + request); - } else { - LOG.warn("Non table request is not supported by the zone partition yet " + request); - } - } + return ensureReplicaIsPrimary(request) + .thenCompose(res -> processRequest(request, res.get1(), senderId, res.get2())) + .thenApply(res -> { + if (res instanceof ReplicaResult) { + return (ReplicaResult) res; + } else { + return new ReplicaResult(res, null); + } + }); + } + + private CompletableFuture<?> processRequest( + ReplicaRequest request, + @Nullable Boolean isPrimary, + UUID senderId, + @Nullable Long leaseStartTime + ) { + if (request instanceof TableAware) { + // This type of request propagates to the table processor directly. + return processTableAwareRequest(request, senderId); + } - return completedFuture(new ReplicaResult(null, null)); + // TODO: https://issues.apache.org/jira/browse/IGNITE-22620 implement ReplicaSafeTimeSyncRequest processing. + if (request instanceof TxFinishReplicaRequest) { + return txFinishReplicaRequestHandler.handle((TxFinishReplicaRequest) request) + .thenApply(res -> new ReplicaResult(res, null)); + } + + return processZoneReplicaRequest(request, isPrimary, senderId, leaseStartTime); + } + + /** + * Ensure that the primary replica was not changed. + * + * @param request Replica request. + * @return Future with {@link IgniteBiTuple} containing {@code boolean} (whether the replica is primary) and the start time of current + * lease. The boolean is not {@code null} only for {@link ReadOnlyReplicaRequest}. If {@code true}, then replica is primary. The + * lease start time is not {@code null} in case of {@link PrimaryReplicaRequest}. + */ + private CompletableFuture<IgniteBiTuple<Boolean, Long>> ensureReplicaIsPrimary(ReplicaRequest request) { + // TODO https://issues.apache.org/jira/browse/IGNITE-24380 + // Move PartitionReplicaListener#ensureReplicaIsPrimary to ZonePartitionReplicaListener. + return completedFuture(new IgniteBiTuple<>(null, null)); + } + + /** + * Processes {@link TableAware} request. + * + * @param request Request to be processed. + * @param senderId Node sender id. + * @return Future with the result of the request. + */ + private CompletableFuture<ReplicaResult> processTableAwareRequest(ReplicaRequest request, UUID senderId) { + assert request instanceof TableAware : "Request should be TableAware [request=" + request.getClass().getSimpleName() + ']'; + + int partitionId; + + ReplicationGroupId replicationGroupId = request.groupId().asReplicationGroupId(); + + // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 Refine this code when the zone based replication will be done. + if (replicationGroupId instanceof TablePartitionId) { + partitionId = ((TablePartitionId) replicationGroupId).partitionId(); + } else if (replicationGroupId instanceof ZonePartitionId) { + partitionId = ((ZonePartitionId) replicationGroupId).partitionId(); + } else { + throw new IllegalArgumentException("Requests with replication group type " + + request.groupId().getClass() + " is not supported"); + } + + return replicas.get(new TablePartitionId(((TableAware) request).tableId(), partitionId)) + .invoke(request, senderId); + } + + /** + * Processes zone replica request. + * + * @param request Request to be processed. + * @param isPrimary {@code true} if the current node is the primary for the partition, {@code false} otherwise. + * @param senderId Node sender id. + * @param leaseStartTime Lease start time. + * @return Future with the result of the processing. + */ + private CompletableFuture<?> processZoneReplicaRequest( + ReplicaRequest request, + @Nullable Boolean isPrimary, + UUID senderId, + @Nullable Long leaseStartTime + ) { + // TODO https://issues.apache.org/jira/browse/IGNITE-24526 Review Comment: :ok_hand: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org