denis-chudov commented on code in PR #6198: URL: https://github.com/apache/ignite-3/pull/6198#discussion_r2189510153
########## modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java: ########## @@ -255,63 +257,81 @@ private void enqueuePrimaryReplicaEvents( } } - private void awaitPrimaryReplica( - ReplicationGroupId groupId, - HybridTimestamp timestamp, - CompletableFuture<ReplicaMeta> resultFuture - ) { - inBusyLockAsync(busyLock, () -> getOrCreatePrimaryReplicaWaiter(groupId).waitFor(timestamp) - .thenAccept(replicaMeta -> { - ClusterNode leaseholderNode = clusterNodeResolver.getById(replicaMeta.getLeaseholderId()); - - if (leaseholderNode == null && !resultFuture.isDone()) { - awaitPrimaryReplica( - groupId, - replicaMeta.getExpirationTime().tick(), - resultFuture - ); - } else { - resultFuture.complete(replicaMeta); - } - }) - ); - } - @Override public CompletableFuture<ReplicaMeta> awaitPrimaryReplica( ReplicationGroupId groupId, HybridTimestamp timestamp, long timeout, TimeUnit unit ) { - if (!busyLock.enterBusy()) { - throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()); - } - try { + return inBusyLockAsync(busyLock, () -> { ReplicaMeta currentMeta = getCurrentPrimaryReplica(groupId, timestamp); if (currentMeta != null && clusterNodeResolver.getById(currentMeta.getLeaseholderId()) != null) { return completedFuture(currentMeta); } - } finally { - busyLock.leaveBusy(); - } - CompletableFuture<ReplicaMeta> future = new CompletableFuture<>(); + return awaitPrimaryReplicaImpl(groupId, timestamp, timeout, unit); + }); + } + + private CompletableFuture<ReplicaMeta> awaitPrimaryReplicaImpl( + ReplicationGroupId groupId, + HybridTimestamp timestamp, + long timeout, + TimeUnit unit + ) { + var resultFuture = new CompletableFuture<ReplicaMeta>().orTimeout(timeout, unit); - awaitPrimaryReplica(groupId, timestamp, future); + awaitPrimaryReplicaImpl(groupId, timestamp, resultFuture) + .whenComplete(copyStateTo(resultFuture)); - return future - .orTimeout(timeout, unit) + return resultFuture .exceptionally(e -> { if (e instanceof TimeoutException) { throw new PrimaryReplicaAwaitTimeoutException(groupId, timestamp, leases.leaseByGroupId().get(groupId), e); + } else if (hasCause(e, TrackerClosedException.class)) { + // TrackerClosedException is thrown when trackers are closed on node stop. + throw new CompletionException(new NodeStoppingException(e)); + } else { + throw new PrimaryReplicaAwaitException(groupId, timestamp, e); } - - throw new PrimaryReplicaAwaitException(groupId, timestamp, e); }); } + /** + * Returns a future that completes when the target primary replica appears. + * + * <p>{@code timeoutFuture} here is not used for storing the operation result, but rather works in conjunction with the + * {@link #awaitPrimaryReplicaImpl(ReplicationGroupId, HybridTimestamp, long, TimeUnit)} method which passes a future that timeouts + * after a configurable amount of time. This future is therefore used to stop waiting if the timeout has been reached. + */ + private CompletableFuture<ReplicaMeta> awaitPrimaryReplicaImpl( + ReplicationGroupId groupId, + HybridTimestamp timestamp, + CompletableFuture<?> timeoutFuture Review Comment: This approach with returning the future and accepting the result future looks kinda tricky, could you simplify it somehow? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org