tillrohrmann commented on a change in pull request #13644: URL: https://github.com/apache/flink/pull/13644#discussion_r512791010
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ########## @@ -140,15 +161,22 @@ if (throwable instanceof CancellationException) { resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", throwable)); } else { - if (retries > 0) { - retryOperation( - resultFuture, - operation, - retries - 1, - executor); + throwable = ExceptionUtils.stripExecutionException(throwable); + if (!retryPredicate.test(throwable)) { + resultFuture.completeExceptionally(throwable); Review comment: Maybe fail with `RetryException("Stopped retrying the operation because the error is not retriable.", throwable)` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionDriverFactory.java ########## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.leaderelection; + +/** + * Factory for creating {@link LeaderElectionDriver} with different implementation. + */ +public interface LeaderElectionDriverFactory { + + /** + * Create a specific {@link LeaderElectionDriver} and start the necessary services. 
For example, LeaderLatch + * and NodeCache in Zookeeper, KubernetesLeaderElector and ConfigMap watcher in Kubernetes. + * + * @param leaderEventHandler handler for the leader election driver to process leader events. + * @param leaderContenderDescription leader contender description. Review comment: `@throws` is missing. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionEventHandler.java ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.leaderelection; + +/** + * Interface which should be implemented to response to {@link LeaderInformation} changes in Review comment: typo: response -> respond ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ########## @@ -109,9 +109,28 @@ final int retries, final Executor executor) { + return retry(operation, retries, ignore -> true, executor); + } + + /** + * Retry the given operation the given number of times in case of a failure only when an exception is retryable. 
+ * + * @param operation to executed + * @param retries if the operation failed + * @param retryPredicate Predicate to test whether an exception is retryable + * @param executor to use to run the futures + * @param <T> type of the result + * @return Future containing either the result of the operation or a {@link RetryException} + */ + public static <T> CompletableFuture<T> retry( + final Supplier<CompletableFuture<T>> operation, + final int retries, + final Predicate<Throwable> retryPredicate, + final Executor executor) { + Review comment: This is a very nice solution to the problem @wangyang0918. Well done! ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/DefaultLeaderRetrievalService.java ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.leaderretrieval; + +import org.apache.flink.runtime.leaderelection.LeaderInformation; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; + +import java.util.Objects; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The counterpart to the {@link org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService}. + * Composed with different {@link LeaderRetrievalDriver}, we could retrieve the leader information from + * different storage. The leader address as well as the current leader session ID will be retrieved from + * {@link LeaderRetrievalDriver}. + */ +public class DefaultLeaderRetrievalService implements LeaderRetrievalService, LeaderRetrievalEventHandler { + private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderRetrievalService.class); + + private final Object lock = new Object(); + + private final LeaderRetrievalDriverFactory leaderRetrievalDriverFactory; + + @GuardedBy("lock") + private String lastLeaderAddress; + + @GuardedBy("lock") + private UUID lastLeaderSessionID; Review comment: `@Nullable` is missing ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalEventHandler.java ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.leaderretrieval; + +import org.apache.flink.runtime.leaderelection.LeaderInformation; + +/** + * Interface which should be implemented to notify to {@link LeaderInformation} changes in + * {@link LeaderRetrievalDriver}. + */ +public interface LeaderRetrievalEventHandler { + + /** + * Called by specific {@link LeaderRetrievalDriver} to notify leader address. Review comment: line break missing to separate `@param` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java ########## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.leaderretrieval; + +import org.apache.flink.runtime.leaderelection.LeaderInformation; +import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; + +import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.api.UnhandledErrorListener; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionState; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The counterpart to the {@link ZooKeeperLeaderElectionDriver}. + * {@link LeaderRetrievalService} implementation for Zookeeper. It retrieves the current leader which has + * been elected by the {@link ZooKeeperLeaderElectionDriver}. + * The leader address as well as the current leader session ID is retrieved from ZooKeeper. + */ +public class ZooKeeperLeaderRetrievalDriver implements LeaderRetrievalDriver, NodeCacheListener, UnhandledErrorListener { + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderRetrievalDriver.class); + + /** Connection to the used ZooKeeper quorum. */ + private final CuratorFramework client; + + /** Curator recipe to watch changes of a specific ZooKeeper node. 
*/ + private final NodeCache cache; + + private final String retrievalPath; + + private final ConnectionStateListener connectionStateListener = (client, newState) -> handleStateChange(newState); + + private final LeaderRetrievalEventHandler leaderRetrievalEventHandler; + + private volatile boolean running; + + /** + * Creates a leader retrieval service which uses ZooKeeper to retrieve the leader information. + * + * @param client Client which constitutes the connection to the ZooKeeper quorum + * @param retrievalPath Path of the ZooKeeper node which contains the leader information + * @param leaderRetrievalEventHandler handler to notify the leader changes. + */ + public ZooKeeperLeaderRetrievalDriver( + CuratorFramework client, + String retrievalPath, + LeaderRetrievalEventHandler leaderRetrievalEventHandler) throws Exception { + this.client = checkNotNull(client, "CuratorFramework client"); + this.cache = new NodeCache(client, retrievalPath); + this.retrievalPath = checkNotNull(retrievalPath); + + this.leaderRetrievalEventHandler = checkNotNull(leaderRetrievalEventHandler); + + client.getUnhandledErrorListenable().addListener(this); + cache.getListenable().addListener(this); + cache.start(); + + client.getConnectionStateListenable().addListener(connectionStateListener); + + running = true; + } + + @Override + public void close() throws Exception { + if (!running) { + return; + } + + running = false; Review comment: I'd suggest introducing a `lock` to guard access to `running` (also at the other places where we access this field). That way we make sure that, after `close` is called, we won't see any calls coming from the `ZooKeeper` threads. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalEventHandler.java ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.leaderretrieval; + +import org.apache.flink.runtime.leaderelection.LeaderInformation; + +/** + * Interface which should be implemented to notify to {@link LeaderInformation} changes in + * {@link LeaderRetrievalDriver}. + */ +public interface LeaderRetrievalEventHandler { + + /** + * Called by specific {@link LeaderRetrievalDriver} to notify leader address. + * @param leaderInformation the new leader information to notify {@link DefaultLeaderRetrievalService}. It could be + * {@link LeaderInformation#empty()} if the leader address does not exist in the external storage. + */ + void notifyLeaderAddress(LeaderInformation leaderInformation); + + /** + * Handle error by specific {@link LeaderRetrievalDriver}. Review comment: line break is missing ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingRetrievalBase.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.leaderelection; + +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.UUID; +import java.util.concurrent.TimeoutException; + +/** + * Base class which provides some convenience functions for testing purposes of {@link LeaderRetrievalListener} and + * {@link org.apache.flink.runtime.leaderretrieval.LeaderRetrievalEventHandler}. 
+ */ +public class TestingRetrievalBase { + + protected final Logger logger = LoggerFactory.getLogger(getClass()); + private final Object lock = new Object(); + + private String address; + private String oldAddress; + private UUID leaderSessionID; + private Exception exception; + + + public String getAddress() { + return address; + } + + public UUID getLeaderSessionID() { + return leaderSessionID; + } + + public String waitForNewLeader(long timeout) throws Exception { + long start = System.currentTimeMillis(); + long curTimeout; + + synchronized (lock) { + while ( + exception == null && + (address == null || address.equals(oldAddress)) && + (curTimeout = timeout - System.currentTimeMillis() + start) > 0) { + try { + lock.wait(curTimeout); + } catch (InterruptedException e) { + // we got interrupted so check again for the condition + } + } + } Review comment: I am wondering whether we can't solve this problem a bit more elegantly using a blocking array queue. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionEventHandler.java ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.leaderelection; + +/** + * Interface which should be implemented to response to {@link LeaderInformation} changes in + * {@link LeaderElectionDriver}. + */ +public interface LeaderElectionEventHandler { + + /** + * Called by specific {@link LeaderElectionDriver} when the leadership is granted. + */ + void onGrantLeadership(); + + /** + * Called by specific {@link LeaderElectionDriver} when the leadership is revoked. + */ + void onRevokeLeadership(); + + /** + * Called by specific {@link LeaderElectionDriver} when the leader information is changed. Then the + * {@link DefaultLeaderElectionService} could write the leader information again if necessary. + * @param leaderInformation leader information which contains leader session id and leader address. Review comment: Will this method only be called if one is still the leader, or will the implementor be responsible for ensuring that we are still the leader? I think this kind of contract needs to be stated so that future implementations can respect it. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionEventHandler.java ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.leaderelection; + +/** + * Interface which should be implemented to response to {@link LeaderInformation} changes in + * {@link LeaderElectionDriver}. + */ +public interface LeaderElectionEventHandler { + + /** + * Called by specific {@link LeaderElectionDriver} when the leadership is granted. + */ + void onGrantLeadership(); + + /** + * Called by specific {@link LeaderElectionDriver} when the leadership is revoked. + */ + void onRevokeLeadership(); + + /** + * Called by specific {@link LeaderElectionDriver} when the leader information is changed. Then the + * {@link DefaultLeaderElectionService} could write the leader information again if necessary. + * @param leaderInformation leader information which contains leader session id and leader address. + */ + void onLeaderInformationChange(LeaderInformation leaderInformation); + + /** + * Handle error by specific {@link LeaderElectionDriver}. + * @param ex exception to be handled. + */ + void handleError(Exception ex); Review comment: Theoretically this does not need to be part of the `LeaderElectionEventHandler` and one could instead pass in a `FatalErrorHandler` to the `LeaderElectionDriver`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderInformation.java ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.leaderelection; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Objects; +import java.util.UUID; + +/** + * Information about leader including the confirmed leader session id and leader address. + */ +public class LeaderInformation implements Serializable { + + private static final long serialVersionUID = 1L; + + @Nullable + private final UUID leaderSessionID; + + @Nullable + private final String leaderAddress; + + private static final LeaderInformation EMPTY = new LeaderInformation(null, null); + + public LeaderInformation(@Nullable UUID leaderSessionID, @Nullable String leaderAddress) { Review comment: Let's make this private and introduce two factory methods `known(String, UUID)` and `empty()` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalDriver.java ########## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.leaderretrieval; + +/** + * A {@link LeaderRetrievalDriver} is responsible for retrieves the current leader which has been elected by the + * {@link org.apache.flink.runtime.leaderelection.LeaderElectionDriver}. + */ +public interface LeaderRetrievalDriver extends AutoCloseable { + + /** + * Close the services used for leader retrieval. + */ + void close() throws Exception; Review comment: Shouldn't this also be defined in `AutoCloseable`? Maybe `LeaderRetrievalDriver` does not need to extend from `AutoCloseable`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/DefaultLeaderRetrievalService.java ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.leaderretrieval; + +import org.apache.flink.runtime.leaderelection.LeaderInformation; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; + +import java.util.Objects; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The counterpart to the {@link org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService}. + * Composed with different {@link LeaderRetrievalDriver}, we could retrieve the leader information from + * different storage. The leader address as well as the current leader session ID will be retrieved from + * {@link LeaderRetrievalDriver}. + */ +public class DefaultLeaderRetrievalService implements LeaderRetrievalService, LeaderRetrievalEventHandler { + private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderRetrievalService.class); + + private final Object lock = new Object(); + + private final LeaderRetrievalDriverFactory leaderRetrievalDriverFactory; + + @GuardedBy("lock") + private String lastLeaderAddress; Review comment: `@Nullable` missing ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionEventHandler.java ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.leaderelection; + +/** + * Interface which should be implemented to response to {@link LeaderInformation} changes in + * {@link LeaderElectionDriver}. Review comment: Maybe one could say that it also reacts to leader changes and not only to leader information changes. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java ########## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.leaderelection; + +import org.junit.Test; + +import java.util.UUID; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link DefaultLeaderElectionService}. + */ +public class DefaultLeaderElectionServiceTest { Review comment: `extends TestLogger` is missing ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriver.java ########## @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.leaderelection; + +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; + +import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.api.UnhandledErrorListener; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatchListener; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionState; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.CreateMode; +import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException; +import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.Stat; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link LeaderElectionDriver} implementation for Zookeeper. The leading JobManager is elected using + * ZooKeeper. The current leader's address as well as its leader session ID is published via + * ZooKeeper. 
+ */ +public class ZooKeeperLeaderElectionDriver implements LeaderElectionDriver, LeaderLatchListener, NodeCacheListener, UnhandledErrorListener { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionDriver.class); + + /** Client to the ZooKeeper quorum. */ + private final CuratorFramework client; + + /** Curator recipe for leader election. */ + private final LeaderLatch leaderLatch; + + /** Curator recipe to watch a given ZooKeeper node for changes. */ + private final NodeCache cache; + + /** ZooKeeper path of the node which stores the current leader information. */ + private final String leaderPath; + + private final ConnectionStateListener listener = (client, newState) -> handleStateChange(newState); + + private final LeaderElectionEventHandler leaderElectionEventHandler; + + private final String leaderContenderDescription; + + private volatile boolean running; + + /** + * Creates a ZooKeeperLeaderElectionDriver object. + * + * @param client Client which is connected to the ZooKeeper quorum + * @param latchPath ZooKeeper node path for the leader election latch + * @param leaderPath ZooKeeper node path for the node which stores the current leader information + * @param leaderElectionEventHandler event handler for processing leader change events + * @param leaderContenderDescription leader contender description + */ + public ZooKeeperLeaderElectionDriver( + CuratorFramework client, + String latchPath, + String leaderPath, + LeaderElectionEventHandler leaderElectionEventHandler, + String leaderContenderDescription) throws Exception { + this.client = checkNotNull(client); + this.leaderPath = checkNotNull(leaderPath); + this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler); + this.leaderContenderDescription = checkNotNull(leaderContenderDescription); + + leaderLatch = new LeaderLatch(client, checkNotNull(latchPath)); + cache = new NodeCache(client, leaderPath); + + 
client.getUnhandledErrorListenable().addListener(this); + + leaderLatch.addListener(this); + leaderLatch.start(); + + cache.getListenable().addListener(this); + cache.start(); + + client.getConnectionStateListenable().addListener(listener); + + running = true; + } + + @Override + public void close() throws Exception{ + if (!running) { + return; + } + running = false; + + LOG.info("Closing {}", this); + + client.getUnhandledErrorListenable().removeListener(this); + + client.getConnectionStateListenable().removeListener(listener); + + Exception exception = null; + + try { + cache.close(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + try { + leaderLatch.close(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + if (exception != null) { + throw new Exception("Could not properly stop the ZooKeeperLeaderElectionDriver.", exception); + } + } + + @Override + public boolean hasLeadership() { + return leaderLatch.hasLeadership(); + } + + @Override + public void isLeader() { + leaderElectionEventHandler.onGrantLeadership(); + } + + @Override + public void notLeader() { + leaderElectionEventHandler.onRevokeLeadership(); + } + + @Override + public void nodeChanged() throws Exception { + if (leaderLatch.hasLeadership()) { + if (running) { + ChildData childData = cache.getCurrentData(); + if (childData != null) { + final byte[] data = childData.getData(); + if (data != null && data.length > 0) { + final ByteArrayInputStream bais = new ByteArrayInputStream(data); + final ObjectInputStream ois = new ObjectInputStream(bais); + + final String leaderAddress = ois.readUTF(); + final UUID leaderSessionID = (UUID) ois.readObject(); + + leaderElectionEventHandler.onLeaderInformationChange( + new LeaderInformation(leaderSessionID, leaderAddress)); + return; + } + } + leaderElectionEventHandler.onLeaderInformationChange(LeaderInformation.empty()); + } else { + LOG.debug("Ignoring node change 
notification since the service has already been stopped."); + } + } + } + + /** + * Writes the current leader's address as well the given leader session ID to ZooKeeper. + */ + @Override + public void writeLeaderInformation(LeaderInformation leaderInformation) { + // this method does not have to be synchronized because the curator framework client + // is thread-safe + final UUID confirmedLeaderSessionID = leaderInformation.getLeaderSessionID(); + final String confirmedLeaderAddress = leaderInformation.getLeaderAddress(); + try { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Write leader information: Leader={}, session ID={}.", + confirmedLeaderAddress, + confirmedLeaderSessionID); + } + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final ObjectOutputStream oos = new ObjectOutputStream(baos); + + oos.writeUTF(confirmedLeaderAddress); + oos.writeObject(confirmedLeaderSessionID); + + oos.close(); + + boolean dataWritten = false; + + while (!dataWritten && leaderLatch.hasLeadership()) { + Stat stat = client.checkExists().forPath(leaderPath); + + if (stat != null) { + long owner = stat.getEphemeralOwner(); + long sessionID = client.getZookeeperClient().getZooKeeper().getSessionId(); + + if (owner == sessionID) { + try { + client.setData().forPath(leaderPath, baos.toByteArray()); + + dataWritten = true; + } catch (KeeperException.NoNodeException noNode) { + // node was deleted in the meantime + } + } else { + try { + client.delete().forPath(leaderPath); + } catch (KeeperException.NoNodeException noNode) { + // node was deleted in the meantime --> try again + } + } + } else { + try { + client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath( + leaderPath, + baos.toByteArray()); + + dataWritten = true; + } catch (KeeperException.NodeExistsException nodeExists) { + // node has been created in the meantime --> try again + } + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug( + "Successfully wrote leader information: Leader={}, 
session ID={}.", + confirmedLeaderAddress, + confirmedLeaderSessionID); + } + } catch (Exception e) { + leaderElectionEventHandler.handleError( + new Exception("Could not write leader address and leader session ID to " + + "ZooKeeper.", e)); + } + } + + private void handleStateChange(ConnectionState newState) { + switch (newState) { + case CONNECTED: + LOG.debug("Connected to ZooKeeper quorum. Leader election can start."); + break; + case SUSPENDED: + LOG.warn("Connection to ZooKeeper suspended. The contender " + leaderContenderDescription + + " no longer participates in the leader election."); + break; + case RECONNECTED: + LOG.info("Connection to ZooKeeper was reconnected. Leader election can be restarted."); + break; + case LOST: + // Maybe we have to throw an exception here to terminate the JobManager + LOG.warn("Connection to ZooKeeper lost. The contender " + leaderContenderDescription + + " no longer participates in the leader election."); + break; + } + } + + @Override + public void unhandledError(String message, Throwable e) { + leaderElectionEventHandler.handleError( + new FlinkException("Unhandled error in ZooKeeperLeaderElectionDriver: " + message, e)); + } + + @Override + public String toString() { + return "ZooKeeperLeaderElectionDriver{" + + "leaderPath='" + leaderPath + '\'' + + '}'; + } +} Review comment: Very nice! I like the implementation and I think that it is now quite clear and easy to understand what is happening :-) ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalEventHandler.java ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.leaderretrieval; + +import org.apache.flink.runtime.leaderelection.LeaderInformation; + +/** + * Interface which should be implemented to notify to {@link LeaderInformation} changes in + * {@link LeaderRetrievalDriver}. + */ +public interface LeaderRetrievalEventHandler { + + /** + * Called by specific {@link LeaderRetrievalDriver} to notify leader address. + * @param leaderInformation the new leader information to notify {@link DefaultLeaderRetrievalService}. It could be Review comment: Usually one does not refer to implementations in the interface definition because the interface does not know about them. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java ########## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.leaderelection; + +import org.junit.Test; + +import java.util.UUID; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link DefaultLeaderElectionService}. + */ +public class DefaultLeaderElectionServiceTest { + + private static final String TEST_URL = "akka//user/jobmanager"; + private static final long timeout = 30L * 1000L; + + @Test + public void testOnGrantAndRevokeLeadership() throws Exception { + final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory testingLeaderElectionDriverFactory = + new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(); + final DefaultLeaderElectionService leaderElectionService = new DefaultLeaderElectionService( + testingLeaderElectionDriverFactory); + final TestingContender testingContender = new TestingContender(TEST_URL, leaderElectionService); + leaderElectionService.start(testingContender); + + // grant leadership + final TestingLeaderElectionDriver testingLeaderElectionDriver = + testingLeaderElectionDriverFactory.getCurrentLeaderDriver(); + assertThat(testingLeaderElectionDriver, is(notNullValue())); + testingLeaderElectionDriver.isLeader(); + + testingContender.waitForLeader(timeout); + assertThat(testingContender.isLeader(), is(true)); + assertThat(testingContender.getDescription(), is(TEST_URL)); + + // Check the external storage + assertThat(testingLeaderElectionDriver.getLeaderInformation().getLeaderAddress(), is(TEST_URL)); + + // 
revoke leadership + testingLeaderElectionDriver.notLeader(); + testingContender.waitForRevokeLeader(timeout); + assertThat(testingContender.isLeader(), is(false)); + + leaderElectionService.stop(); + } + + @Test + public void testLeaderInformationChangedAndShouldBeCorrected() throws Exception { + final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory testingLeaderElectionDriverFactory = + new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(); + final DefaultLeaderElectionService leaderElectionService = new DefaultLeaderElectionService( + testingLeaderElectionDriverFactory); + final TestingContender testingContender = new TestingContender(TEST_URL, leaderElectionService); + leaderElectionService.start(testingContender); + + final TestingLeaderElectionDriver testingLeaderElectionDriver = + testingLeaderElectionDriverFactory.getCurrentLeaderDriver(); + assertThat(testingLeaderElectionDriver, is(notNullValue())); + testingLeaderElectionDriver.isLeader(); + testingContender.waitForLeader(timeout); + + // Leader information changed and should be corrected + testingLeaderElectionDriver.leaderInformationChanged(LeaderInformation.empty()); + assertThat(testingLeaderElectionDriver.getLeaderInformation().getLeaderAddress(), is(TEST_URL)); Review comment: Shouldn't we also check that the leader session id stays the same? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalDriver.java ########## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.leaderretrieval; + +import org.apache.flink.runtime.leaderelection.LeaderInformation; +import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; + +import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.api.UnhandledErrorListener; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionState; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateListener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The counterpart to the {@link ZooKeeperLeaderElectionDriver}. + * {@link LeaderRetrievalService} implementation for Zookeeper. It retrieves the current leader which has + * been elected by the {@link ZooKeeperLeaderElectionDriver}. + * The leader address as well as the current leader session ID is retrieved from ZooKeeper. 
+ */ +public class ZooKeeperLeaderRetrievalDriver implements LeaderRetrievalDriver, NodeCacheListener, UnhandledErrorListener { + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderRetrievalDriver.class); + + /** Connection to the used ZooKeeper quorum. */ + private final CuratorFramework client; + + /** Curator recipe to watch changes of a specific ZooKeeper node. */ + private final NodeCache cache; + + private final String retrievalPath; + + private final ConnectionStateListener connectionStateListener = (client, newState) -> handleStateChange(newState); + + private final LeaderRetrievalEventHandler leaderRetrievalEventHandler; + + private volatile boolean running; + + /** + * Creates a leader retrieval service which uses ZooKeeper to retrieve the leader information. + * + * @param client Client which constitutes the connection to the ZooKeeper quorum + * @param retrievalPath Path of the ZooKeeper node which contains the leader information + * @param leaderRetrievalEventHandler handler to notify the leader changes. 
+ */ + public ZooKeeperLeaderRetrievalDriver( + CuratorFramework client, + String retrievalPath, + LeaderRetrievalEventHandler leaderRetrievalEventHandler) throws Exception { + this.client = checkNotNull(client, "CuratorFramework client"); + this.cache = new NodeCache(client, retrievalPath); + this.retrievalPath = checkNotNull(retrievalPath); + + this.leaderRetrievalEventHandler = checkNotNull(leaderRetrievalEventHandler); + + client.getUnhandledErrorListenable().addListener(this); + cache.getListenable().addListener(this); + cache.start(); + + client.getConnectionStateListenable().addListener(connectionStateListener); + + running = true; + } + + @Override + public void close() throws Exception { + if (!running) { + return; + } + + running = false; + + LOG.info("Closing {}.", this); + + client.getUnhandledErrorListenable().removeListener(this); + client.getConnectionStateListenable().removeListener(connectionStateListener); + + try { + cache.close(); + } catch (IOException e) { + throw new Exception("Could not properly stop the ZooKeeperLeaderRetrievalDriver.", e); + } + } + + @Override + public void nodeChanged() { + retrieveLeaderInformationFromZooKeeper(); + } + + private void retrieveLeaderInformationFromZooKeeper() { + if (running) { + try { + LOG.debug("Leader node has changed."); + + ChildData childData = cache.getCurrentData(); + + String leaderAddress; + UUID leaderSessionID; + + if (childData == null) { + leaderAddress = null; + leaderSessionID = null; + } else { + byte[] data = childData.getData(); + + if (data == null || data.length == 0) { + leaderAddress = null; + leaderSessionID = null; + } else { + ByteArrayInputStream bais = new ByteArrayInputStream(data); + ObjectInputStream ois = new ObjectInputStream(bais); + + leaderAddress = ois.readUTF(); + leaderSessionID = (UUID) ois.readObject(); + } + } + + leaderRetrievalEventHandler.notifyLeaderAddress(new LeaderInformation(leaderSessionID, leaderAddress)); + } catch (Exception e) { + 
leaderRetrievalEventHandler.handleError(new Exception("Could not handle node changed event.", e)); + ExceptionUtils.checkInterrupted(e); + } + } else { + LOG.debug("Ignoring node change notification since the service has already been stopped."); + } + } + + private void handleStateChange(ConnectionState newState) { + switch (newState) { + case CONNECTED: + LOG.debug("Connected to ZooKeeper quorum. Leader retrieval can start."); + break; + case SUSPENDED: + LOG.warn("Connection to ZooKeeper suspended. Can no longer retrieve the leader from " + + "ZooKeeper."); + leaderRetrievalEventHandler.notifyLeaderAddress(LeaderInformation.empty()); Review comment: I think we should check whether we are still running before calling this method. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java ########## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.leaderelection; + +import org.junit.Test; + +import java.util.UUID; + +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link DefaultLeaderElectionService}. + */ +public class DefaultLeaderElectionServiceTest { + + private static final String TEST_URL = "akka//user/jobmanager"; + private static final long timeout = 30L * 1000L; + + @Test + public void testOnGrantAndRevokeLeadership() throws Exception { + final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory testingLeaderElectionDriverFactory = + new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(); + final DefaultLeaderElectionService leaderElectionService = new DefaultLeaderElectionService( + testingLeaderElectionDriverFactory); + final TestingContender testingContender = new TestingContender(TEST_URL, leaderElectionService); + leaderElectionService.start(testingContender); + + // grant leadership + final TestingLeaderElectionDriver testingLeaderElectionDriver = + testingLeaderElectionDriverFactory.getCurrentLeaderDriver(); + assertThat(testingLeaderElectionDriver, is(notNullValue())); + testingLeaderElectionDriver.isLeader(); + + testingContender.waitForLeader(timeout); + assertThat(testingContender.isLeader(), is(true)); + assertThat(testingContender.getDescription(), is(TEST_URL)); + + // Check the external storage + assertThat(testingLeaderElectionDriver.getLeaderInformation().getLeaderAddress(), is(TEST_URL)); + + // revoke leadership + testingLeaderElectionDriver.notLeader(); + testingContender.waitForRevokeLeader(timeout); + assertThat(testingContender.isLeader(), is(false)); + + leaderElectionService.stop(); + } + + @Test + public void testLeaderInformationChangedAndShouldBeCorrected() throws Exception { + final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory testingLeaderElectionDriverFactory = + new 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(); + final DefaultLeaderElectionService leaderElectionService = new DefaultLeaderElectionService( + testingLeaderElectionDriverFactory); + final TestingContender testingContender = new TestingContender(TEST_URL, leaderElectionService); + leaderElectionService.start(testingContender); + + final TestingLeaderElectionDriver testingLeaderElectionDriver = + testingLeaderElectionDriverFactory.getCurrentLeaderDriver(); + assertThat(testingLeaderElectionDriver, is(notNullValue())); + testingLeaderElectionDriver.isLeader(); + testingContender.waitForLeader(timeout); + + // Leader information changed and should be corrected + testingLeaderElectionDriver.leaderInformationChanged(LeaderInformation.empty()); + assertThat(testingLeaderElectionDriver.getLeaderInformation().getLeaderAddress(), is(TEST_URL)); + + testingLeaderElectionDriver.leaderInformationChanged( + new LeaderInformation(UUID.randomUUID(), "faulty-address")); + assertThat(testingLeaderElectionDriver.getLeaderInformation().getLeaderAddress(), is(TEST_URL)); Review comment: Same here with the leader session id. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderBase.java ########## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.leaderelection; + +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +/** + * Base class which provides some convenience functions for testing purposes of {@link LeaderContender} and + * {@link LeaderElectionEventHandler}. + */ +public class TestingLeaderBase { + + protected boolean leader = false; + protected Throwable error = null; + + protected final Object lock = new Object(); + private final Object errorLock = new Object(); + + /** + * Waits until the contender becomes the leader or until the timeout has been exceeded. + * + * @param timeout + * @throws TimeoutException + */ + public void waitForLeader(long timeout) throws TimeoutException { + waitFor(this::isLeader, timeout, "Contender was not elected as the leader within " + timeout + "ms"); + } + + /** + * Waits until the contender revokes the leader or until the timeout has been exceeded. + * + * @param timeout + * @throws TimeoutException + */ + public void waitForRevokeLeader(long timeout) throws TimeoutException { + waitFor(() -> !isLeader(), timeout, "Contender was not revoked within " + timeout + "ms"); + } + + protected void waitFor(Supplier<Boolean> supplier, long timeout, String msg) throws TimeoutException { + long start = System.currentTimeMillis(); + long curTimeout; + + while (!supplier.get() && (curTimeout = timeout - System.currentTimeMillis() + start) > 0) { + synchronized (lock) { + try { + lock.wait(curTimeout); + } catch (InterruptedException e) { + // we got interrupted so check again for the condition + } + } + } + + if (!supplier.get()) { + throw new TimeoutException(msg); + } Review comment: I am wondering whether we can't solve this problem a bit more elegantly using a blocking array queue. 
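To make the blocking-queue idea concrete, here is a minimal sketch, not code from the PR. It assumes the test helper records each grant/revoke callback as an event in an `ArrayBlockingQueue` and that the waiting methods poll that queue with a deadline. The class name `LeadershipEventQueue`, the `Event` enum, and the capacity of 16 are all hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical test helper; the names are illustrative and not part of the PR. */
final class LeadershipEventQueue {

    /** Leadership transitions recorded by the election callbacks. */
    enum Event { GRANTED, REVOKED }

    // Assumed capacity; offer() silently drops events once the queue is full.
    private final BlockingQueue<Event> events = new ArrayBlockingQueue<>(16);

    /** Would be called from the grant-leadership callback. */
    void grantLeadership() {
        events.offer(Event.GRANTED);
    }

    /** Would be called from the revoke-leadership callback. */
    void revokeLeadership() {
        events.offer(Event.REVOKED);
    }

    void waitForLeader(long timeoutMillis) throws InterruptedException, TimeoutException {
        waitFor(Event.GRANTED, timeoutMillis,
            "Contender was not elected as the leader within " + timeoutMillis + "ms");
    }

    void waitForRevokeLeader(long timeoutMillis) throws InterruptedException, TimeoutException {
        waitFor(Event.REVOKED, timeoutMillis,
            "Contender was not revoked within " + timeoutMillis + "ms");
    }

    /** Consumes events until the expected one arrives or the deadline passes. */
    private void waitFor(Event expected, long timeoutMillis, String msg)
            throws InterruptedException, TimeoutException {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            final long remaining = Math.max(deadline - System.nanoTime(), 0L);
            // poll() blocks for at most the remaining time; null means the deadline passed.
            final Event event = events.poll(remaining, TimeUnit.NANOSECONDS);
            if (event == null) {
                throw new TimeoutException(msg);
            }
            if (event == expected) {
                return;
            }
            // Any other event is older than the one we are waiting for; skip it.
        }
    }
}
```

Unlike the `wait`/`notify` loop, this variant consumes events rather than re-checking a flag, so a test that waits twice for the same state needs two matching events. Whether that trade-off fits the existing tests is a judgment call for the PR author.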
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriver.java ########## @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.leaderelection; + +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; + +import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.api.UnhandledErrorListener; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatchListener; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionState; +import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateListener; 
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.CreateMode; +import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException; +import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.Stat; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link LeaderElectionDriver} implementation for Zookeeper. The leading JobManager is elected using + * ZooKeeper. The current leader's address as well as its leader session ID is published via + * ZooKeeper. + */ +public class ZooKeeperLeaderElectionDriver implements LeaderElectionDriver, LeaderLatchListener, NodeCacheListener, UnhandledErrorListener { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionDriver.class); + + /** Client to the ZooKeeper quorum. */ + private final CuratorFramework client; + + /** Curator recipe for leader election. */ + private final LeaderLatch leaderLatch; + + /** Curator recipe to watch a given ZooKeeper node for changes. */ + private final NodeCache cache; + + /** ZooKeeper path of the node which stores the current leader information. */ + private final String leaderPath; + + private final ConnectionStateListener listener = (client, newState) -> handleStateChange(newState); + + private final LeaderElectionEventHandler leaderElectionEventHandler; + + private final String leaderContenderDescription; + + private volatile boolean running; + + /** + * Creates a ZooKeeperLeaderElectionDriver object. 
+ * + * @param client Client which is connected to the ZooKeeper quorum + * @param latchPath ZooKeeper node path for the leader election latch + * @param leaderPath ZooKeeper node path for the node which stores the current leader information + * @param leaderElectionEventHandler event handler for processing leader change events + * @param leaderContenderDescription leader contender description + */ + public ZooKeeperLeaderElectionDriver( + CuratorFramework client, + String latchPath, + String leaderPath, + LeaderElectionEventHandler leaderElectionEventHandler, + String leaderContenderDescription) throws Exception { + this.client = checkNotNull(client); + this.leaderPath = checkNotNull(leaderPath); + this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler); + this.leaderContenderDescription = checkNotNull(leaderContenderDescription); + + leaderLatch = new LeaderLatch(client, checkNotNull(latchPath)); + cache = new NodeCache(client, leaderPath); + + client.getUnhandledErrorListenable().addListener(this); + + leaderLatch.addListener(this); + leaderLatch.start(); + + cache.getListenable().addListener(this); + cache.start(); + + client.getConnectionStateListenable().addListener(listener); + + running = true; + } + + @Override + public void close() throws Exception{ + if (!running) { + return; + } + running = false; Review comment: I'd suggest introducing a `lock` to guard every access to `running`. That way we prevent races between shutting this component down and the `ZooKeeper` threads triggering an update. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
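The suggestion to guard `running` with a lock could look roughly like the sketch below. It is not the actual driver: `GuardedDriver`, `onExternalEvent()`, and the event counter are hypothetical stand-ins, and the only point is that `close()` and the callbacks invoked by other threads check and update `running` under the same monitor.

```java
/** Hypothetical stand-in for a driver; this is not the ZooKeeperLeaderElectionDriver itself. */
final class GuardedDriver implements AutoCloseable {

    private final Object lock = new Object();

    // Both fields are only read or written while holding `lock`.
    private boolean running = true;
    private int handledEvents = 0;

    /**
     * Stand-in for callbacks such as nodeChanged() that external threads invoke.
     *
     * @return true if the event was handled, false if it was ignored after close()
     */
    boolean onExternalEvent() {
        synchronized (lock) {
            if (!running) {
                // The driver has been closed; drop the notification.
                return false;
            }
            handledEvents++;
            return true;
        }
    }

    /** Idempotent: a second call returns without doing anything. */
    @Override
    public void close() {
        synchronized (lock) {
            if (!running) {
                return;
            }
            running = false;
        }
        // A real driver would remove listeners and close its resources here.
    }

    int getHandledEvents() {
        synchronized (lock) {
            return handledEvents;
        }
    }
}
```

Because a callback holds the lock for its whole body, `close()` cannot flip the flag while an update is half processed, and any callback that starts afterwards sees `running == false`. Whether resources should be released inside or outside the synchronized block is left open here.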