zentol commented on code in PR #25679:
URL: https://github.com/apache/flink/pull/25679#discussion_r1863208829


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java:
##########
@@ -39,17 +44,37 @@ public interface LeaderElection extends AutoCloseable {
      *
      * @param leaderSessionID The new leader session ID
      * @param leaderAddress The address of the new leader
+     * @return A future that completes successfully if the confirmation succeeded or fails
+     *     exceptionally with a {@link LeadershipLostException} if the leadership was revoked in the
+     *     meantime.
+     */
+    default CompletableFuture<Void> confirmLeadershipAsLeader(
+            UUID leaderSessionID, String leaderAddress) {
+        checkNotNull(leaderSessionID);
+        checkNotNull(leaderAddress);
+
+        return runAsLeader(
+                leaderSessionID, () -> confirmLeadership(leaderSessionID, leaderAddress));
+    }
+
+    /**
+     * Runs the actual leadership confirmation. This method should only be called if the leadership
+     * is acquired. Otherwise, a {@link IllegalStateException} should be thrown.
      */
     void confirmLeadership(UUID leaderSessionID, String leaderAddress);
 
     /**
-     * Returns {@code true} if the service's {@link LeaderContender} has the leadership under the
-     * given leader session ID acquired.
+     * Runs the given {@code callback} on the leader election's main thread only if the associated
+     * {@code leaderSessionId} is still valid.
      *
-     * @param leaderSessionId identifying the current leader
-     * @return true if the associated {@link LeaderContender} is the leader, otherwise false
+     * @param leaderSessionId The session ID that's associated with the given {@code callback}.
+     * @param callback The callback that shall be executed as a leader.
+     * @return The future referring to the result of the callback operation. This future would
+     *     complete exceptionally with a {@link LeadershipLostException} if the leadership wasn't
+     *     active anymore.
      */
-    boolean hasLeadership(UUID leaderSessionId);
+    CompletableFuture<Void> runAsLeader(

Review Comment:
   yeeeeah I'm not sold on this.
   
   I struggle to understand the contract as to what should be run via the method.
   
   Is it that everything called in the LeaderContender callbacks must use this method?
   It feels very wrong that this method is used to access the JRS, for example (even if that internally does things asynchronously), when that has little to do with leader _election_. That runs counter to the idea of running as few things as possible in the leader election executor.
   
   I also don't quite follow yet why such a big API change is necessary.
   Couldn't we have saved ourselves a lot of work by just changing hasLeadership to return a `CompletableFuture<Boolean>`?
   Now we have this strange API where a sync confirmLeadership method exists but must basically never be called from one side because it risks deadlocks.
   
   Can't we solve the lock acquisition problem by removing the need to call hasLeadership?
   We already have an API that signals whether we are the leader or not; can't we write that outside of the contender lock into an AtomicBoolean and use that instead (while ensuring that grant/revoke don't do any heavy lifting)?
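   
   A minimal sketch of the AtomicBoolean idea, for illustration only. The class `LeadershipFlagSketch` and its `runIfLeader` method are hypothetical names and not part of Flink; the grant/revoke methods merely mirror the shape of the LeaderContender callbacks and do nothing but flip the flag. The check is best-effort: leadership may still be revoked between the read and the action.
   
   ```java
   import java.util.UUID;
   import java.util.concurrent.atomic.AtomicBoolean;
   
   /** Hypothetical helper (not Flink API): tracks leadership without taking the contender lock. */
   public class LeadershipFlagSketch {
   
       // Written only by the grant/revoke callbacks, read lock-free everywhere else.
       private final AtomicBoolean isLeader = new AtomicBoolean(false);
   
       /** Mirrors a grant callback; does no heavy lifting, only flips the flag. */
       public void grantLeadership(UUID leaderSessionId) {
           isLeader.set(true);
       }
   
       /** Mirrors a revoke callback; again only flips the flag. */
       public void revokeLeadership() {
           isLeader.set(false);
       }
   
       /** Lock-free replacement for a blocking hasLeadership call. */
       public boolean hasLeadership() {
           return isLeader.get();
       }
   
       /**
        * Runs the action only if the flag says we are the leader.
        *
        * @return true if the action ran, false if it was skipped
        */
       public boolean runIfLeader(Runnable action) {
           if (!isLeader.get()) {
               return false;
           }
           action.run();
           return true;
       }
   }
   ```
   
   Note that this sketch ignores the session ID; a variant that needs to tell one grant from a later one could hold the current session ID in an `AtomicReference<UUID>` instead of a boolean.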



##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTestUtils.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class LeaderElectionTestUtils {
+
+    private LeaderElectionTestUtils() {}
+
+    public static void assertRunAsLeaderSuccessfully(
+            LeaderElection testInstance, UUID leaderSessionId) {
+        final AtomicBoolean callbackTriggered = new AtomicBoolean(false);
+        final CompletableFuture<Void> resultFuture =
+                testInstance.runAsLeader(leaderSessionId, () -> callbackTriggered.set(true));
+
+        assertThatFuture(resultFuture).eventuallySucceeds();
+        assertThat(callbackTriggered).isTrue();
+    }
+
+    public static void assertRunAsLeaderOmitsCallbackDueToLostLeadership(

Review Comment:
   ```suggestion
       public static void assertRunAsLeaderSkipsRunnableDueToLostLeadership(
   ```
   I think this is a bit clearer.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java:
##########
@@ -244,38 +246,45 @@ private void removeContender(EmbeddedLeaderElection embeddedLeaderElection) {
     }
 
     /** Callback from leader contenders when they confirm a leader grant. */
+    @GuardedBy("lock")
     private void confirmLeader(
             final EmbeddedLeaderElection embeddedLeaderElection,
             final UUID leaderSessionId,
             final String leaderAddress) {
-        synchronized (lock) {
-            // if the leader election was shut down in the meantime, ignore this confirmation
-            if (!embeddedLeaderElection.running || shutdown) {
-                return;
-            }
-
-            try {
-                // check if the confirmation is for the same grant, or whether it is a stale grant
-                if (embeddedLeaderElection == currentLeaderProposed
-                        && currentLeaderSessionId.equals(leaderSessionId)) {
-                    LOG.info(
-                            "Received confirmation of leadership for leader {} , session={}",
-                            leaderAddress,
-                            leaderSessionId);
-
-                    // mark leadership
-                    currentLeaderConfirmed = embeddedLeaderElection;
-                    currentLeaderAddress = leaderAddress;
-                    currentLeaderProposed = null;
+        Preconditions.checkState(

Review Comment:
   I'm quite worried about this because it'll just blow up an entire JobManager if leadership is lost at the wrong time, when HA would be capable of sorting this out on its own.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java:
##########
@@ -108,6 +113,16 @@ void testHasLeadership() throws Exception {
         }
     }
 
+    private static void assertRunAsLeaderSuccessfully(

Review Comment:
   This duplicates the helper in LeaderElectionTestUtils.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java:
##########
@@ -244,38 +246,45 @@ private void removeContender(EmbeddedLeaderElection embeddedLeaderElection) {
     }
 
     /** Callback from leader contenders when they confirm a leader grant. */
+    @GuardedBy("lock")
     private void confirmLeader(
             final EmbeddedLeaderElection embeddedLeaderElection,
             final UUID leaderSessionId,
             final String leaderAddress) {
-        synchronized (lock) {
-            // if the leader election was shut down in the meantime, ignore this confirmation
-            if (!embeddedLeaderElection.running || shutdown) {
-                return;
-            }
-
-            try {
-                // check if the confirmation is for the same grant, or whether it is a stale grant
-                if (embeddedLeaderElection == currentLeaderProposed
-                        && currentLeaderSessionId.equals(leaderSessionId)) {
-                    LOG.info(
-                            "Received confirmation of leadership for leader {} , session={}",
-                            leaderAddress,
-                            leaderSessionId);
-
-                    // mark leadership
-                    currentLeaderConfirmed = embeddedLeaderElection;
-                    currentLeaderAddress = leaderAddress;
-                    currentLeaderProposed = null;
+        Preconditions.checkState(
+                currentLeaderProposed == embeddedLeaderElection,
+                "The confirmLeader method should only be called when having the leadership acquired.");
+        LOG.info(
+                "Received confirmation of leadership for leader {} , session={}",
+                leaderAddress,
+                leaderSessionId);
+
+        // mark leadership
+        currentLeaderConfirmed = embeddedLeaderElection;
+        currentLeaderAddress = leaderAddress;
+        currentLeaderProposed = null;
+
+        // notify all listeners
+        notifyAllListeners(leaderAddress, leaderSessionId);
+    }
 
-                    // notify all listeners
-                    notifyAllListeners(leaderAddress, leaderSessionId);
-                } else {
-                    LOG.debug(
-                            "Received confirmation of leadership for a stale leadership grant. Ignoring.");
+    private void runAsLeader(
+            EmbeddedLeaderElection embeddedLeaderElection,
+            UUID leaderSessionId,
+            ThrowingRunnable<? extends Throwable> runnable)
+            throws LeadershipLostException {
+        synchronized (lock) {
+            if (embeddedLeaderElection.running
+                    && !shutdown
+                    && embeddedLeaderElection.isLeader
+                    && currentLeaderSessionId.equals(leaderSessionId)) {
+                try {
+                    runnable.run();
+                } catch (Throwable t) {
+                    fatalError(t);
                 }
-            } catch (Throwable t) {
-                fatalError(t);
+            } else {
+                throw new LeadershipLostException(leaderSessionId);

Review Comment:
   I'm not quite sold on this. It feels like we're using a special exception to do control flow (such that users can ignore certain exceptions).
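   
   To make the concern concrete, here is a rough sketch of the filtering a caller might end up writing around the future returned by `runAsLeader`. Everything in it is an assumption for illustration: `LeadershipLostException` is a local stand-in for the class introduced in the PR (whose actual shape may differ), and `ignoreLostLeadership` is a hypothetical helper, not something the PR contains.
   
   ```java
   import java.util.UUID;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;
   
   /** Illustration only: shows an exception being used as a control-flow signal. */
   public class LeadershipLostHandlingSketch {
   
       /** Local stand-in for the PR's exception type; the real class may look different. */
       public static class LeadershipLostException extends Exception {
           public LeadershipLostException(UUID leaderSessionId) {
               super("Leadership lost for session " + leaderSessionId);
           }
       }
   
       /**
        * Hypothetical caller-side helper: maps "leadership lost" to false, success to true,
        * and propagates every other failure.
        */
       public static CompletableFuture<Boolean> ignoreLostLeadership(
               CompletableFuture<Void> result) {
           return result.handle(
                   (ignored, failure) -> {
                       if (failure == null) {
                           return true; // the callback ran as leader
                       }
                       // Dependent stages may wrap the original failure in a CompletionException.
                       Throwable cause =
                               failure instanceof CompletionException && failure.getCause() != null
                                       ? failure.getCause()
                                       : failure;
                       if (cause instanceof LeadershipLostException) {
                           return false; // the "expected" exception is swallowed here
                       }
                       throw new CompletionException(cause); // a real error, keep failing
                   });
       }
   }
   ```
   
   Every call site that wants to tolerate lost leadership needs this kind of type check, which is the exception-as-control-flow pattern in question.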



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
