JAkutenshi commented on code in PR #5197: URL: https://github.com/apache/ignite-3/pull/5197#discussion_r1955355371
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java: ########## @@ -82,44 +108,130 @@ public ZonePartitionReplicaListener( schemaSyncService, catalogService, raftClient, - replicationGroupId - ); + replicationGroupId); + + minimumActiveTxTimeReplicaRequestHandler = new MinimumActiveTxTimeReplicaRequestHandler( + clockService, + raftCommandApplicator); } @Override public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, UUID senderId) { - if (!(request instanceof TableAware)) { - // TODO: https://issues.apache.org/jira/browse/IGNITE-22620 implement ReplicaSafeTimeSyncRequest processing. - if (request instanceof TxFinishReplicaRequest) { - return txFinishReplicaRequestHandler.handle((TxFinishReplicaRequest) request) - .thenApply(res -> new ReplicaResult(res, null)); - } else { - if (request instanceof ReplicaSafeTimeSyncRequest) { - LOG.debug("Non table request is not supported by the zone partition yet " + request); - } else { - LOG.warn("Non table request is not supported by the zone partition yet " + request); - } - } + return ensureReplicaIsPrimary(request) + .thenCompose(res -> processRequest(request, res.get1(), senderId, res.get2())) + .thenApply(res -> { + if (res instanceof ReplicaResult) { + return (ReplicaResult) res; + } else { + return new ReplicaResult(res, null); + } + }); + } - return completedFuture(new ReplicaResult(null, null)); + private CompletableFuture<?> processRequest( + ReplicaRequest request, + @Nullable Boolean isPrimary, + UUID senderId, + @Nullable Long leaseStartTime + ) { + if (request instanceof TableAware) { + // This type of request propagates to the table processor directly. + return processTableAwareRequest(request, senderId); Review Comment: Good solution 👍🏻 ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java: ########## @@ -82,44 +108,130 @@ public ZonePartitionReplicaListener( schemaSyncService, catalogService, raftClient, - replicationGroupId - ); + replicationGroupId); + + minimumActiveTxTimeReplicaRequestHandler = new MinimumActiveTxTimeReplicaRequestHandler( + clockService, + raftCommandApplicator); } @Override public CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, UUID senderId) { - if (!(request instanceof TableAware)) { - // TODO: https://issues.apache.org/jira/browse/IGNITE-22620 implement ReplicaSafeTimeSyncRequest processing. - if (request instanceof TxFinishReplicaRequest) { - return txFinishReplicaRequestHandler.handle((TxFinishReplicaRequest) request) - .thenApply(res -> new ReplicaResult(res, null)); - } else { - if (request instanceof ReplicaSafeTimeSyncRequest) { - LOG.debug("Non table request is not supported by the zone partition yet " + request); - } else { - LOG.warn("Non table request is not supported by the zone partition yet " + request); - } - } + return ensureReplicaIsPrimary(request) + .thenCompose(res -> processRequest(request, res.get1(), senderId, res.get2())) + .thenApply(res -> { + if (res instanceof ReplicaResult) { + return (ReplicaResult) res; + } else { + return new ReplicaResult(res, null); + } + }); + } - return completedFuture(new ReplicaResult(null, null)); + private CompletableFuture<?> processRequest( + ReplicaRequest request, + @Nullable Boolean isPrimary, + UUID senderId, + @Nullable Long leaseStartTime + ) { + if (request instanceof TableAware) { + // This type of request propagates to the table processor directly. + return processTableAwareRequest(request, senderId); Review Comment: Good solution 👍🏻 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org