sanpwc commented on code in PR #4877: URL: https://github.com/apache/ignite-3/pull/4877#discussion_r1890216229
########## modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java: ########## @@ -341,4 +371,92 @@ public CompletableFuture<Void> createSnapshotOn(Member targetMember) { public CompletableFuture<Void> transferLeadershipTo(String targetConsistentId) { return raftClient.transferLeadership(new Peer(targetConsistentId)); } + + private CompletableFuture<Boolean> registerFailoverCallback(PrimaryReplicaEventParameters parameters) { + if (!parameters.leaseholderId().equals(localNode.id()) || !(replicaGrpId.equals(parameters.groupId()))) { + return falseCompletedFuture(); + } + + assert onLeaderElectedFailoverCallback == null : format( + "We already have failover subscription [thisGrpId={}, thisNode={}, givenExpiredPrimaryId={}, givenExpiredPrimaryNode={}", + replicaGrpId, + localNode.name(), + parameters.groupId(), + parameters.leaseholder() + ); + + onLeaderElectedFailoverCallback = (leaderNode, term) -> changePeersAndLearnersAsyncIfPendingExists(term); + + return raftClient + .subscribeLeader(onLeaderElectedFailoverCallback) + .exceptionally(e -> { + LOG.error("Rebalance failover subscription on elected primary replica failed [groupId=" + replicaGrpId + "].", e); + + failureManager.process(new FailureContext(CRITICAL_ERROR, e)); + + return null; + }) + .thenApply(v -> false); + } + + private void changePeersAndLearnersAsyncIfPendingExists(long term) { + getPendingAssignmentsSupplier.apply(replicaGrpId).exceptionally(e -> { + LOG.error("Couldn't fetch pending assignments for rebalance failover [groupId={}, term={}].", e, replicaGrpId, term); + + return null; + }).thenCompose(pendingsBytes -> { + if (pendingsBytes == null) { + return nullCompletedFuture(); + } + + PeersAndLearners newConfiguration = fromAssignments(Assignments.fromBytes(pendingsBytes).nodes()); + + LOG.info( + "New leader elected. Going to apply new configuration [tablePartitionId={}, peers={}, learners={}]", Review Comment: Please add term param here. I've missed it while debugging. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org