sanpwc commented on code in PR #4877:
URL: https://github.com/apache/ignite-3/pull/4877#discussion_r1890216229


##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaImpl.java:
##########
@@ -341,4 +371,92 @@ public CompletableFuture<Void> createSnapshotOn(Member 
targetMember) {
     public CompletableFuture<Void> transferLeadershipTo(String 
targetConsistentId) {
         return raftClient.transferLeadership(new Peer(targetConsistentId));
     }
+
+    private CompletableFuture<Boolean> 
registerFailoverCallback(PrimaryReplicaEventParameters parameters) {
+        if (!parameters.leaseholderId().equals(localNode.id()) || 
!(replicaGrpId.equals(parameters.groupId()))) {
+            return falseCompletedFuture();
+        }
+
+        assert onLeaderElectedFailoverCallback == null : format(
+                "We already have failover subscription [thisGrpId={}, 
thisNode={}, givenExpiredPrimaryId={}, givenExpiredPrimaryNode={}",
+                replicaGrpId,
+                localNode.name(),
+                parameters.groupId(),
+                parameters.leaseholder()
+        );
+
+        onLeaderElectedFailoverCallback = (leaderNode, term) -> 
changePeersAndLearnersAsyncIfPendingExists(term);
+
+        return raftClient
+                .subscribeLeader(onLeaderElectedFailoverCallback)
+                .exceptionally(e -> {
+                    LOG.error("Rebalance failover subscription on elected 
primary replica failed [groupId=" + replicaGrpId + "].", e);
+
+                    failureManager.process(new FailureContext(CRITICAL_ERROR, 
e));
+
+                    return null;
+                })
+                .thenApply(v -> false);
+    }
+
+    private void changePeersAndLearnersAsyncIfPendingExists(long term) {
+        getPendingAssignmentsSupplier.apply(replicaGrpId).exceptionally(e -> {
+            LOG.error("Couldn't fetch pending assignments for rebalance 
failover [groupId={}, term={}].", e, replicaGrpId, term);
+
+            return null;
+        }).thenCompose(pendingsBytes -> {
+            if (pendingsBytes == null) {
+                return nullCompletedFuture();
+            }
+
+            PeersAndLearners newConfiguration = 
fromAssignments(Assignments.fromBytes(pendingsBytes).nodes());
+
+            LOG.info(
+                    "New leader elected. Going to apply new configuration 
[tablePartitionId={}, peers={}, learners={}]",

Review Comment:
   Please add the `term` parameter to this log message. I missed having it while debugging.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to