XComp commented on code in PR #25679:
URL: https://github.com/apache/flink/pull/25679#discussion_r1863572414


##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java:
##########
@@ -244,38 +246,45 @@ private void removeContender(EmbeddedLeaderElection embeddedLeaderElection) {
     }
 
     /** Callback from leader contenders when they confirm a leader grant. */
+    @GuardedBy("lock")
     private void confirmLeader(
             final EmbeddedLeaderElection embeddedLeaderElection,
             final UUID leaderSessionId,
             final String leaderAddress) {
-        synchronized (lock) {
-            // if the leader election was shut down in the meantime, ignore this confirmation
-            if (!embeddedLeaderElection.running || shutdown) {
-                return;
-            }
-
-            try {
-                // check if the confirmation is for the same grant, or whether it is a stale grant
-                if (embeddedLeaderElection == currentLeaderProposed
-                        && currentLeaderSessionId.equals(leaderSessionId)) {
-                    LOG.info(
-                            "Received confirmation of leadership for leader {} , session={}",
-                            leaderAddress,
-                            leaderSessionId);
-
-                    // mark leadership
-                    currentLeaderConfirmed = embeddedLeaderElection;
-                    currentLeaderAddress = leaderAddress;
-                    currentLeaderProposed = null;
+        Preconditions.checkState(
+                currentLeaderProposed == embeddedLeaderElection,
+                "The confirmLeader method should only be called when having the leadership acquired.");
+        LOG.info(
+                "Received confirmation of leadership for leader {} , session={}",
+                leaderAddress,
+                leaderSessionId);
+
+        // mark leadership
+        currentLeaderConfirmed = embeddedLeaderElection;
+        currentLeaderAddress = leaderAddress;
+        currentLeaderProposed = null;
+
+        // notify all listeners
+        notifyAllListeners(leaderAddress, leaderSessionId);
+    }
 
-                    // notify all listeners
-                    notifyAllListeners(leaderAddress, leaderSessionId);
-                } else {
-                    LOG.debug(
-                            "Received confirmation of leadership for a stale leadership grant. Ignoring.");
+    private void runAsLeader(
+            EmbeddedLeaderElection embeddedLeaderElection,
+            UUID leaderSessionId,
+            ThrowingRunnable<? extends Throwable> runnable)
+            throws LeadershipLostException {
+        synchronized (lock) {
+            if (embeddedLeaderElection.running
+                    && !shutdown
+                    && embeddedLeaderElection.isLeader
+                    && currentLeaderSessionId.equals(leaderSessionId)) {
+                try {
+                    runnable.run();
+                } catch (Throwable t) {
+                    fatalError(t);
                 }
-            } catch (Throwable t) {
-                fatalError(t);
+            } else {
+                throw new LeadershipLostException(leaderSessionId);

Review Comment:
   For me, it was meant as an invalid state: the callback that's passed into
the `runAsLeader` method should succeed in the happy scenario. Losing the
leadership in the meantime is an "unusual" state of the system. In this
sense, I felt like exceptions are the most appropriate tool to express this.
   
   You're suggesting that we should make `runAsLeader` return
`CompletableFuture<Boolean>`, indicating whether the callback was executed or
not? That would work as well. But from an interface perspective, `runAsLeader`
is just one way to execute the callback. One could also (theoretically) think of
a `supplyAsLeader` (as it's implemented in `JobMasterServiceLeadershipRunner`
right now). In such a case, using the value of the `CompletableFuture` for the
callback's result and relying on an exception for leadership loss feels like the
more natural thing to do.
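To illustrate the trade-off, here is a minimal sketch with hypothetical names throughout (`SimpleLeaderElection`, a locally defined `LeadershipLostException`; this is not Flink's actual `LeaderElection` API, and Flink's real `supplyAsLeader` may look different). `supplyAsLeader` delivers the callback's result as the future's value and signals leadership loss as an exceptional completion; `runAsLeader` is then just the `Void` special case. With a `CompletableFuture<Boolean>` return type, the value channel would already be used for "did it run?", leaving no room for a result.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class AsLeaderSketch {

    /** Hypothetical stand-in for Flink's LeadershipLostException. */
    static final class LeadershipLostException extends Exception {
        LeadershipLostException(UUID leaderSessionId) {
            super("Leadership lost for session " + leaderSessionId);
        }
    }

    /** Hypothetical leader election that only tracks the currently active session ID. */
    static final class SimpleLeaderElection {
        private final Object lock = new Object();
        private UUID currentSessionId; // null while not being the leader; guarded by lock

        void grant(UUID sessionId) {
            synchronized (lock) {
                currentSessionId = sessionId;
            }
        }

        void revoke() {
            synchronized (lock) {
                currentSessionId = null;
            }
        }

        /** Returns the supplier's result, or fails with LeadershipLostException for a stale session. */
        <T> CompletableFuture<T> supplyAsLeader(UUID sessionId, Supplier<T> supplier) {
            synchronized (lock) {
                if (!sessionId.equals(currentSessionId)) {
                    return CompletableFuture.failedFuture(new LeadershipLostException(sessionId));
                }
                try {
                    return CompletableFuture.completedFuture(supplier.get());
                } catch (RuntimeException e) {
                    return CompletableFuture.failedFuture(e);
                }
            }
        }

        /** The Void special case: the value channel stays free for results in supplyAsLeader. */
        CompletableFuture<Void> runAsLeader(UUID sessionId, Runnable runnable) {
            return this.<Void>supplyAsLeader(
                    sessionId,
                    () -> {
                        runnable.run();
                        return null;
                    });
        }
    }

    public static void main(String[] args) {
        SimpleLeaderElection election = new SimpleLeaderElection();
        UUID session = UUID.randomUUID();
        election.grant(session);

        // Happy path: the callback's result is delivered as the future's value.
        System.out.println(election.supplyAsLeader(session, () -> 42).join()); // prints 42

        election.revoke();
        // Leadership loss arrives as an exceptional completion instead of a "false" value.
        election.runAsLeader(session, () -> System.out.println("not executed"))
                .exceptionally(
                        t -> {
                            System.out.println("callback skipped: " + t.getMessage());
                            return null;
                        })
                .join();
    }
}
```

Callers that only care about the side effect can ignore the value, while callers that need a result get it without a separate boolean check.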



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java:
##########
@@ -244,38 +246,45 @@ private void removeContender(EmbeddedLeaderElection embeddedLeaderElection) {
     }
 
     /** Callback from leader contenders when they confirm a leader grant. */
+    @GuardedBy("lock")
     private void confirmLeader(
             final EmbeddedLeaderElection embeddedLeaderElection,
             final UUID leaderSessionId,
             final String leaderAddress) {
-        synchronized (lock) {
-            // if the leader election was shut down in the meantime, ignore this confirmation
-            if (!embeddedLeaderElection.running || shutdown) {
-                return;
-            }
-
-            try {
-                // check if the confirmation is for the same grant, or whether it is a stale grant
-                if (embeddedLeaderElection == currentLeaderProposed
-                        && currentLeaderSessionId.equals(leaderSessionId)) {
-                    LOG.info(
-                            "Received confirmation of leadership for leader {} , session={}",
-                            leaderAddress,
-                            leaderSessionId);
-
-                    // mark leadership
-                    currentLeaderConfirmed = embeddedLeaderElection;
-                    currentLeaderAddress = leaderAddress;
-                    currentLeaderProposed = null;
+        Preconditions.checkState(

Review Comment:
   This precondition is there to reveal a bug. If the `EmbeddedLeaderService`
isn't properly implemented, the whole setup shouldn't be trusted, no?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java:
##########
@@ -39,17 +44,37 @@ public interface LeaderElection extends AutoCloseable {
      *
      * @param leaderSessionID The new leader session ID
      * @param leaderAddress The address of the new leader
+     * @return A future that completes successfully if the confirmation succeeded or fails
+     *     exceptionally with a {@link LeadershipLostException} if the leadership was revoked in the
+     *     meantime.
+     */
+    default CompletableFuture<Void> confirmLeadershipAsLeader(
+            UUID leaderSessionID, String leaderAddress) {
+        checkNotNull(leaderSessionID);
+        checkNotNull(leaderAddress);
+
+        return runAsLeader(
+                leaderSessionID, () -> confirmLeadership(leaderSessionID, leaderAddress));
+    }
+
+    /**
+     * Runs the actual leadership confirmation. This method should only be called if the leadership
+     * is acquired. Otherwise, a {@link IllegalStateException} should be thrown.
      */
     void confirmLeadership(UUID leaderSessionID, String leaderAddress);
 
     /**
-     * Returns {@code true} if the service's {@link LeaderContender} has the leadership under the
-     * given leader session ID acquired.
+     * Runs the given {@code callback} on the leader election's main thread only if the associated
+     * {@code leaderSessionId} is still valid.
      *
-     * @param leaderSessionId identifying the current leader
-     * @return true if the associated {@link LeaderContender} is the leader, otherwise false
+     * @param leaderSessionId The session ID that's associated with the given {@code callback}.
+     * @param callback The callback that shall be executed as a leader.
+     * @return The future referring to the result of the callback operation. This future would
+     *     complete exceptionally with a {@link LeadershipLostException} if the leadership wasn't
+     *     active anymore.
      */
-    boolean hasLeadership(UUID leaderSessionId);
+    CompletableFuture<Void> runAsLeader(

Review Comment:
   > Is it that everything called in the LeaderContender callbacks must used
   > this method?
   > It feels very wrong that this method is used to access the JRS for example
   > (even if that internally does things asynchronously), when that has little to
   > do with leader election, which runs counter to the idea of running as few
   > things as possible in the leader election executor.
   
   I guess you're right about the JRS access: that read can happen even if the
leadership is not acquired because it has no side effects.
   
   Even the `JobMasterServiceLeadershipRunner#createNewJobMasterServiceProcess` 
does not need to be protected by the leadership because we can start a new 
`JobMasterServiceProcess` even without being the leader. 
   
   The crucial part is that leadership is still held when the
`JobMasterServiceProcess` instantiation is finalized and the results of its
futures are forwarded. This still needs to happen under leadership because it
triggers side effects.
   
   `JobMasterServiceLeadershipRunner#handleJobAlreadyDoneAsLeader` is another 
operation that needs to run under leadership because it triggers side effects.
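A rough sketch of that split, assuming hypothetical names throughout (`LeaderGate`, `SimpleGate`, `isJobAlreadyDone`, `handleJob` and the outcome strings are illustrative stand-ins, not Flink APIs): the side-effect-free lookup runs on an ordinary I/O executor without any leadership check, and only the final, side-effecting forwarding step is wrapped in `runAsLeader`.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

final class SplitLeadershipSketch {

    /** Hypothetical minimal view of a leader election: runs a side effect only under a valid session. */
    interface LeaderGate {
        CompletableFuture<Void> runAsLeader(UUID sessionId, Runnable sideEffect);
    }

    /** Hypothetical gate that compares against the currently granted session under its monitor. */
    static final class SimpleGate implements LeaderGate {
        private UUID currentSession; // guarded by "this"

        synchronized void grant(UUID sessionId) {
            currentSession = sessionId;
        }

        synchronized void revoke() {
            currentSession = null;
        }

        @Override
        public synchronized CompletableFuture<Void> runAsLeader(UUID sessionId, Runnable sideEffect) {
            if (!sessionId.equals(currentSession)) {
                return CompletableFuture.failedFuture(
                        new IllegalStateException("Leadership lost for session " + sessionId));
            }
            sideEffect.run(); // must stay lightweight: it runs while holding the monitor
            return CompletableFuture.completedFuture(null);
        }
    }

    /** Stand-in for a potentially slow, side-effect-free job result store lookup. */
    static boolean isJobAlreadyDone() {
        return false;
    }

    /** Reads outside of the leadership context, then forwards the outcome under leadership. */
    static CompletableFuture<Void> handleJob(
            LeaderGate gate, UUID sessionId, ExecutorService ioExecutor, AtomicReference<String> outcome) {
        return CompletableFuture.supplyAsync(SplitLeadershipSketch::isJobAlreadyDone, ioExecutor)
                .thenCompose(
                        done -> gate.runAsLeader(
                                sessionId,
                                () -> outcome.set(done ? "job already done" : "start job master")));
    }

    public static void main(String[] args) {
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        try {
            SimpleGate gate = new SimpleGate();
            UUID session = UUID.randomUUID();
            gate.grant(session);

            AtomicReference<String> outcome = new AtomicReference<>();
            handleJob(gate, session, ioExecutor, outcome).join();
            System.out.println(outcome.get()); // prints "start job master"
        } finally {
            ioExecutor.shutdown();
        }
    }
}
```

If leadership is revoked while the lookup is still running, the lookup result is simply dropped and the returned future fails, so no side effect escapes.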
   
   > I also don't quite follow yet why such a big API change is necessary.
   > Couldn't we have saved ourselves a lot of work by just changing
   > hasLeadership to return a CompletableFuture<Boolean>?
   
   That's a good point. I thought about it, and I don't think it would work: an
asynchronous `hasLeadership` would run on the leaderOperations thread (in the
`DefaultLeaderElectionService`), but the callback chained to it only runs after
that check has completed and can therefore again run concurrently with a
leadership revocation. The check and the action that depends on it would no
longer be atomic, so that's not a viable solution.
   
   The operations that need to run under leadership need to be performed on the
leaderOperations thread. These operations need to be lightweight (ideally only
future completions). Some of the operations that are currently performed under
leadership (like triggering the JRS read mentioned above) can be moved out of
the leadership context.
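The difference between an asynchronous `hasLeadership` and `runAsLeader` can be reproduced in a small sketch. All names are hypothetical, and a single-threaded executor merely stands in for the leaderOperations thread of `DefaultLeaderElectionService`. In the check-then-act variant, the check and the side effect are two separate steps, so a revocation processed between them goes unnoticed (the sketch forces the revocation in between to make the race deterministic). In the `runAsLeader` variant, the check and the side effect form one task on the same thread that also processes grants and revocations.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class LeaderOperationsSketch {

    /** Hypothetical leader election whose state is confined to a single "leaderOperations" thread. */
    static final class SingleThreadedLeaderElection implements AutoCloseable {
        private final ExecutorService leaderOperations = Executors.newSingleThreadExecutor();
        private UUID currentSession; // only read and written on the leaderOperations thread

        CompletableFuture<Void> grant(UUID sessionId) {
            return CompletableFuture.runAsync(() -> currentSession = sessionId, leaderOperations);
        }

        CompletableFuture<Void> revoke() {
            return CompletableFuture.runAsync(() -> currentSession = null, leaderOperations);
        }

        /** Check-then-act: the answer can already be outdated when the caller acts on it. */
        CompletableFuture<Boolean> hasLeadershipAsync(UUID sessionId) {
            return CompletableFuture.supplyAsync(() -> sessionId.equals(currentSession), leaderOperations);
        }

        /** Check and (lightweight) side effect form one task, serialized with grant/revoke. */
        CompletableFuture<Void> runAsLeader(UUID sessionId, Runnable sideEffect) {
            return CompletableFuture.runAsync(
                    () -> {
                        if (!sessionId.equals(currentSession)) {
                            throw new IllegalStateException("Leadership lost for session " + sessionId);
                        }
                        sideEffect.run();
                    },
                    leaderOperations);
        }

        @Override
        public void close() {
            leaderOperations.shutdown();
        }
    }

    public static void main(String[] args) {
        try (SingleThreadedLeaderElection election = new SingleThreadedLeaderElection()) {
            UUID session = UUID.randomUUID();
            election.grant(session).join();

            // Flawed variant: the revocation is forced in between check and act.
            boolean wasLeader = election.hasLeadershipAsync(session).join();
            election.revoke().join();
            System.out.println("check-then-act would run side effect: " + wasLeader); // true

            // runAsLeader variant: the same revocation is observed because check and act are one task.
            AtomicInteger sideEffects = new AtomicInteger();
            election.runAsLeader(session, sideEffects::incrementAndGet).exceptionally(t -> null).join();
            System.out.println("runAsLeader side effects: " + sideEffects.get()); // 0
        }
    }
}
```

Because every `runAsLeader` task blocks grant and revoke processing while it runs, the tasks submitted this way should stay as small as future completions.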
   
   > Now we have this strange API where a sync confirmLeadership method exists
   > but must basically never be called from one side because it risks dead-locks.
   
   Yeah, I have to admit that the synchronous `confirmLeadership` method in the
`LeaderElection` interface feels inconsistent. 🤔 The problem is that we need to
check the running state when executing the leadership confirmation because
`JobMasterServiceLeadershipRunner#close` closes the `LeaderElection` outside of
the lock. That is, the leadership confirmation could be executed after the
runner has already been shut down.
   
   But looking at the `DefaultDispatcherRunner` implementation, I notice that
we're not relying on the lock for the leadership confirmation there despite
having the same problem. Maybe I have a misconception here. 🤔
   
   > Can't we solve the lock acquisition problem by removing the need to call
   > hasLeadership?
   > We already have an API that signals whether we are the leader or not; can't
   > we write that outside of the contender lock into an AtomicBoolean and use that
   > instead (while ensuring that grant/revoke don't do any heavy lifting)?
   
   No, we need to run certain operations as the leader. Otherwise, the job could
be marked as done even though another leader has already picked up the work.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
