tillrohrmann commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r512791010



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
##########
@@ -140,15 +161,22 @@
                                                if (throwable instanceof CancellationException) {
                                                        resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", throwable));
                                                } else {
-                                                       if (retries > 0) {
-                                                               retryOperation(
-                                                                       resultFuture,
-                                                                       operation,
-                                                                       retries - 1,
-                                                                       executor);
+                                                       throwable = ExceptionUtils.stripExecutionException(throwable);
+                                                       if (!retryPredicate.test(throwable)) {
+                                                               resultFuture.completeExceptionally(throwable);

Review comment:
       Maybe fail with `RetryException("Stopped retrying the operation because the error is not retriable.", throwable)`.
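A minimal sketch of the retry loop with the suggested `RetryException` applied, assuming a simplified, self-contained stand-in for `FutureUtils.retry`. `RetrySketch` and the local `RetryException` are hypothetical, the exhausted-retries message is illustrative, and unwrapping `CompletionException` is a plain-JDK substitute for the `ExceptionUtils.stripExecutionException` step.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical stand-in for FutureUtils.RetryException. */
class RetryException extends Exception {
    RetryException(String message, Throwable cause) {
        super(message, cause);
    }
}

/** Hypothetical, simplified version of the predicate-based retry. */
final class RetrySketch {

    static <T> CompletableFuture<T> retry(
            Supplier<CompletableFuture<T>> operation,
            int retries,
            Predicate<Throwable> retryPredicate,
            Executor executor) {
        CompletableFuture<T> resultFuture = new CompletableFuture<>();
        retryOperation(resultFuture, operation, retries, retryPredicate, executor);
        return resultFuture;
    }

    private static <T> void retryOperation(
            CompletableFuture<T> resultFuture,
            Supplier<CompletableFuture<T>> operation,
            int retries,
            Predicate<Throwable> retryPredicate,
            Executor executor) {
        operation.get().whenCompleteAsync((value, throwable) -> {
            if (throwable == null) {
                resultFuture.complete(value);
                return;
            }
            // Unwrap the CompletionException that dependent stages may add around the real cause.
            Throwable cause = throwable instanceof CompletionException && throwable.getCause() != null
                    ? throwable.getCause()
                    : throwable;
            if (!retryPredicate.test(cause)) {
                // The reviewer's suggestion: wrap non-retriable errors in a RetryException.
                resultFuture.completeExceptionally(new RetryException(
                        "Stopped retrying the operation because the error is not retriable.", cause));
            } else if (retries > 0) {
                retryOperation(resultFuture, operation, retries - 1, retryPredicate, executor);
            } else {
                resultFuture.completeExceptionally(new RetryException(
                        "Could not complete the operation. Number of retries has been exhausted.", cause));
            }
        }, executor);
    }
}
```

With this shape, callers of `retry` always receive a `RetryException`, regardless of whether retrying stopped because of the predicate or because the retries ran out.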

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionDriverFactory.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+/**
+ * Factory for creating {@link LeaderElectionDriver} with different implementation.
+ */
+public interface LeaderElectionDriverFactory {
+
+       /**
+        * Create a specific {@link LeaderElectionDriver} and start the necessary services. For example, LeaderLatch
+        * and NodeCache in Zookeeper, KubernetesLeaderElector and ConfigMap watcher in Kubernetes.
+        *
+        * @param leaderEventHandler handler for the leader election driver to process leader events.
+        * @param leaderContenderDescription leader contender description.

Review comment:
       `@throws` is missing.
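For illustration, the Javadoc with the missing tags could read as follows. The hunk ends before the method signature, so the method name `createLeaderElectionDriver`, its return type, and the two empty stand-in interfaces are assumptions made only so that the sketch compiles on its own.

```java
/** Empty stand-ins so that the sketch compiles on its own (hypothetical). */
interface LeaderElectionDriver extends AutoCloseable {}

interface LeaderElectionEventHandler {}

/**
 * Factory for creating {@link LeaderElectionDriver} with different implementations.
 */
interface LeaderElectionDriverFactory {

    /**
     * Create a specific {@link LeaderElectionDriver} and start the necessary services. For example,
     * LeaderLatch and NodeCache in ZooKeeper, KubernetesLeaderElector and ConfigMap watcher in Kubernetes.
     *
     * @param leaderEventHandler handler for the leader election driver to process leader events.
     * @param leaderContenderDescription leader contender description.
     * @return the created and started {@link LeaderElectionDriver}.
     * @throws Exception if the driver or one of its services cannot be created or started.
     */
    LeaderElectionDriver createLeaderElectionDriver(
            LeaderElectionEventHandler leaderEventHandler,
            String leaderContenderDescription) throws Exception;
}
```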

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionEventHandler.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+/**
+ * Interface which should be implemented to response to {@link LeaderInformation} changes in

Review comment:
       typo: response -> respond

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
##########
@@ -109,9 +109,28 @@
                        final int retries,
                        final Executor executor) {
 
+               return retry(operation, retries, ignore -> true, executor);
+       }
+
+       /**
+        * Retry the given operation the given number of times in case of a failure only when an exception is retryable.
+        *
+        * @param operation to executed
+        * @param retries if the operation failed
+        * @param retryPredicate Predicate to test whether an exception is retryable
+        * @param executor to use to run the futures
+        * @param <T> type of the result
+        * @return Future containing either the result of the operation or a {@link RetryException}
+        */
+       public static <T> CompletableFuture<T> retry(
+               final Supplier<CompletableFuture<T>> operation,
+               final int retries,
+               final Predicate<Throwable> retryPredicate,
+               final Executor executor) {
+

Review comment:
       This is a very nice solution to the problem @wangyang0918. Well done!

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/DefaultLeaderRetrievalService.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderretrieval;
+
+import org.apache.flink.runtime.leaderelection.LeaderInformation;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Objects;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The counterpart to the {@link org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService}.
+ * Composed with different {@link LeaderRetrievalDriver}, we could retrieve the leader information from
+ * different storage. The leader address as well as the current leader session ID will be retrieved from
+ * {@link LeaderRetrievalDriver}.
+ */
+public class DefaultLeaderRetrievalService implements LeaderRetrievalService, LeaderRetrievalEventHandler {
+       private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderRetrievalService.class);
+
+       private final Object lock = new Object();
+
+       private final LeaderRetrievalDriverFactory leaderRetrievalDriverFactory;
+
+       @GuardedBy("lock")
+       private String lastLeaderAddress;
+
+       @GuardedBy("lock")
+       private UUID lastLeaderSessionID;

Review comment:
       `@Nullable` is missing

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalEventHandler.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderretrieval;
+
+import org.apache.flink.runtime.leaderelection.LeaderInformation;
+
+/**
+ * Interface which should be implemented to notify to {@link LeaderInformation} changes in
+ * {@link LeaderRetrievalDriver}.
+ */
+public interface LeaderRetrievalEventHandler {
+
+       /**
+        * Called by specific {@link LeaderRetrievalDriver} to notify leader address.

Review comment:
       Line break missing before `@param`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderretrieval;
+
+import org.apache.flink.runtime.leaderelection.LeaderInformation;
+import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionState;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The counterpart to the {@link ZooKeeperLeaderElectionDriver}.
+ * {@link LeaderRetrievalService} implementation for Zookeeper. It retrieves the current leader which has
+ * been elected by the {@link ZooKeeperLeaderElectionDriver}.
+ * The leader address as well as the current leader session ID is retrieved from ZooKeeper.
+ */
+public class ZooKeeperLeaderRetrievalDriver implements LeaderRetrievalDriver, NodeCacheListener, UnhandledErrorListener {
+       private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderRetrievalDriver.class);
+
+       /** Connection to the used ZooKeeper quorum. */
+       private final CuratorFramework client;
+
+       /** Curator recipe to watch changes of a specific ZooKeeper node. */
+       private final NodeCache cache;
+
+       private final String retrievalPath;
+
+       private final ConnectionStateListener connectionStateListener = (client, newState) -> handleStateChange(newState);
+
+       private final LeaderRetrievalEventHandler leaderRetrievalEventHandler;
+
+       private volatile boolean running;
+
+       /**
+        * Creates a leader retrieval service which uses ZooKeeper to retrieve the leader information.
+        *
+        * @param client Client which constitutes the connection to the ZooKeeper quorum
+        * @param retrievalPath Path of the ZooKeeper node which contains the leader information
+        * @param leaderRetrievalEventHandler handler to notify the leader changes.
+        */
+       public ZooKeeperLeaderRetrievalDriver(
+                       CuratorFramework client,
+                       String retrievalPath,
+                       LeaderRetrievalEventHandler leaderRetrievalEventHandler) throws Exception {
+               this.client = checkNotNull(client, "CuratorFramework client");
+               this.cache = new NodeCache(client, retrievalPath);
+               this.retrievalPath = checkNotNull(retrievalPath);
+
+               this.leaderRetrievalEventHandler = checkNotNull(leaderRetrievalEventHandler);
+
+               client.getUnhandledErrorListenable().addListener(this);
+               cache.getListenable().addListener(this);
+               cache.start();
+
+               client.getConnectionStateListenable().addListener(connectionStateListener);
+
+               running = true;
+       }
+
+       @Override
+       public void close() throws Exception {
+               if (!running) {
+                       return;
+               }
+
+               running = false;

Review comment:
       I'd suggest introducing a `lock` to guard access to `running` (also at the other places where we access this field). That way we make sure that, once `close` has been called, we won't see any more calls coming from the ZooKeeper threads.
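A minimal sketch of the suggested pattern, assuming a heavily simplified driver: `LockGuardedRetrievalDriver` is hypothetical, a `Consumer<String>` stands in for the `LeaderRetrievalEventHandler`, and `nodeChanged(String)` stands in for the Curator `NodeCacheListener` callback. Every callback checks `running` while holding the same lock that `close()` takes, so no notification can slip through after `close()` returns.

```java
import java.util.function.Consumer;

/** Hypothetical, simplified driver illustrating a lock-guarded "running" flag. */
class LockGuardedRetrievalDriver implements AutoCloseable {
    private final Object lock = new Object();

    /** Stand-in for LeaderRetrievalEventHandler#notifyLeaderAddress. */
    private final Consumer<String> leaderAddressHandler;

    // Guarded by lock, so it no longer needs to be volatile.
    private boolean running = true;

    LockGuardedRetrievalDriver(Consumer<String> leaderAddressHandler) {
        this.leaderAddressHandler = leaderAddressHandler;
    }

    /** Stand-in for the NodeCacheListener callback, which Curator invokes from its own threads. */
    void nodeChanged(String leaderAddress) {
        synchronized (lock) {
            if (!running) {
                return; // close() has already run: drop the late notification
            }
            leaderAddressHandler.accept(leaderAddress);
        }
    }

    @Override
    public void close() {
        synchronized (lock) {
            if (!running) {
                return;
            }
            running = false;
        }
        // The real driver would now remove its listeners and close the NodeCache.
    }
}
```

Because the handler is invoked while the lock is held, `close()` waits for an in-flight notification to finish; the trade-off is that the handler must not block on anything that the thread calling `close()` might be holding.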

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalEventHandler.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderretrieval;
+
+import org.apache.flink.runtime.leaderelection.LeaderInformation;
+
+/**
+ * Interface which should be implemented to notify to {@link LeaderInformation} changes in
+ * {@link LeaderRetrievalDriver}.
+ */
+public interface LeaderRetrievalEventHandler {
+
+       /**
+        * Called by specific {@link LeaderRetrievalDriver} to notify leader address.
+        * @param leaderInformation the new leader information to notify {@link DefaultLeaderRetrievalService}. It could be
+        * {@link LeaderInformation#empty()} if the leader address does not exist in the external storage.
+        */
+       void notifyLeaderAddress(LeaderInformation leaderInformation);
+
+       /**
+        * Handle error by specific {@link LeaderRetrievalDriver}.

Review comment:
       line break is missing
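For illustration, the interface with the block tags separated from the descriptions by a blank Javadoc line. The `@param ex` tag on `handleError` is an assumption modeled on the election-side handler, and `LeaderInformation` is reduced to a hypothetical stand-in so the sketch compiles on its own.

```java
/** Minimal stand-in for LeaderInformation so that the sketch compiles (hypothetical). */
final class LeaderInformation {
    static LeaderInformation empty() {
        return new LeaderInformation();
    }
}

interface LeaderRetrievalEventHandler {

    /**
     * Called by specific {@link LeaderRetrievalDriver} to notify leader address.
     *
     * @param leaderInformation the new leader information to notify {@link DefaultLeaderRetrievalService}.
     *     It could be {@link LeaderInformation#empty()} if the leader address does not exist in the
     *     external storage.
     */
    void notifyLeaderAddress(LeaderInformation leaderInformation);

    /**
     * Handle error by specific {@link LeaderRetrievalDriver}.
     *
     * @param ex exception to be handled.
     */
    void handleError(Exception ex);
}
```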

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingRetrievalBase.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Base class which provides some convenience functions for testing purposes of {@link LeaderRetrievalListener} and
+ * {@link org.apache.flink.runtime.leaderretrieval.LeaderRetrievalEventHandler}.
+ */
+public class TestingRetrievalBase {
+
+       protected final Logger logger = LoggerFactory.getLogger(getClass());
+       private final Object lock = new Object();
+
+       private String address;
+       private String oldAddress;
+       private UUID leaderSessionID;
+       private Exception exception;
+
+
+       public String getAddress() {
+               return address;
+       }
+
+       public UUID getLeaderSessionID() {
+               return leaderSessionID;
+       }
+
+       public String waitForNewLeader(long timeout) throws Exception {
+               long start = System.currentTimeMillis();
+               long curTimeout;
+
+               synchronized (lock) {
+                       while (
+                               exception == null &&
+                                       (address == null || address.equals(oldAddress)) &&
+                                       (curTimeout = timeout - System.currentTimeMillis() + start) > 0) {
+                               try {
+                                       lock.wait(curTimeout);
+                               } catch (InterruptedException e) {
+                                       // we got interrupted so check again for the condition
+                               }
+                       }
+               }

Review comment:
       I am wondering whether we could solve this problem a bit more elegantly using an `ArrayBlockingQueue`.
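A minimal sketch of the queue-based alternative, assuming the listener callbacks enqueue events and the waiting test thread polls them with a deadline. `QueueBasedRetrievalBase`, its `LeaderEvent` holder, and the capacity of 16 are hypothetical choices; like the original loop, it skips empty notifications and ones that repeat the current address.

```java
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical queue-based variant of the waiting logic in TestingRetrievalBase. */
class QueueBasedRetrievalBase {

    /** Either a leader notification or an error reported by the listener. */
    private static final class LeaderEvent {
        final String address;
        final UUID leaderSessionID;
        final Exception error;

        LeaderEvent(String address, UUID leaderSessionID, Exception error) {
            this.address = address;
            this.leaderSessionID = leaderSessionID;
            this.error = error;
        }
    }

    // Arbitrary capacity for tests; offer() silently drops events once the queue is full.
    private final BlockingQueue<LeaderEvent> events = new ArrayBlockingQueue<>(16);

    private String address;
    private UUID leaderSessionID;

    /** Called from the listener/driver thread. */
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
        events.offer(new LeaderEvent(leaderAddress, leaderSessionID, null));
    }

    public void handleError(Exception exception) {
        events.offer(new LeaderEvent(null, null, exception));
    }

    public String waitForNewLeader(long timeoutMillis) throws Exception {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            final long remainingNanos = deadline - System.nanoTime();
            final LeaderEvent event =
                    remainingNanos > 0 ? events.poll(remainingNanos, TimeUnit.NANOSECONDS) : null;
            if (event == null) {
                throw new TimeoutException("No new leader was reported within " + timeoutMillis + " ms.");
            }
            if (event.error != null) {
                throw event.error;
            }
            // Skip empty notifications and ones that repeat the current address.
            if (event.address != null && !event.address.equals(address)) {
                address = event.address;
                leaderSessionID = event.leaderSessionID;
                return address;
            }
        }
    }

    public String getAddress() {
        return address;
    }

    public UUID getLeaderSessionID() {
        return leaderSessionID;
    }
}
```

Compared with the `wait`/`notifyAll` loop, the queue takes care of the hand-off and the timeout, so the explicit lock and the `oldAddress` bookkeeping are no longer needed.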

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionEventHandler.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+/**
+ * Interface which should be implemented to response to {@link LeaderInformation} changes in
+ * {@link LeaderElectionDriver}.
+ */
+public interface LeaderElectionEventHandler {
+
+       /**
+        * Called by specific {@link LeaderElectionDriver} when the leadership is granted.
+        */
+       void onGrantLeadership();
+
+       /**
+        * Called by specific {@link LeaderElectionDriver} when the leadership is revoked.
+        */
+       void onRevokeLeadership();
+
+       /**
+        * Called by specific {@link LeaderElectionDriver} when the leader information is changed. Then the
+        * {@link DefaultLeaderElectionService} could write the leader information again if necessary.
+        * @param leaderInformation leader information which contains leader session id and leader address.

Review comment:
       Will this method only be called if one is still the leader, or will the implementor be responsible for ensuring that we are still the leader? I think this kind of contract needs to be stated so that future implementations can respect it.
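To make the second option of the question concrete, here is a sketch in which the driver may report leader information changes at any time and the implementor checks leadership itself before rewriting. This is only one of the two possible contracts; `LeadershipCheckingHandler` is hypothetical, and a `BiConsumer<UUID, String>` stands in for the driver call that writes the leader information.

```java
import java.util.Objects;
import java.util.UUID;
import java.util.function.BiConsumer;

/** Hypothetical handler for the "implementor checks leadership" variant of the contract. */
class LeadershipCheckingHandler {
    private final Object lock = new Object();

    /** Stand-in for the driver call that writes (session ID, address) to the external storage. */
    private final BiConsumer<UUID, String> writeLeaderInformation;

    // Both null while this contender is not the confirmed leader.
    private UUID confirmedSessionId;
    private String confirmedAddress;

    LeadershipCheckingHandler(BiConsumer<UUID, String> writeLeaderInformation) {
        this.writeLeaderInformation = Objects.requireNonNull(writeLeaderInformation);
    }

    void confirmLeadership(UUID sessionId, String address) {
        synchronized (lock) {
            confirmedSessionId = sessionId;
            confirmedAddress = address;
            writeLeaderInformation.accept(sessionId, address);
        }
    }

    void onRevokeLeadership() {
        synchronized (lock) {
            confirmedSessionId = null;
            confirmedAddress = null;
        }
    }

    /**
     * Assumed contract: the driver may call this even after leadership was lost, so the handler only
     * rewrites its confirmed information if it is still the leader and the stored data diverged.
     */
    void onLeaderInformationChange(UUID storedSessionId, String storedAddress) {
        synchronized (lock) {
            if (confirmedSessionId == null) {
                return; // not the leader (anymore): must not overwrite another contender's data
            }
            if (!confirmedSessionId.equals(storedSessionId)
                    || !Objects.equals(confirmedAddress, storedAddress)) {
                writeLeaderInformation.accept(confirmedSessionId, confirmedAddress);
            }
        }
    }
}
```

Whichever variant is chosen, writing it into the method's Javadoc tells future driver implementations whether they may call the method without holding leadership.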

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionEventHandler.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+/**
+ * Interface which should be implemented to response to {@link LeaderInformation} changes in
+ * {@link LeaderElectionDriver}.
+ */
+public interface LeaderElectionEventHandler {
+
+       /**
+        * Called by specific {@link LeaderElectionDriver} when the leadership is granted.
+        */
+       void onGrantLeadership();
+
+       /**
+        * Called by specific {@link LeaderElectionDriver} when the leadership is revoked.
+        */
+       void onRevokeLeadership();
+
+       /**
+        * Called by specific {@link LeaderElectionDriver} when the leader information is changed. Then the
+        * {@link DefaultLeaderElectionService} could write the leader information again if necessary.
+        * @param leaderInformation leader information which contains leader session id and leader address.
+        */
+       void onLeaderInformationChange(LeaderInformation leaderInformation);
+
+       /**
+        * Handle error by specific {@link LeaderElectionDriver}.
+        * @param ex exception to be handled.
+        */
+       void handleError(Exception ex);

Review comment:
       Theoretically, this does not need to be part of the `LeaderElectionEventHandler`; one could instead pass in a `FatalErrorHandler` to the `LeaderElectionDriver`.
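A sketch of that alternative, assuming a heavily simplified driver: the event handler keeps only the leadership callbacks, and unhandled errors go straight to a fatal error handler passed to the driver. `FatalErrorHandler` below is a local stand-in with a single `onFatalError(Throwable)` method, and `SketchLeaderElectionDriver` is hypothetical; its two methods mirror the Curator `LeaderLatchListener#isLeader` and `UnhandledErrorListener#unhandledError(String, Throwable)` callbacks.

```java
/** Local stand-in for a fatal error handler (hypothetical copy for this sketch). */
interface FatalErrorHandler {
    void onFatalError(Throwable exception);
}

/** The event handler without handleError: it only reacts to leadership events. */
interface LeaderElectionEventHandler {
    void onGrantLeadership();

    void onRevokeLeadership();
}

/** Hypothetical, heavily simplified driver; the real one would wrap a Curator LeaderLatch. */
class SketchLeaderElectionDriver {
    private final LeaderElectionEventHandler leaderElectionEventHandler;
    private final FatalErrorHandler fatalErrorHandler;

    SketchLeaderElectionDriver(
            LeaderElectionEventHandler leaderElectionEventHandler,
            FatalErrorHandler fatalErrorHandler) {
        this.leaderElectionEventHandler = leaderElectionEventHandler;
        this.fatalErrorHandler = fatalErrorHandler;
    }

    /** Mirrors the LeaderLatchListener#isLeader callback. */
    void isLeader() {
        leaderElectionEventHandler.onGrantLeadership();
    }

    /** Mirrors the UnhandledErrorListener#unhandledError(String, Throwable) callback. */
    void unhandledError(String message, Throwable e) {
        fatalErrorHandler.onFatalError(
                new Exception("Unhandled error in leader election driver: " + message, e));
    }
}
```

This keeps `LeaderElectionEventHandler` focused on leadership events, while error escalation becomes an explicit constructor dependency of the driver.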

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderInformation.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * Information about leader including the confirmed leader session id and leader address.
+ */
+public class LeaderInformation implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       @Nullable
+       private final UUID leaderSessionID;
+
+       @Nullable
+       private final String leaderAddress;
+
+       private static final LeaderInformation EMPTY = new LeaderInformation(null, null);
+
+       public LeaderInformation(@Nullable UUID leaderSessionID, @Nullable String leaderAddress) {

Review comment:
       Let's make this private and introduce two factory methods `known(String, UUID)` and `empty()`.
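A minimal sketch of the suggested shape, assuming the argument order `known(String, UUID)` from the comment. The `isEmpty()` helper and the null checks are illustrative additions; `isEmpty()` compares fields rather than identity so that a deserialized empty instance is still recognized as empty.

```java
import java.io.Serializable;
import java.util.Objects;
import java.util.UUID;

/** Information about the leader: the confirmed leader session ID and the leader address. */
final class LeaderInformation implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final LeaderInformation EMPTY = new LeaderInformation(null, null);

    private final UUID leaderSessionID; // null only for the empty instance
    private final String leaderAddress; // null only for the empty instance

    private LeaderInformation(UUID leaderSessionID, String leaderAddress) {
        this.leaderSessionID = leaderSessionID;
        this.leaderAddress = leaderAddress;
    }

    static LeaderInformation known(String leaderAddress, UUID leaderSessionID) {
        return new LeaderInformation(
                Objects.requireNonNull(leaderSessionID, "leaderSessionID"),
                Objects.requireNonNull(leaderAddress, "leaderAddress"));
    }

    static LeaderInformation empty() {
        return EMPTY;
    }

    boolean isEmpty() {
        return leaderSessionID == null && leaderAddress == null;
    }

    UUID getLeaderSessionID() {
        return leaderSessionID;
    }

    String getLeaderAddress() {
        return leaderAddress;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof LeaderInformation)) {
            return false;
        }
        LeaderInformation that = (LeaderInformation) o;
        return Objects.equals(leaderSessionID, that.leaderSessionID)
                && Objects.equals(leaderAddress, that.leaderAddress);
    }

    @Override
    public int hashCode() {
        return Objects.hash(leaderSessionID, leaderAddress);
    }
}
```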

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalDriver.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderretrieval;
+
+/**
+ * A {@link LeaderRetrievalDriver} is responsible for retrieves the current leader which has been elected by the
+ * {@link org.apache.flink.runtime.leaderelection.LeaderElectionDriver}.
+ */
+public interface LeaderRetrievalDriver extends AutoCloseable {
+
+       /**
+        * Close the services used for leader retrieval.
+        */
+       void close() throws Exception;

Review comment:
       Isn't `close()` already declared by `AutoCloseable`? Maybe `LeaderRetrievalDriver` does not need to extend `AutoCloseable`.
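One way to remove the duplication, assuming the driver should stay usable in try-with-resources, is to keep `extends AutoCloseable` and drop the redeclared method. `NoOpLeaderRetrievalDriver` is a hypothetical implementation used only to show that nothing is lost.

```java
/**
 * Hypothetical variant: close() is inherited from AutoCloseable, which already declares
 * "void close() throws Exception", so the interface body can stay empty.
 */
interface LeaderRetrievalDriver extends AutoCloseable {}

/** Trivial implementation used only to show that try-with-resources still works. */
final class NoOpLeaderRetrievalDriver implements LeaderRetrievalDriver {
    private boolean closed;

    @Override
    public void close() {
        // Implementations may drop the checked exception from the inherited signature.
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}
```

Redeclaring `close()` only pays off when the interface wants to narrow the `throws` clause or attach driver-specific Javadoc.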

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/DefaultLeaderRetrievalService.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderretrieval;
+
+import org.apache.flink.runtime.leaderelection.LeaderInformation;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Objects;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The counterpart to the {@link org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService}.
+ * Composed with different {@link LeaderRetrievalDriver}, we could retrieve the leader information from
+ * different storage. The leader address as well as the current leader session ID will be retrieved from
+ * {@link LeaderRetrievalDriver}.
+ */
+public class DefaultLeaderRetrievalService implements LeaderRetrievalService, LeaderRetrievalEventHandler {
+       private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderRetrievalService.class);
+
+       private final Object lock = new Object();
+
+       private final LeaderRetrievalDriverFactory leaderRetrievalDriverFactory;
+
+       @GuardedBy("lock")
+       private String lastLeaderAddress;

Review comment:
       `@Nullable` missing

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionEventHandler.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+/**
+ * Interface which should be implemented to response to {@link LeaderInformation} changes in
+ * {@link LeaderElectionDriver}.

Review comment:
       Maybe one could say that it also reacts to leadership changes, not only to leader information changes.
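A possible wording for the class-level Javadoc that mentions both kinds of events (with "respond" instead of "response"). Only the three leadership callbacks are shown, and `LeaderInformation` and the referenced driver type are reduced to hypothetical stand-ins so the sketch compiles on its own.

```java
/** Minimal stand-in so that the sketch compiles on its own (hypothetical). */
final class LeaderInformation {}

/**
 * Interface which should be implemented to respond to leadership changes (leadership granted or
 * revoked) as well as to {@link LeaderInformation} changes reported by a {@link LeaderElectionDriver}.
 */
interface LeaderElectionEventHandler {

    /** Called by a specific {@link LeaderElectionDriver} when the leadership is granted. */
    void onGrantLeadership();

    /** Called by a specific {@link LeaderElectionDriver} when the leadership is revoked. */
    void onRevokeLeadership();

    /**
     * Called by a specific {@link LeaderElectionDriver} when the leader information is changed.
     *
     * @param leaderInformation leader information which contains leader session ID and leader address.
     */
    void onLeaderInformationChange(LeaderInformation leaderInformation);
}
```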

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link DefaultLeaderElectionService}.
+ */
+public class DefaultLeaderElectionServiceTest {

Review comment:
       `extends TestLogger` is missing

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriver.java
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionState;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.CreateMode;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.Stat;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link LeaderElectionDriver} implementation for ZooKeeper. The leading JobManager is elected using
+ * ZooKeeper. The current leader's address as well as its leader session ID is published via
+ * ZooKeeper.
+ */
+public class ZooKeeperLeaderElectionDriver implements LeaderElectionDriver, LeaderLatchListener, NodeCacheListener, UnhandledErrorListener {
+
+       private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionDriver.class);
+
+       /** Client to the ZooKeeper quorum. */
+       private final CuratorFramework client;
+
+       /** Curator recipe for leader election. */
+       private final LeaderLatch leaderLatch;
+
+       /** Curator recipe to watch a given ZooKeeper node for changes. */
+       private final NodeCache cache;
+
+       /** ZooKeeper path of the node which stores the current leader information. */
+       private final String leaderPath;
+
+       private final ConnectionStateListener listener = (client, newState) -> handleStateChange(newState);
+
+       private final LeaderElectionEventHandler leaderElectionEventHandler;
+
+       private final String leaderContenderDescription;
+
+       private volatile boolean running;
+
+       /**
+        * Creates a ZooKeeperLeaderElectionDriver object.
+        *
+        * @param client Client which is connected to the ZooKeeper quorum
+        * @param latchPath ZooKeeper node path for the leader election latch
+        * @param leaderPath ZooKeeper node path for the node which stores the current leader information
+        * @param leaderElectionEventHandler event handler for processing leader change events
+        * @param leaderContenderDescription leader contender description
+        */
+       public ZooKeeperLeaderElectionDriver(
+                       CuratorFramework client,
+                       String latchPath,
+                       String leaderPath,
+                       LeaderElectionEventHandler leaderElectionEventHandler,
+                       String leaderContenderDescription) throws Exception {
+               this.client = checkNotNull(client);
+               this.leaderPath = checkNotNull(leaderPath);
+               this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
+               this.leaderContenderDescription = checkNotNull(leaderContenderDescription);
+
+               leaderLatch = new LeaderLatch(client, checkNotNull(latchPath));
+               cache = new NodeCache(client, leaderPath);
+
+               client.getUnhandledErrorListenable().addListener(this);
+
+               leaderLatch.addListener(this);
+               leaderLatch.start();
+
+               cache.getListenable().addListener(this);
+               cache.start();
+
+               client.getConnectionStateListenable().addListener(listener);
+
+               running = true;
+       }
+
+       @Override
+       public void close() throws Exception {
+               if (!running) {
+                       return;
+               }
+               running = false;
+
+               LOG.info("Closing {}", this);
+
+               client.getUnhandledErrorListenable().removeListener(this);
+
+               client.getConnectionStateListenable().removeListener(listener);
+
+               Exception exception = null;
+
+               try {
+                       cache.close();
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, exception);
+               }
+
+               try {
+                       leaderLatch.close();
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, exception);
+               }
+
+               if (exception != null) {
+                       throw new Exception("Could not properly stop the ZooKeeperLeaderElectionDriver.", exception);
+               }
+       }
+
+       @Override
+       public boolean hasLeadership() {
+               return leaderLatch.hasLeadership();
+       }
+
+       @Override
+       public void isLeader() {
+               leaderElectionEventHandler.onGrantLeadership();
+       }
+
+       @Override
+       public void notLeader() {
+               leaderElectionEventHandler.onRevokeLeadership();
+       }
+
+       @Override
+       public void nodeChanged() throws Exception {
+               if (leaderLatch.hasLeadership()) {
+                       if (running) {
+                               ChildData childData = cache.getCurrentData();
+                               if (childData != null) {
+                                       final byte[] data = childData.getData();
+                                       if (data != null && data.length > 0) {
+                                               final ByteArrayInputStream bais = new ByteArrayInputStream(data);
+                                               final ObjectInputStream ois = new ObjectInputStream(bais);
+
+                                               final String leaderAddress = ois.readUTF();
+                                               final UUID leaderSessionID = (UUID) ois.readObject();
+
+                                               leaderElectionEventHandler.onLeaderInformationChange(
+                                                       new LeaderInformation(leaderSessionID, leaderAddress));
+                                               return;
+                                       }
+                               }
+                               leaderElectionEventHandler.onLeaderInformationChange(LeaderInformation.empty());
+                       } else {
+                               LOG.debug("Ignoring node change notification since the service has already been stopped.");
+                       }
+               }
+       }
+
+       /**
+        * Writes the current leader's address as well as the given leader session ID to ZooKeeper.
+        */
+       @Override
+       public void writeLeaderInformation(LeaderInformation leaderInformation) {
+               // this method does not have to be synchronized because the curator framework client
+               // is thread-safe
+               final UUID confirmedLeaderSessionID = leaderInformation.getLeaderSessionID();
+               final String confirmedLeaderAddress = leaderInformation.getLeaderAddress();
+               try {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug(
+                                       "Write leader information: Leader={}, session ID={}.",
+                                       confirmedLeaderAddress,
+                                       confirmedLeaderSessionID);
+                       }
+                       final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                       final ObjectOutputStream oos = new ObjectOutputStream(baos);
+
+                       oos.writeUTF(confirmedLeaderAddress);
+                       oos.writeObject(confirmedLeaderSessionID);
+
+                       oos.close();
+
+                       boolean dataWritten = false;
+
+                       while (!dataWritten && leaderLatch.hasLeadership()) {
+                               Stat stat = client.checkExists().forPath(leaderPath);
+
+                               if (stat != null) {
+                                       long owner = stat.getEphemeralOwner();
+                                       long sessionID = client.getZookeeperClient().getZooKeeper().getSessionId();
+
+                                       if (owner == sessionID) {
+                                               try {
+                                                       client.setData().forPath(leaderPath, baos.toByteArray());
+
+                                                       dataWritten = true;
+                                               } catch (KeeperException.NoNodeException noNode) {
+                                                       // node was deleted in the meantime
+                                               }
+                                       } else {
+                                               try {
+                                                       client.delete().forPath(leaderPath);
+                                               } catch (KeeperException.NoNodeException noNode) {
+                                                       // node was deleted in the meantime --> try again
+                                               }
+                                       }
+                               } else {
+                                       try {
+                                               client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
+                                                               leaderPath,
+                                                               baos.toByteArray());
+
+                                               dataWritten = true;
+                                       } catch (KeeperException.NodeExistsException nodeExists) {
+                                               // node has been created in the meantime --> try again
+                                       }
+                               }
+                       }
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug(
+                                       "Successfully wrote leader information: Leader={}, session ID={}.",
+                                       confirmedLeaderAddress,
+                                       confirmedLeaderSessionID);
+                       }
+               } catch (Exception e) {
+                       leaderElectionEventHandler.handleError(
+                                       new Exception("Could not write leader address and leader session ID to " +
+                                                       "ZooKeeper.", e));
+               }
+       }
+
+       private void handleStateChange(ConnectionState newState) {
+               switch (newState) {
+                       case CONNECTED:
+                               LOG.debug("Connected to ZooKeeper quorum. Leader election can start.");
+                               break;
+                       case SUSPENDED:
+                               LOG.warn("Connection to ZooKeeper suspended. The contender " + leaderContenderDescription
+                                       + " no longer participates in the leader election.");
+                               break;
+                       case RECONNECTED:
+                               LOG.info("Connection to ZooKeeper was reconnected. Leader election can be restarted.");
+                               break;
+                       case LOST:
+                               // Maybe we have to throw an exception here to terminate the JobManager
+                               LOG.warn("Connection to ZooKeeper lost. The contender " + leaderContenderDescription
+                                       + " no longer participates in the leader election.");
+                               break;
+               }
+       }
+
+       @Override
+       public void unhandledError(String message, Throwable e) {
+               leaderElectionEventHandler.handleError(
+                       new FlinkException("Unhandled error in ZooKeeperLeaderElectionDriver: " + message, e));
+       }
+
+       @Override
+       public String toString() {
+               return "ZooKeeperLeaderElectionDriver{" +
+                       "leaderPath='" + leaderPath + '\'' +
+                       '}';
+       }
+}

Review comment:
       Very nice! I like the implementation and I think that it is now quite 
clear and easy to understand what is happening :-)
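For reference, the election driver and the retrieval driver must agree on the node payload: `writeLeaderInformation` writes the address with `writeUTF` and then the session ID with `writeObject`, and both `nodeChanged` paths read them back in that order, treating a missing or empty payload as "no leader". A self-contained sketch of that round trip; `LeaderPayloadCodec`, `encode` and `decode` are hypothetical names, not part of this PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.UUID;

/** Hypothetical helper mirroring the node payload written and read by the ZooKeeper drivers. */
final class LeaderPayloadCodec {

    /** Decoded payload; both fields are null when the node holds no data. */
    static final class Decoded {
        final String leaderAddress;
        final UUID leaderSessionID;

        Decoded(String leaderAddress, UUID leaderSessionID) {
            this.leaderAddress = leaderAddress;
            this.leaderSessionID = leaderSessionID;
        }
    }

    private LeaderPayloadCodec() {}

    /** Writes the address with writeUTF, then the session ID with writeObject. */
    static byte[] encode(String leaderAddress, UUID leaderSessionID) {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream flushes its buffered block data into baos.
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeUTF(leaderAddress);
            oos.writeObject(leaderSessionID);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    /** Reads the fields back in the same order; an empty payload means "no leader". */
    static Decoded decode(byte[] data) {
        if (data == null || data.length == 0) {
            return new Decoded(null, null);
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            final String leaderAddress = ois.readUTF();
            final UUID leaderSessionID = (UUID) ois.readObject();
            return new Decoded(leaderAddress, leaderSessionID);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Unexpected payload type", e);
        }
    }
}
```

Keeping the format in one place like this is just one option for preventing the two drivers from drifting apart; the PR currently inlines the same logic in each driver.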

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalEventHandler.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderretrieval;
+
+import org.apache.flink.runtime.leaderelection.LeaderInformation;
+
+/**
+ * Interface which should be implemented to respond to {@link LeaderInformation} changes in
+ * {@link LeaderRetrievalDriver}.
+ */
+public interface LeaderRetrievalEventHandler {
+
+       /**
+        * Called by specific {@link LeaderRetrievalDriver} to notify leader address.
+        * @param leaderInformation the new leader information to notify {@link DefaultLeaderRetrievalService}. It could be

Review comment:
       Usually one does not refer to implementations in the interface
definition because the interface does not know about them.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link DefaultLeaderElectionService}.
+ */
+public class DefaultLeaderElectionServiceTest {
+
+       private static final String TEST_URL = "akka//user/jobmanager";
+       private static final long timeout = 30L * 1000L;
+
+       @Test
+       public void testOnGrantAndRevokeLeadership() throws Exception {
+               final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory testingLeaderElectionDriverFactory =
+                       new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+               final DefaultLeaderElectionService leaderElectionService = new DefaultLeaderElectionService(
+                       testingLeaderElectionDriverFactory);
+               final TestingContender testingContender = new TestingContender(TEST_URL, leaderElectionService);
+               leaderElectionService.start(testingContender);
+
+               // grant leadership
+               final TestingLeaderElectionDriver testingLeaderElectionDriver =
+                       testingLeaderElectionDriverFactory.getCurrentLeaderDriver();
+               assertThat(testingLeaderElectionDriver, is(notNullValue()));
+               testingLeaderElectionDriver.isLeader();
+
+               testingContender.waitForLeader(timeout);
+               assertThat(testingContender.isLeader(), is(true));
+               assertThat(testingContender.getDescription(), is(TEST_URL));
+
+               // Check the external storage
+               assertThat(testingLeaderElectionDriver.getLeaderInformation().getLeaderAddress(), is(TEST_URL));
+
+               // revoke leadership
+               testingLeaderElectionDriver.notLeader();
+               testingContender.waitForRevokeLeader(timeout);
+               assertThat(testingContender.isLeader(), is(false));
+
+               leaderElectionService.stop();
+       }
+
+       @Test
+       public void testLeaderInformationChangedAndShouldBeCorrected() throws Exception {
+               final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory testingLeaderElectionDriverFactory =
+                       new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+               final DefaultLeaderElectionService leaderElectionService = new DefaultLeaderElectionService(
+                       testingLeaderElectionDriverFactory);
+               final TestingContender testingContender = new TestingContender(TEST_URL, leaderElectionService);
+               leaderElectionService.start(testingContender);
+
+               final TestingLeaderElectionDriver testingLeaderElectionDriver =
+                       testingLeaderElectionDriverFactory.getCurrentLeaderDriver();
+               assertThat(testingLeaderElectionDriver, is(notNullValue()));
+               testingLeaderElectionDriver.isLeader();
+               testingContender.waitForLeader(timeout);
+
+               // Leader information changed and should be corrected
+               testingLeaderElectionDriver.leaderInformationChanged(LeaderInformation.empty());
+               assertThat(testingLeaderElectionDriver.getLeaderInformation().getLeaderAddress(), is(TEST_URL));

Review comment:
       Shouldn't we also check that the leader session id stays the same?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderretrieval;
+
+import org.apache.flink.runtime.leaderelection.LeaderInformation;
+import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionState;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The counterpart to the {@link ZooKeeperLeaderElectionDriver}.
+ * {@link LeaderRetrievalDriver} implementation for ZooKeeper. It retrieves the current leader which has
+ * been elected by the {@link ZooKeeperLeaderElectionDriver}.
+ * The leader address as well as the current leader session ID is retrieved from ZooKeeper.
+ */
+public class ZooKeeperLeaderRetrievalDriver implements LeaderRetrievalDriver, NodeCacheListener, UnhandledErrorListener {
+       private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderRetrievalDriver.class);
+
+       /** Connection to the used ZooKeeper quorum. */
+       private final CuratorFramework client;
+
+       /** Curator recipe to watch changes of a specific ZooKeeper node. */
+       private final NodeCache cache;
+
+       private final String retrievalPath;
+
+       private final ConnectionStateListener connectionStateListener = (client, newState) -> handleStateChange(newState);
+
+       private final LeaderRetrievalEventHandler leaderRetrievalEventHandler;
+
+       private volatile boolean running;
+
+       /**
+        * Creates a leader retrieval driver which uses ZooKeeper to retrieve the leader information.
+        *
+        * @param client Client which constitutes the connection to the ZooKeeper quorum
+        * @param retrievalPath Path of the ZooKeeper node which contains the leader information
+        * @param leaderRetrievalEventHandler handler to notify about leader changes.
+        */
+       public ZooKeeperLeaderRetrievalDriver(
+                       CuratorFramework client,
+                       String retrievalPath,
+                       LeaderRetrievalEventHandler leaderRetrievalEventHandler) throws Exception {
+               this.client = checkNotNull(client, "CuratorFramework client");
+               this.retrievalPath = checkNotNull(retrievalPath);
+               this.cache = new NodeCache(client, retrievalPath);
+
+               this.leaderRetrievalEventHandler = checkNotNull(leaderRetrievalEventHandler);
+
+               client.getUnhandledErrorListenable().addListener(this);
+               cache.getListenable().addListener(this);
+               cache.start();
+
+               client.getConnectionStateListenable().addListener(connectionStateListener);
+
+               running = true;
+       }
+
+       @Override
+       public void close() throws Exception {
+               if (!running) {
+                       return;
+               }
+
+               running = false;
+
+               LOG.info("Closing {}.", this);
+
+               client.getUnhandledErrorListenable().removeListener(this);
+               client.getConnectionStateListenable().removeListener(connectionStateListener);
+
+               try {
+                       cache.close();
+               } catch (IOException e) {
+                       throw new Exception("Could not properly stop the ZooKeeperLeaderRetrievalDriver.", e);
+               }
+       }
+
+       @Override
+       public void nodeChanged() {
+               retrieveLeaderInformationFromZooKeeper();
+       }
+
+       private void retrieveLeaderInformationFromZooKeeper() {
+               if (running) {
+                       try {
+                               LOG.debug("Leader node has changed.");
+
+                               ChildData childData = cache.getCurrentData();
+
+                               String leaderAddress;
+                               UUID leaderSessionID;
+
+                               if (childData == null) {
+                                       leaderAddress = null;
+                                       leaderSessionID = null;
+                               } else {
+                                       byte[] data = childData.getData();
+
+                                       if (data == null || data.length == 0) {
+                                               leaderAddress = null;
+                                               leaderSessionID = null;
+                                       } else {
+                                               ByteArrayInputStream bais = new ByteArrayInputStream(data);
+                                               ObjectInputStream ois = new ObjectInputStream(bais);
+
+                                               leaderAddress = ois.readUTF();
+                                               leaderSessionID = (UUID) ois.readObject();
+                                       }
+                               }
+
+                               leaderRetrievalEventHandler.notifyLeaderAddress(new LeaderInformation(leaderSessionID, leaderAddress));
+                       } catch (Exception e) {
+                               leaderRetrievalEventHandler.handleError(new Exception("Could not handle node changed event.", e));
+                               ExceptionUtils.checkInterrupted(e);
+                       }
+               } else {
+                       LOG.debug("Ignoring node change notification since the service has already been stopped.");
+               }
+       }
+
+       private void handleStateChange(ConnectionState newState) {
+               switch (newState) {
+                       case CONNECTED:
+                               LOG.debug("Connected to ZooKeeper quorum. Leader retrieval can start.");
+                               break;
+                       case SUSPENDED:
+                               LOG.warn("Connection to ZooKeeper suspended. Can no longer retrieve the leader from " +
+                                       "ZooKeeper.");
+                               leaderRetrievalEventHandler.notifyLeaderAddress(LeaderInformation.empty());

Review comment:
       I think we should check whether we are still running before calling this 
method.
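A minimal sketch of the suggested guard, mirroring the `running` check that `retrieveLeaderInformationFromZooKeeper` already performs. `ConnectionState` and the `Runnable` callback are simplified stand-ins for Curator's enum and for `notifyLeaderAddress(LeaderInformation.empty())`; `GuardedRetrievalDriver` is a hypothetical name used only for illustration.

```java
/** Simplified stand-in for the Curator ConnectionState values handled in the driver. */
enum ConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

/** Hypothetical, stripped-down retrieval driver illustrating the running check. */
final class GuardedRetrievalDriver {

    /** Stand-in for leaderRetrievalEventHandler.notifyLeaderAddress(LeaderInformation.empty()). */
    private final Runnable notifyEmptyLeader;

    private volatile boolean running = true;

    GuardedRetrievalDriver(Runnable notifyEmptyLeader) {
        this.notifyEmptyLeader = notifyEmptyLeader;
    }

    void close() {
        running = false;
    }

    void handleStateChange(ConnectionState newState) {
        switch (newState) {
            case SUSPENDED:
                // After close() the handler may already be shut down, so only
                // report the loss of the leader while the driver is still running.
                if (running) {
                    notifyEmptyLeader.run();
                }
                break;
            default:
                // The other states only log in this sketch.
                break;
        }
    }
}
```

Like the existing check in the node change path, this is a best-effort guard: a `close()` that races with the state change callback can still slip through unless both are serialized.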

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link DefaultLeaderElectionService}.
+ */
+public class DefaultLeaderElectionServiceTest {
+
+       private static final String TEST_URL = "akka//user/jobmanager";
+       private static final long timeout = 30L * 1000L;
+
+       @Test
+       public void testOnGrantAndRevokeLeadership() throws Exception {
+               final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory testingLeaderElectionDriverFactory =
+                       new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+               final DefaultLeaderElectionService leaderElectionService = new DefaultLeaderElectionService(
+                       testingLeaderElectionDriverFactory);
+               final TestingContender testingContender = new TestingContender(TEST_URL, leaderElectionService);
+               leaderElectionService.start(testingContender);
+
+               // grant leadership
+               final TestingLeaderElectionDriver testingLeaderElectionDriver =
+                       testingLeaderElectionDriverFactory.getCurrentLeaderDriver();
+               assertThat(testingLeaderElectionDriver, is(notNullValue()));
+               testingLeaderElectionDriver.isLeader();
+
+               testingContender.waitForLeader(timeout);
+               assertThat(testingContender.isLeader(), is(true));
+               assertThat(testingContender.getDescription(), is(TEST_URL));
+
+               // Check the external storage
+               assertThat(testingLeaderElectionDriver.getLeaderInformation().getLeaderAddress(), is(TEST_URL));
+
+               // revoke leadership
+               testingLeaderElectionDriver.notLeader();
+               testingContender.waitForRevokeLeader(timeout);
+               assertThat(testingContender.isLeader(), is(false));
+
+               leaderElectionService.stop();
+       }
+
+       @Test
+       public void testLeaderInformationChangedAndShouldBeCorrected() throws Exception {
+               final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory testingLeaderElectionDriverFactory =
+                       new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+               final DefaultLeaderElectionService leaderElectionService = new DefaultLeaderElectionService(
+                       testingLeaderElectionDriverFactory);
+               final TestingContender testingContender = new TestingContender(TEST_URL, leaderElectionService);
+               leaderElectionService.start(testingContender);
+
+               final TestingLeaderElectionDriver testingLeaderElectionDriver =
+                       testingLeaderElectionDriverFactory.getCurrentLeaderDriver();
+               assertThat(testingLeaderElectionDriver, is(notNullValue()));
+               testingLeaderElectionDriver.isLeader();
+               testingContender.waitForLeader(timeout);
+
+               // Leader information changed and should be corrected
+               testingLeaderElectionDriver.leaderInformationChanged(LeaderInformation.empty());
+               assertThat(testingLeaderElectionDriver.getLeaderInformation().getLeaderAddress(), is(TEST_URL));
+
+               testingLeaderElectionDriver.leaderInformationChanged(
+                       new LeaderInformation(UUID.randomUUID(), "faulty-address"));
+               assertThat(testingLeaderElectionDriver.getLeaderInformation().getLeaderAddress(), is(TEST_URL));

Review comment:
       Same here: please also assert the leader session ID, not only the leader address.
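
The following is a minimal, self-contained sketch of what an address-only assertion can miss. `LeaderInfo` and `FakeLeaderStore` are hypothetical stand-ins for `LeaderInformation` and the driver's external storage; they are not Flink classes. The `compareAddressOnly` flag is an assumed way to simulate a correction check that ignores the session ID.

```java
import java.util.Objects;
import java.util.UUID;

/** Hypothetical stand-in for LeaderInformation: a session ID plus an address. */
final class LeaderInfo {
    final UUID sessionId;
    final String address;

    LeaderInfo(UUID sessionId, String address) {
        this.sessionId = sessionId;
        this.address = address;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof LeaderInfo)) {
            return false;
        }
        LeaderInfo other = (LeaderInfo) o;
        return Objects.equals(sessionId, other.sessionId) && Objects.equals(address, other.address);
    }

    @Override
    public int hashCode() {
        return Objects.hash(sessionId, address);
    }
}

/** Hypothetical external storage that corrects externally changed leader information. */
final class FakeLeaderStore {
    private final LeaderInfo confirmed;
    // true simulates a buggy correction check that only compares addresses
    private final boolean compareAddressOnly;
    private LeaderInfo stored;

    FakeLeaderStore(LeaderInfo confirmed, boolean compareAddressOnly) {
        this.confirmed = confirmed;
        this.compareAddressOnly = compareAddressOnly;
        this.stored = confirmed;
    }

    /** Simulates an external change followed by the correction logic. */
    void externalChange(LeaderInfo changed) {
        stored = changed;
        boolean differs = compareAddressOnly
                ? !Objects.equals(confirmed.address, changed.address)
                : !confirmed.equals(changed);
        if (differs) {
            // write the confirmed leader information back
            stored = confirmed;
        }
    }

    LeaderInfo stored() {
        return stored;
    }
}
```

With `compareAddressOnly` set, the faulty entry keeps the correct address but carries a foreign session ID. An assertion on `getLeaderAddress()` alone would therefore still pass. Asserting the session ID as well exposes the bug.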

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderBase.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+
+/**
+ * Base class which provides some convenience functions for testing purposes of {@link LeaderContender} and
+ * {@link LeaderElectionEventHandler}.
+ */
+public class TestingLeaderBase {
+
+       protected boolean leader = false;
+       protected Throwable error = null;
+
+       protected final Object lock = new Object();
+       private final Object errorLock = new Object();
+
+       /**
+        * Waits until the contender becomes the leader or until the timeout has been exceeded.
+        *
+        * @param timeout maximum time to wait in milliseconds
+        * @throws TimeoutException if the contender has not become the leader within the timeout
+        */
+       public void waitForLeader(long timeout) throws TimeoutException {
+               waitFor(this::isLeader, timeout, "Contender was not elected as the leader within " + timeout + "ms");
+       }
+
+       /**
+        * Waits until the contender's leadership has been revoked or until the timeout has been exceeded.
+        *
+        * @param timeout maximum time to wait in milliseconds
+        * @throws TimeoutException if the leadership has not been revoked within the timeout
+        */
+       public void waitForRevokeLeader(long timeout) throws TimeoutException {
+               waitFor(() -> !isLeader(), timeout, "Contender was not revoked within " + timeout + "ms");
+       }
+
+       protected void waitFor(Supplier<Boolean> supplier, long timeout, String msg) throws TimeoutException {
+               long start = System.currentTimeMillis();
+               long curTimeout;
+
+               while (!supplier.get() && (curTimeout = timeout - System.currentTimeMillis() + start) > 0) {
+                       synchronized (lock) {
+                               try {
+                                       lock.wait(curTimeout);
+                               } catch (InterruptedException e) {
+                                       // we got interrupted so check again for the condition
+                               }
+                       }
+               }
+
+               if (!supplier.get()) {
+                       throw new TimeoutException(msg);
+               }

Review comment:
       I am wondering whether we could solve this problem a bit more elegantly by using an `ArrayBlockingQueue`.
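
The `ArrayBlockingQueue` idea could look something like the following sketch. It is written under assumptions: `QueueBasedTestingLeader`, `LeaderEvent`, `onGranted` and `onRevoked` are hypothetical names, and the leadership callbacks are assumed to enqueue events instead of flipping a `leader` flag and notifying `lock`. Waiting then becomes a timed `poll`. No explicit lock is needed, and no `InterruptedException` is swallowed.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical queue-based alternative to the wait/notify loop in TestingLeaderBase. */
class QueueBasedTestingLeader {

    /** Leadership events recorded by the callbacks. */
    enum LeaderEvent { GRANTED, REVOKED }

    // add() throws IllegalStateException once 16 events are pending, which surfaces
    // as a test failure instead of silently blocking the callback thread.
    private final BlockingQueue<LeaderEvent> events = new ArrayBlockingQueue<>(16);

    /** Would be called from the "leadership granted" callback. */
    void onGranted() {
        events.add(LeaderEvent.GRANTED);
    }

    /** Would be called from the "leadership revoked" callback. */
    void onRevoked() {
        events.add(LeaderEvent.REVOKED);
    }

    void waitForLeader(long timeoutMillis) throws InterruptedException, TimeoutException {
        waitFor(LeaderEvent.GRANTED, timeoutMillis);
    }

    void waitForRevokeLeader(long timeoutMillis) throws InterruptedException, TimeoutException {
        waitFor(LeaderEvent.REVOKED, timeoutMillis);
    }

    /** Takes events off the queue until the expected one arrives or the deadline passes. */
    private void waitFor(LeaderEvent expected, long timeoutMillis)
            throws InterruptedException, TimeoutException {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            final long remaining = deadline - System.nanoTime();
            // poll() blocks up to 'remaining' and lets an interrupt propagate
            final LeaderEvent event = events.poll(remaining, TimeUnit.NANOSECONDS);
            if (event == null) {
                throw new TimeoutException(
                        "Did not observe " + expected + " within " + timeoutMillis + "ms");
            }
            if (event == expected) {
                return;
            }
            // some other (older) event: skip it and keep waiting
        }
    }
}
```

One design difference is worth noting. This version waits for events rather than for a condition to hold. A grant that happened before the test started waiting is still observed, because it stays in the queue. Stale events are skipped until the expected one arrives.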

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriver.java
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionState;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.CreateMode;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.Stat;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link LeaderElectionDriver} implementation for ZooKeeper. The leading JobManager is elected using
+ * ZooKeeper. The current leader's address as well as its leader session ID is published via
+ * ZooKeeper.
+ */
+public class ZooKeeperLeaderElectionDriver implements LeaderElectionDriver, LeaderLatchListener, NodeCacheListener, UnhandledErrorListener {
+
+       private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionDriver.class);
+
+       /** Client to the ZooKeeper quorum. */
+       private final CuratorFramework client;
+
+       /** Curator recipe for leader election. */
+       private final LeaderLatch leaderLatch;
+
+       /** Curator recipe to watch a given ZooKeeper node for changes. */
+       private final NodeCache cache;
+
+       /** ZooKeeper path of the node which stores the current leader information. */
+       private final String leaderPath;
+
+       private final ConnectionStateListener listener = (client, newState) -> handleStateChange(newState);
+
+       private final LeaderElectionEventHandler leaderElectionEventHandler;
+
+       private final String leaderContenderDescription;
+
+       private volatile boolean running;
+
+       /**
+        * Creates a ZooKeeperLeaderElectionDriver object.
+        *
+        * @param client Client which is connected to the ZooKeeper quorum
+        * @param latchPath ZooKeeper node path for the leader election latch
+        * @param leaderPath ZooKeeper node path for the node which stores the current leader information
+        * @param leaderElectionEventHandler event handler for processing leader change events
+        * @param leaderContenderDescription leader contender description
+        */
+       public ZooKeeperLeaderElectionDriver(
+                       CuratorFramework client,
+                       String latchPath,
+                       String leaderPath,
+                       LeaderElectionEventHandler leaderElectionEventHandler,
+                       String leaderContenderDescription) throws Exception {
+               this.client = checkNotNull(client);
+               this.leaderPath = checkNotNull(leaderPath);
+               this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
+               this.leaderContenderDescription = checkNotNull(leaderContenderDescription);
+
+               leaderLatch = new LeaderLatch(client, checkNotNull(latchPath));
+               cache = new NodeCache(client, leaderPath);
+
+               client.getUnhandledErrorListenable().addListener(this);
+
+               leaderLatch.addListener(this);
+               leaderLatch.start();
+
+               cache.getListenable().addListener(this);
+               cache.start();
+
+               client.getConnectionStateListenable().addListener(listener);
+
+               running = true;
+       }
+
+       @Override
+       public void close() throws Exception {
+               if (!running) {
+                       return;
+               }
+               running = false;

Review comment:
       I'd suggest introducing a `lock` that guards `running` and all other places where it is accessed. That way we prevent the shutdown of this component from racing with updates triggered by the `ZooKeeper` threads.
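
The following is a minimal sketch of the suggested locking pattern, assuming a heavily simplified driver. `LockGuardedDriver`, `onExternalUpdate` and `forwardedEvents` are hypothetical names and are not part of `ZooKeeperLeaderElectionDriver`. In the sketch, `running` is read and written only under `lock`, so a late callback cannot race with `close()`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified driver: one lock guards the running flag, the shutdown
 * path and every callback invoked by external (e.g. ZooKeeper) threads.
 */
class LockGuardedDriver implements AutoCloseable {

    private final Object lock = new Object();

    // Guarded by lock, so it no longer needs to be volatile.
    private boolean running = true;

    // Stand-in for forwarding events to an event handler; guarded by lock.
    private final List<String> forwardedEvents = new ArrayList<>();

    /** Invoked from a callback thread, e.g. on leadership or node changes. */
    void onExternalUpdate(String event) {
        synchronized (lock) {
            if (!running) {
                // the driver has been closed: drop updates that arrive late
                return;
            }
            forwardedEvents.add(event);
        }
    }

    @Override
    public void close() {
        synchronized (lock) {
            if (!running) {
                return; // close() is idempotent
            }
            running = false;
        }
        // Once the flag has flipped under the lock, no callback forwards events any more.
        // In this sketch, external resources (latch, cache, listeners) would be released
        // here, outside the lock, so close() never waits on them while holding the lock.
    }

    List<String> forwardedEvents() {
        synchronized (lock) {
            return new ArrayList<>(forwardedEvents);
        }
    }
}
```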




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

