[ https://issues.apache.org/jira/browse/FLINK-8660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16621743#comment-16621743 ]
ASF GitHub Bot commented on FLINK-8660: --------------------------------------- tillrohrmann closed pull request #5530: [FLINK-8660] Enable the user to provide custom HAServices implementation URL: https://github.com/apache/flink/pull/5530 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/ops/jobmanager_high_availability.md b/docs/ops/jobmanager_high_availability.md index a99455849c4..669919e6618 100644 --- a/docs/ops/jobmanager_high_availability.md +++ b/docs/ops/jobmanager_high_availability.md @@ -65,6 +65,7 @@ By default, the job manager will pick a *random port* for inter process communic In order to start an HA-cluster add the following configuration keys to `conf/flink-conf.yaml`: - **high-availability mode** (required): The *high-availability mode* has to be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high availability mode. +Alternatively this option can be set to FQN of factory class Flink should use to create HighAvailabilityServices instance. <pre>high-availability: zookeeper</pre> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java index 8ef605bb328..51c21053df9 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java @@ -39,13 +39,14 @@ * Defines high-availability mode used for the cluster execution. * A value of "NONE" signals no highly available setup. * To enable high-availability, set this mode to "ZOOKEEPER". + * Can also be set to FQN of HighAvailability factory class. 
*/ public static final ConfigOption<String> HA_MODE = key("high-availability") .defaultValue("NONE") .withDeprecatedKeys("recovery.mode") .withDescription("Defines high-availability mode used for the cluster execution." + - " To enable high-availability, set this mode to \"ZOOKEEPER\"."); + " To enable high-availability, set this mode to \"ZOOKEEPER\" or specify FQN of factory class."); /** * The ID of the Flink cluster, used to separate multiple Flink clusters @@ -67,7 +68,6 @@ .withDeprecatedKeys("high-availability.zookeeper.storageDir", "recovery.zookeeper.storageDir") .withDescription("File system path (URI) where Flink persists metadata in high-availability setups."); - // ------------------------------------------------------------------------ // Recovery Options // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java index a21c7d61fcb..77b617bbdcc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java @@ -87,14 +87,10 @@ * thrown if the (distributed) file storage cannot be created */ public static BlobStoreService createBlobStoreFromConfig(Configuration config) throws IOException { - HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(config); - - if (highAvailabilityMode == HighAvailabilityMode.NONE) { - return new VoidBlobStore(); - } else if (highAvailabilityMode == HighAvailabilityMode.ZOOKEEPER) { + if (HighAvailabilityMode.isHighAvailabilityModeActivated(config)) { return createFileSystemBlobStore(config); } else { - throw new IllegalConfigurationException("Unexpected high availability mode '" + highAvailabilityMode + "'."); + return new VoidBlobStore(); } } diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesFactory.java new file mode 100644 index 00000000000..a0e897739c1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.configuration.Configuration; + +import java.util.concurrent.Executor; + +/** + * Factory interface for {@link HighAvailabilityServices} + */ +public interface HighAvailabilityServicesFactory { + + /** + * @param config flink configuration + * @param executor background task executor + * @return instance of {@link HighAvailabilityServices} + * @throws Exception when HAServices cannot be created + */ + HighAvailabilityServices createHAServices(Configuration config, Executor executor) throws Exception; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java index 4f12f2bc481..54c0ecd2913 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.SecurityOptions; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.runtime.blob.BlobStoreService; import org.apache.flink.runtime.blob.BlobUtils; import org.apache.flink.runtime.dispatcher.Dispatcher; @@ -62,11 +63,14 @@ public static HighAvailabilityServices createAvailableOrEmbeddedServices( config, blobStoreService); + case FACTORY_CLASS: + return createCustomHAServices(config, executor); + default: throw new Exception("High availability mode " + highAvailabilityMode + " is not supported."); } } - + public static HighAvailabilityServices createHighAvailabilityServices( Configuration configuration, Executor executor, @@ -74,7 +78,7 @@ public static HighAvailabilityServices createHighAvailabilityServices( 
HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration); - switch(highAvailabilityMode) { + switch (highAvailabilityMode) { case NONE: final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration); @@ -115,6 +119,10 @@ public static HighAvailabilityServices createHighAvailabilityServices( executor, configuration, blobStoreService); + + case FACTORY_CLASS: + return createCustomHAServices(configuration, executor); + default: throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported."); } @@ -147,6 +155,22 @@ public static HighAvailabilityServices createHighAvailabilityServices( return Tuple2.of(hostname, port); } + private static HighAvailabilityServices createCustomHAServices(Configuration config, Executor executor) throws Exception { + Class<HighAvailabilityServicesFactory> factoryClass; + try { + factoryClass = config.getClass( + HighAvailabilityOptions.HA_MODE.key(), null, Thread.currentThread().getContextClassLoader()); + } catch (ClassNotFoundException e) { + throw new Exception("Custom HA FactoryClass not found"); + } + + if (factoryClass != null && HighAvailabilityServicesFactory.class.isAssignableFrom(factoryClass)) { + return factoryClass.newInstance().createHAServices(config, executor); + } else { + throw new Exception("Custom HA FactoryClass is not valid."); + } + } + public enum AddressResolution { TRY_ADDRESS_RESOLUTION, NO_ADDRESS_RESOLUTION diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java index 7dc13c28f84..65c202a3c55 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java @@ -30,10 +30,19 @@ * ZooKeeper is used to select a leader among a group of JobManager. 
This JobManager * is responsible for the job execution. Upon failure of the leader a new leader is elected * which will take over the responsibilities of the old leader + * - FACTORY_CLASS: Use implementation of {@link org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory} + * specified in configuration property high-availability */ public enum HighAvailabilityMode { - NONE, - ZOOKEEPER; + NONE(false), + ZOOKEEPER(true), + FACTORY_CLASS(true); + + private final boolean haActive; + + HighAvailabilityMode(boolean haActive) { + this.haActive = haActive; + } /** * Return the configured {@link HighAvailabilityMode}. @@ -51,7 +60,11 @@ public static HighAvailabilityMode fromConfig(Configuration config) { // Map old default to new default return HighAvailabilityMode.NONE; } else { - return HighAvailabilityMode.valueOf(haMode.toUpperCase()); + try { + return HighAvailabilityMode.valueOf(haMode.toUpperCase()); + } catch (IllegalArgumentException e) { + return FACTORY_CLASS; + } } } @@ -63,14 +76,6 @@ public static HighAvailabilityMode fromConfig(Configuration config) { */ public static boolean isHighAvailabilityModeActivated(Configuration configuration) { HighAvailabilityMode mode = fromConfig(configuration); - switch (mode) { - case NONE: - return false; - case ZOOKEEPER: - return true; - default: - return false; - } - + return mode.haActive; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java new file mode 100644 index 00000000000..9dfe98ce46c --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtilsTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.concurrent.Executor; + +import static org.junit.Assert.*; + +public class HighAvailabilityServicesUtilsTest { + + @Test + public void testCreateCustomHAServices() throws Exception { + Configuration config = new Configuration(); + + HighAvailabilityServices haServices = Mockito.mock(HighAvailabilityServices.class); + TestHAFactory.haServices = haServices; + + Executor executor = Mockito.mock(Executor.class); + + config.setString(HighAvailabilityOptions.HA_MODE, TestHAFactory.class.getName()); + + // when + HighAvailabilityServices actualHaServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config, executor); + + // then + assertSame(haServices, actualHaServices); + + // when + actualHaServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(config, executor, + HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION); + // then + assertSame(haServices, actualHaServices); + } + + @Test(expected = Exception.class) + 
public void testCustomHAServicesFactoryNotDefined() throws Exception { + Configuration config = new Configuration(); + + Executor executor = Mockito.mock(Executor.class); + + config.setString(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.FACTORY_CLASS.name().toLowerCase()); + + // expect + HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config, executor); + } + + static class TestHAFactory implements HighAvailabilityServicesFactory { + + static HighAvailabilityServices haServices; + + @Override + public HighAvailabilityServices createHAServices(Configuration config, Executor executor) { + return haServices; + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java index 91fb5141ff6..db969bac3ea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/HighAvailabilityModeTest.java @@ -25,6 +25,7 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class HighAvailabilityModeTest { @@ -45,6 +46,10 @@ public void testFromConfig() throws Exception { // Check not equals default config.setString(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name().toLowerCase()); assertEquals(HighAvailabilityMode.ZOOKEEPER, HighAvailabilityMode.fromConfig(config)); + + // Check factory class + config.setString(HighAvailabilityOptions.HA_MODE, "factory.class.FQN"); + assertEquals(HighAvailabilityMode.FACTORY_CLASS, HighAvailabilityMode.fromConfig(config)); } /** @@ -69,4 +74,24 @@ public void testDeprecatedFromConfig() throws Exception { assertEquals(HighAvailabilityMode.NONE, HighAvailabilityMode.fromConfig(config)); } + @Test + public void testCheckHighAvailabilityModeActivated() throws Exception { + 
Configuration config = new Configuration(); + + // check defaults + assertTrue(!HighAvailabilityMode.isHighAvailabilityModeActivated(config)); + + // check NONE + config.setString("high-availability", HighAvailabilityMode.NONE.name().toLowerCase()); + assertTrue(!HighAvailabilityMode.isHighAvailabilityModeActivated(config)); + + // check ZOOKEEPER + config.setString("high-availability", HighAvailabilityMode.ZOOKEEPER.name().toLowerCase()); + assertTrue(HighAvailabilityMode.isHighAvailabilityModeActivated(config)); + + // check FACTORY_CLASS + config.setString("high-availability", HighAvailabilityMode.FACTORY_CLASS.name().toLowerCase()); + assertTrue(HighAvailabilityMode.isHighAvailabilityModeActivated(config)); + } + } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable the user to provide custom HAServices implementation > ------------------------------------------------------------ > > Key: FLINK-8660 > URL: https://issues.apache.org/jira/browse/FLINK-8660 > Project: Flink > Issue Type: Improvement > Components: Cluster Management, Configuration, Distributed > Coordination > Affects Versions: 1.4.0, 1.5.0 > Reporter: Krzysztof Białek > Assignee: Krzysztof Białek > Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > At the moment Flink uses ZooKeeper as its HA backend. > The goal of this improvement is to make Flink support more HA backends, > including those maintained as independent projects. 
> The following changes are required to achieve it:
> # Add {{HighAvailabilityServicesFactory}} interface
> # Add new option {{HighAvailabilityMode.CUSTOM}}
> # Add new configuration property {{high-availability.factoryClass}}
> # Use the factory in {{HighAvailabilityServicesUtils}} to instantiate {{HighAvailabilityServices}}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)