sanpwc commented on code in PR #5197: URL: https://github.com/apache/ignite-3/pull/5197#discussion_r1956032347
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java: ########## @@ -82,44 +108,130 @@ public ZonePartitionReplicaListener( schemaSyncService, catalogService, raftClient, - replicationGroupId - ); + replicationGroupId); + + minimumActiveTxTimeReplicaRequestHandler = new MinimumActiveTxTimeReplicaRequestHandler( + clockService, + raftCommandApplicator); } @Override public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, UUID senderId) { - if (!(request instanceof TableAware)) { - // TODO: https://issues.apache.org/jira/browse/IGNITE-22620 implement ReplicaSafeTimeSyncRequest processing. - if (request instanceof TxFinishReplicaRequest) { - return txFinishReplicaRequestHandler.handle((TxFinishReplicaRequest) request) - .thenApply(res -> new ReplicaResult(res, null)); - } else { - if (request instanceof ReplicaSafeTimeSyncRequest) { - LOG.debug("Non table request is not supported by the zone partition yet " + request); - } else { - LOG.warn("Non table request is not supported by the zone partition yet " + request); - } - } + return ensureReplicaIsPrimary(request) + .thenCompose(res -> processRequest(request, res.get1(), senderId, res.get2())) + .thenApply(res -> { + if (res instanceof ReplicaResult) { + return (ReplicaResult) res; + } else { + return new ReplicaResult(res, null); + } + }); + } - return completedFuture(new ReplicaResult(null, null)); + private CompletableFuture<?> processRequest( + ReplicaRequest request, + @Nullable Boolean isPrimary, + UUID senderId, + @Nullable Long leaseStartTime + ) { + if (request instanceof TableAware) { + // This type of request propagates to the table processor directly. + return processTableAwareRequest(request, senderId); + } + + // TODO: https://issues.apache.org/jira/browse/IGNITE-22620 implement ReplicaSafeTimeSyncRequest processing. 
+ if (request instanceof TxFinishReplicaRequest) { + return txFinishReplicaRequestHandler.handle((TxFinishReplicaRequest) request) + .thenApply(res -> new ReplicaResult(res, null)); + } + + return processZoneReplicaRequest(request, isPrimary, senderId, leaseStartTime); + } + + /** + * Ensure that the primary replica was not changed. + * + * @param request Replica request. + * @return Future with {@link IgniteBiTuple} containing {@code boolean} (whether the replica is primary) and the start time of current + * lease. The boolean is not {@code null} only for {@link ReadOnlyReplicaRequest}. If {@code true}, then replica is primary. The + * lease start time is not {@code null} in case of {@link PrimaryReplicaRequest}. + */ + private CompletableFuture<IgniteBiTuple<Boolean, Long>> ensureReplicaIsPrimary(ReplicaRequest request) { + // TODO https://issues.apache.org/jira/browse/IGNITE-24380 + // Move PartitionReplicaListener#ensureReplicaIsPrimary to ZonePartitionReplicaListener. + return completedFuture(new IgniteBiTuple<>(null, null)); + } + + /** + * Processes {@link TableAware} request. + * + * @param request Request to be processed. + * @param senderId Node sender id. + * @return Future with the result of the request. + */ + private CompletableFuture<ReplicaResult> processTableAwareRequest(ReplicaRequest request, UUID senderId) { + assert request instanceof TableAware : "Request should be TableAware [request=" + request.getClass().getSimpleName() + ']'; + + int partitionId; + + ReplicationGroupId replicationGroupId = request.groupId().asReplicationGroupId(); + + // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 Refine this code when the zone based replication will be done. 
+ if (replicationGroupId instanceof TablePartitionId) { + partitionId = ((TablePartitionId) replicationGroupId).partitionId(); + } else if (replicationGroupId instanceof ZonePartitionId) { + partitionId = ((ZonePartitionId) replicationGroupId).partitionId(); } else { - int partitionId; + throw new IllegalArgumentException("Requests with replication group type " + + request.groupId().getClass() + " is not supported"); + } - ReplicationGroupId replicationGroupId = request.groupId().asReplicationGroupId(); + return replicas.get(new TablePartitionId(((TableAware) request).tableId(), partitionId)) + .invoke(request, senderId); + } - // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 Refine this code when the zone based replication will done. - if (replicationGroupId instanceof TablePartitionId) { - partitionId = ((TablePartitionId) replicationGroupId).partitionId(); - } else if (replicationGroupId instanceof ZonePartitionId) { - partitionId = ((ZonePartitionId) replicationGroupId).partitionId(); - } else { - throw new IllegalArgumentException("Requests with replication group type " - + request.groupId().getClass() + " is not supported"); - } + /** + * Processes zone replica request. + * + * @param request Request to be processed. + * @param isPrimary {@code true} if the current node is the primary for the partition, {@code false} otherwise. + * @param senderId Node sender id. + * @param leaseStartTime Lease start time. + * @return Future with the result of the processing. 
+ */ + private CompletableFuture<?> processZoneReplicaRequest( + ReplicaRequest request, + @Nullable Boolean isPrimary, + UUID senderId, + @Nullable Long leaseStartTime + ) { + boolean hasSchemaVersion = request instanceof SchemaVersionAwareReplicaRequest; - return replicas.get(new TablePartitionId(((TableAware) request).tableId(), partitionId)) - .invoke(request, senderId); + if (hasSchemaVersion) { + assert ((SchemaVersionAwareReplicaRequest) request).schemaVersion() > 0 : + "Schema version is not passed [request=" + request + ']'; } + Review Comment: I believe that we may also move the following block: ``` if (request instanceof ReadWriteReplicaRequest) { var req = (ReadWriteReplicaRequest) request; // Saving state is not needed for full transactions. if (!req.full()) { txManager.updateTxMeta(req.transactionId(), old -> new TxStateMeta( PENDING, req.coordinatorId(), req.commitPartitionId().asTablePartitionId(), null, old == null ? null : old.tx() )); } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org