XComp commented on code in PR #25679: URL: https://github.com/apache/flink/pull/25679#discussion_r1863572414
########## flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java: ##########
@@ -244,38 +246,45 @@ private void removeContender(EmbeddedLeaderElection embeddedLeaderElection) {
     }

     /** Callback from leader contenders when they confirm a leader grant. */
+    @GuardedBy("lock")
     private void confirmLeader(
             final EmbeddedLeaderElection embeddedLeaderElection,
             final UUID leaderSessionId,
             final String leaderAddress) {
-        synchronized (lock) {
-            // if the leader election was shut down in the meantime, ignore this confirmation
-            if (!embeddedLeaderElection.running || shutdown) {
-                return;
-            }
-
-            try {
-                // check if the confirmation is for the same grant, or whether it is a stale grant
-                if (embeddedLeaderElection == currentLeaderProposed
-                        && currentLeaderSessionId.equals(leaderSessionId)) {
-                    LOG.info(
-                            "Received confirmation of leadership for leader {} , session={}",
-                            leaderAddress,
-                            leaderSessionId);
-
-                    // mark leadership
-                    currentLeaderConfirmed = embeddedLeaderElection;
-                    currentLeaderAddress = leaderAddress;
-                    currentLeaderProposed = null;
+        Preconditions.checkState(
+                currentLeaderProposed == embeddedLeaderElection,
+                "The confirmLeader method should only be called when having the leadership acquired.");
+        LOG.info(
+                "Received confirmation of leadership for leader {} , session={}",
+                leaderAddress,
+                leaderSessionId);
+
+        // mark leadership
+        currentLeaderConfirmed = embeddedLeaderElection;
+        currentLeaderAddress = leaderAddress;
+        currentLeaderProposed = null;
+
+        // notify all listeners
+        notifyAllListeners(leaderAddress, leaderSessionId);
+    }

-                    // notify all listeners
-                    notifyAllListeners(leaderAddress, leaderSessionId);
-                } else {
-                    LOG.debug(
-                            "Received confirmation of leadership for a stale leadership grant. Ignoring.");
+    private void runAsLeader(
+            EmbeddedLeaderElection embeddedLeaderElection,
+            UUID leaderSessionId,
+            ThrowingRunnable<? extends Throwable> runnable)
+            throws LeadershipLostException {
+        synchronized (lock) {
+            if (embeddedLeaderElection.running
+                    && !shutdown
+                    && embeddedLeaderElection.isLeader
+                    && currentLeaderSessionId.equals(leaderSessionId)) {
+                try {
+                    runnable.run();
+                } catch (Throwable t) {
+                    fatalError(t);
                 }
-            } catch (Throwable t) {
-                fatalError(t);
+            } else {
+                throw new LeadershipLostException(leaderSessionId);

Review Comment:
For me, it was meant as an invalid state: the callback that's passed into the `runAsLeader` method should succeed in the happy scenario. Losing the leadership in the meantime is an "unusual" state of the system. In this sense, I felt that exceptions are the most appropriate tool to express this.

You're suggesting that we should make `runAsLeader` return a `CompletableFuture<Boolean>` indicating whether the callback was executed or not? That would work as well. But from an interface perspective, `runAsLeader` is just one way to execute the callback. One could also (theoretically) think of a `supplyAsLeader` (as it's implemented in `JobMasterServiceLeadershipRunner` right now). In such a case, using the value of the `CompletableFuture` for the callback's result and relying on an exception for leadership loss feels like the more natural thing to do.

########## flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java: ##########
@@ -244,38 +246,45 @@ private void removeContender(EmbeddedLeaderElection embeddedLeaderElection) {
     }

     /** Callback from leader contenders when they confirm a leader grant. */
+    @GuardedBy("lock")
     private void confirmLeader(
             final EmbeddedLeaderElection embeddedLeaderElection,
             final UUID leaderSessionId,
             final String leaderAddress) {
-        synchronized (lock) {
-            // if the leader election was shut down in the meantime, ignore this confirmation
-            if (!embeddedLeaderElection.running || shutdown) {
-                return;
-            }
-
-            try {
-                // check if the confirmation is for the same grant, or whether it is a stale grant
-                if (embeddedLeaderElection == currentLeaderProposed
-                        && currentLeaderSessionId.equals(leaderSessionId)) {
-                    LOG.info(
-                            "Received confirmation of leadership for leader {} , session={}",
-                            leaderAddress,
-                            leaderSessionId);
-
-                    // mark leadership
-                    currentLeaderConfirmed = embeddedLeaderElection;
-                    currentLeaderAddress = leaderAddress;
-                    currentLeaderProposed = null;
+        Preconditions.checkState(

Review Comment:
This precondition is there to indicate a bug. If the `EmbeddedLeaderService` isn't implemented properly, the whole setup shouldn't be trusted anyway, no?

########## flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java: ##########
@@ -39,17 +44,37 @@ public interface LeaderElection extends AutoCloseable {
      *
      * @param leaderSessionID The new leader session ID
      * @param leaderAddress The address of the new leader
+     * @return A future that completes successfully if the confirmation succeeded or fails
+     *     exceptionally with a {@link LeadershipLostException} if the leadership was revoked in the
+     *     meantime.
+     */
+    default CompletableFuture<Void> confirmLeadershipAsLeader(
+            UUID leaderSessionID, String leaderAddress) {
+        checkNotNull(leaderSessionID);
+        checkNotNull(leaderAddress);
+
+        return runAsLeader(
+                leaderSessionID, () -> confirmLeadership(leaderSessionID, leaderAddress));
+    }
+
+    /**
+     * Runs the actual leadership confirmation. This method should only be called if the leadership
+     * is acquired. Otherwise, a {@link IllegalStateException} should be thrown.
      */
     void confirmLeadership(UUID leaderSessionID, String leaderAddress);

     /**
-     * Returns {@code true} if the service's {@link LeaderContender} has the leadership under the
-     * given leader session ID acquired.
+     * Runs the given {@code callback} on the leader election's main thread only if the associated
+     * {@code leaderSessionId} is still valid.
      *
-     * @param leaderSessionId identifying the current leader
-     * @return true if the associated {@link LeaderContender} is the leader, otherwise false
+     * @param leaderSessionId The session ID that's associated with the given {@code callback}.
+     * @param callback The callback that shall be executed as a leader.
+     * @return The future referring to the result of the callback operation. This future would
+     *     complete exceptionally with a {@link LeadershipLostException} if the leadership wasn't
+     *     active anymore.
      */
-    boolean hasLeadership(UUID leaderSessionId);
+    CompletableFuture<Void> runAsLeader(

Review Comment:
> Is it that everything called in the LeaderContender callbacks must used this method? It feels very wrong that this method is used to access the JRS for example (even if that internally does things asynchronously), when that has little to do with leader election, which runs counter to the idea of running as few things as possible in the leader election executor.

I guess you're right about the JRS access: that one can run even if the leadership is not acquired, because we only read there (i.e., there are no side effects). Even `JobMasterServiceLeadershipRunner#createNewJobMasterServiceProcess` does not need to be protected by the leadership, because we can start a new `JobMasterServiceProcess` even without being the leader. The crucial part is that the leadership is still held when the `JobMasterServiceProcess` instantiation is finalized and its future results are forwarded. This step still needs to happen under leadership because it triggers side effects.
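The split described in this reply, combined with the exception-based contract from the first review comment, could look roughly like the following sketch. It is not Flink's implementation: `SketchLeaderElection`, `JobFinisherSketch`, the `grant`/`revoke` methods and the local `LeadershipLostException` class are hypothetical stand-ins, and a single-threaded executor merely plays the role of the leaderOperations thread. The side-effect-free read runs outside the leadership context; only the final side effect goes through `runAsLeader`, whose future fails with `LeadershipLostException` once the session is no longer valid.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the LeadershipLostException referenced in the PR.
class LeadershipLostException extends Exception {
    LeadershipLostException(UUID leaderSessionId) {
        super("Leadership lost for session " + leaderSessionId);
    }
}

// Hypothetical, simplified leader election (not Flink's DefaultLeaderElectionService).
// One thread stands in for the leaderOperations thread: grant, revoke and
// leader-only callbacks are all serialized on it.
class SketchLeaderElection implements AutoCloseable {
    private final ExecutorService leaderOperations = Executors.newSingleThreadExecutor();
    private UUID currentSessionId; // only accessed on leaderOperations

    CompletableFuture<Void> grant(UUID sessionId) {
        return CompletableFuture.runAsync(() -> currentSessionId = sessionId, leaderOperations);
    }

    CompletableFuture<Void> revoke() {
        return CompletableFuture.runAsync(() -> currentSessionId = null, leaderOperations);
    }

    // Runs the callback only while sessionId is the active session. Leadership
    // loss is reported as an exceptional completion rather than a boolean.
    CompletableFuture<Void> runAsLeader(UUID sessionId, Runnable callback) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        leaderOperations.execute(() -> {
            if (!sessionId.equals(currentSessionId)) {
                result.completeExceptionally(new LeadershipLostException(sessionId));
                return;
            }
            try {
                callback.run();
                result.complete(null);
            } catch (Throwable t) {
                // the EmbeddedLeaderService change calls fatalError(t) here instead
                result.completeExceptionally(t);
            }
        });
        return result;
    }

    @Override
    public void close() {
        leaderOperations.shutdown();
    }
}

class JobFinisherSketch {
    // The read has no side effects, so it runs outside the leadership context;
    // only recording the job as done (a side effect) runs as leader.
    static CompletableFuture<Void> finishJob(
            SketchLeaderElection election, UUID sessionId, StringBuilder jobStore) {
        return CompletableFuture.supplyAsync(() -> "job-1") // hypothetical side-effect-free read
                .thenCompose(jobId -> election.runAsLeader(
                        sessionId, () -> jobStore.append(jobId).append(" done;")));
    }
}
```

Because leadership loss surfaces as an exceptional completion, a hypothetical `supplyAsLeader` variant could keep the future's value for the callback's actual result, which is the argument made in the first review comment.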
`JobMasterServiceLeadershipRunner#handleJobAlreadyDoneAsLeader` is another operation that needs to run under leadership because it triggers side effects.

> I also don't quite follow yet why such a big API change is necessary. Couldn't we have saved ourselves a lot of work by just changing hasLeadership to return a CompletableFuture<Boolean>?

That's a good point. I thought about it, and I don't think it would work: an asynchronous `hasLeadership` would run on the leaderOperations thread (in the `DefaultLeaderElectionService`), but the callback chained onto its result could again run concurrently with a leadership revocation. So that's not a viable solution. The operations that need to run under leadership have to be performed on the leaderOperations thread itself. These operations need to be lightweight (ideally only future completions). Some of the operations that are currently performed under leadership (like triggering the JRS read mentioned above) can be moved out of the leadership context.

> Now we have this strange API where a sync confirmLeadership method exists but must basically never be called from one side because it risks dead-locks.

Yeah, I have to admit that the synchronous `confirmLeadership` method in the `LeaderElection` interface feels inconsistent. 🤔 The problem is that we need to check the running state when executing the leadership confirmation, because `JobMasterServiceLeadershipRunner#close` closes the `LeaderElection` outside of the lock; i.e., the leadership confirmation could be executed after the runner has already been shut down. But looking at the `DefaultDispatcherRunner` implementation, I notice that we don't rely on the lock for the leadership confirmation there, despite having the same problem. Maybe I have a misconception here. 🤔

> Can't we solve the lock acquisition problem by removing the need to call hasLeadership? We already have an API that signals whether we are the leader or not; can't we write that outside of the contender lock into an AtomicBoolean and use that instead (while ensuring that grant/revoke don't do any heavy lifting)?

No, we need to run certain operations as the leader. Otherwise, the job could be marked as done even though another leader has already picked up the work.
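The check-then-act gap behind this answer, which also applies to the asynchronous `hasLeadership` idea discussed above, can be illustrated with a small, self-contained sketch. All names (`LeaderFlagSketch`, `LeaderFlagDemo`, `markJobDone`) are hypothetical, and the single-threaded executor only stands in for the leaderOperations thread; this is not the Flink implementation. The first variant reads an `AtomicBoolean` and then acts, so a revocation can be processed in between; the second variant performs the check and the side effect as one task on the leader thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified contender; names are illustrative, not Flink APIs.
class LeaderFlagSketch implements AutoCloseable {
    // Single thread standing in for the leaderOperations thread.
    private final ExecutorService leaderOperations = Executors.newSingleThreadExecutor();
    // Leadership flag mirrored from grant/revoke, as proposed in the review.
    final AtomicBoolean isLeader = new AtomicBoolean();

    CompletableFuture<Void> grant() {
        return CompletableFuture.runAsync(() -> isLeader.set(true), leaderOperations);
    }

    CompletableFuture<Void> revoke() {
        return CompletableFuture.runAsync(() -> isLeader.set(false), leaderOperations);
    }

    // Check and side effect form one task on the leader thread, so a revoke is
    // processed either entirely before or entirely after it, never in between.
    // (Leadership loss is reported as `false` only to keep the sketch short.)
    CompletableFuture<Boolean> runAsLeader(Runnable sideEffect) {
        return CompletableFuture.supplyAsync(() -> {
            if (!isLeader.get()) {
                return false;
            }
            sideEffect.run();
            return true;
        }, leaderOperations);
    }

    @Override
    public void close() {
        leaderOperations.shutdown();
    }
}

class LeaderFlagDemo {
    // The side effect records whether leadership was held when it ran.
    static void markJobDone(LeaderFlagSketch contender, StringBuilder jobStore) {
        jobStore.append("DONE(leader=").append(contender.isLeader.get()).append(")");
    }

    // Check-then-act against the flag: the revocation is processed in the gap
    // between reading the flag and performing the side effect.
    static String flagBased(LeaderFlagSketch contender) {
        StringBuilder jobStore = new StringBuilder();
        boolean wasLeader = contender.isLeader.get();
        contender.revoke().join(); // revocation arrives at the worst possible moment
        if (wasLeader) {
            markJobDone(contender, jobStore);
        }
        return jobStore.toString();
    }

    // The same side effect routed through the leader thread, racing with a revoke.
    static String serialized(LeaderFlagSketch contender) {
        StringBuilder jobStore = new StringBuilder();
        CompletableFuture<Boolean> done =
                contender.runAsLeader(() -> markJobDone(contender, jobStore));
        CompletableFuture<Void> revoked = contender.revoke();
        CompletableFuture.allOf(done, revoked).join();
        return jobStore.toString();
    }
}
```

In `flagBased`, the job ends up marked as done although the contender is no longer the leader, which is the scenario described in the reply. In `serialized`, the revoke and the leader-only task are queued on the same thread, so the recorded outcome can only be nothing or `DONE(leader=true)`.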