tillrohrmann commented on a change in pull request #14301:
URL: https://github.com/apache/flink/pull/14301#discussion_r536125712



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
##########
@@ -29,13 +30,13 @@
  * Simple {@link CheckpointRecoveryFactory} which creates a
  * {@link CompletedCheckpointStore} and a {@link CheckpointIDCounter} per 
{@link JobID}.
  */
-public class TestingCheckpointRecoveryFactory implements 
CheckpointRecoveryFactory {
+public class PerJobCheckpointRecoveryFactory implements 
CheckpointRecoveryFactory {

Review comment:
       Why did you call it `PerJobCheckpointRecoveryFactory`? Is it because it 
stores the services for different jobs? To me it sounds as if this factory only 
works for a single job.

##########
File path: 
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
##########
@@ -76,46 +68,15 @@
        @ClassRule
        public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
 
-       private static TestingMiniCluster miniCluster;
-
-       private static EmbeddedHaServicesWithLeadershipControl 
highAvailabilityServices;
-
-       @BeforeClass
-       public static void setupMiniCluster() throws Exception  {
-               highAvailabilityServices =
-                       new 
EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
-
-               final Configuration configuration = createConfiguration();
-
-               miniCluster = new TestingMiniCluster(
-                       new TestingMiniClusterConfiguration.Builder()
-                               .setConfiguration(configuration)
-                               .setNumTaskManagers(1)
-                               .setNumSlotsPerTaskManager(PARALLELISM)
-                               
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
-                               .build(),
-                       () -> highAvailabilityServices);
-
-               miniCluster.start();
-       }
-
-       private static Configuration createConfiguration() throws IOException {
-               final Configuration configuration = new Configuration();
-               final String checkPointDir = 
Path.fromLocalFile(TMP_FOLDER.newFolder()).toUri().toString();
-               configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkPointDir);
-               return configuration;
-       }
-
-       @AfterClass
-       public static void shutdownMiniCluster() throws Exception {
-               if (miniCluster != null) {
-                       miniCluster.close();
-               }
-               if (highAvailabilityServices != null) {
-                       highAvailabilityServices.closeAndCleanupAllData();
-                       highAvailabilityServices = null;
-               }
-       }
+       @ClassRule
+       public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE 
= new MiniClusterWithClientResource(
+               new MiniClusterResourceConfiguration
+                       .Builder()
+                       .setNumberTaskManagers(1)
+                       .setNumberSlotsPerTaskManager(PARALLELISM)
+                       .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                       .enableEmbeddedHaLeadershipControl()

Review comment:
       Maybe call it `withHaLeadershipControl()`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
##########
@@ -938,6 +962,13 @@ private void terminateMiniClusterServices() throws 
Exception {
                }
        }
 
+       @Nullable
+       private static BiFunction<Configuration, Executor, 
HighAvailabilityServices> createHighAvailabilityServicesFactory(
+                       boolean enableEmbeddedHaLeadershipControl) {
+               return enableEmbeddedHaLeadershipControl ?
+                       (conf, executor) -> new 
EmbeddedHaServicesWithLeadershipControl(executor) : null;
+       }

Review comment:
       What do we need this method here for?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
##########
@@ -176,6 +184,11 @@ public Builder setCommonBindAddress(String 
commonBindAddress) {
                        return this;
                }
 
+               public Builder enableEmbeddedHaLeadershipControl(boolean 
enableEmbeddedHaLeadershipControl) {

Review comment:
       Maybe rename it to `withHaLeadershipControl` and add a description stating 
that this overrides the HA config option.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
##########
@@ -20,27 +20,47 @@
 
 import org.apache.flink.api.common.JobID;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
 /**
- * Simple {@link CheckpointRecoveryFactory} which is initialized with a
- * {@link CompletedCheckpointStore} and a {@link CheckpointIDCounter}.
+ * Simple {@link CheckpointRecoveryFactory} which creates a
+ * {@link CompletedCheckpointStore} and a {@link CheckpointIDCounter} per 
{@link JobID}.
  */
 public class TestingCheckpointRecoveryFactory implements 
CheckpointRecoveryFactory {
+       private final Function<Integer, CompletedCheckpointStore> 
completedCheckpointStorePerJobFactory;
+       private final Supplier<CheckpointIDCounter> 
checkpointIDCounterPerJobFactory;
+       private final Map<JobID, CompletedCheckpointStore> store;
+       private final Map<JobID, CheckpointIDCounter> counter;
 
-       private final CompletedCheckpointStore store;
-       private final CheckpointIDCounter counter;
-
-       public TestingCheckpointRecoveryFactory(CompletedCheckpointStore store, 
CheckpointIDCounter counter) {
-               this.store = store;
-               this.counter = counter;
+       public TestingCheckpointRecoveryFactory(
+                       Function<Integer, CompletedCheckpointStore> 
completedCheckpointStorePerJobFactory,
+                       Supplier<CheckpointIDCounter> 
checkpointIDCounterPerJobFactory) {
+               this.completedCheckpointStorePerJobFactory = 
completedCheckpointStorePerJobFactory;
+               this.checkpointIDCounterPerJobFactory = 
checkpointIDCounterPerJobFactory;
+               this.store = new HashMap<>();
+               this.counter = new HashMap<>();
        }
 
        @Override
-       public CompletedCheckpointStore createCheckpointStore(JobID jobId, int 
maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) throws Exception {
-               return store;
+       public CompletedCheckpointStore createCheckpointStore(
+                       JobID jobId,
+                       int maxNumberOfCheckpointsToRetain,
+                       ClassLoader userClassLoader) {
+               return store.computeIfAbsent(jobId, jId ->
+                       
completedCheckpointStorePerJobFactory.apply(maxNumberOfCheckpointsToRetain));
        }
 
        @Override
-       public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) 
throws Exception {
-               return counter;
+       public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) {
+               return counter.computeIfAbsent(jobId, jId -> 
checkpointIDCounterPerJobFactory.get());
+       }
+
+       public static CheckpointRecoveryFactory createSamePerJob(

Review comment:
       Maybe rename it to `useSameServicesForAllJobs()`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
##########
@@ -54,12 +56,13 @@ public MiniClusterConfiguration(
                        Configuration configuration,
                        int numTaskManagers,
                        RpcServiceSharing rpcServiceSharing,
-                       @Nullable String commonBindAddress) {
-
+                       @Nullable String commonBindAddress,
+                       boolean enableEmbeddedHaLeadershipControl) {

Review comment:
       I am always a bit more in favour of passing enums instead of booleans 
because enum values are more descriptive than `true` and `false`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
##########
@@ -428,11 +434,29 @@ DispatcherResourceManagerComponentFactory 
createDispatcherResourceManagerCompone
        }
 
        @VisibleForTesting
-       protected HighAvailabilityServices 
createHighAvailabilityServices(Configuration configuration, Executor executor) 
throws Exception {
+       protected HighAvailabilityServices createHighAvailabilityServices(
+                       Configuration configuration,
+                       Executor executor) throws Exception {
                LOG.info("Starting high-availability services");
-               return 
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
-                       configuration,
-                       executor);
+               return 
miniClusterConfiguration.embeddedHaLeadershipControlEnabled() ?
+                       new EmbeddedHaServicesWithLeadershipControl(executor) :
+                       
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration, 
executor);
+       }
+
+       /**
+        * Returns {@link HaLeadershipControl} if enabled.
+        *
+        * <p>{@link HaLeadershipControl} allows granting and revoking 
leadership of HA components,
+        * e.g. JobManager. The method return {@link Optional#empty()} if the 
control is not enabled in
+        * {@link MiniClusterConfiguration}.
+        *
+        * <p>Enabling this feature disables {@link 
HighAvailabilityOptions#HA_MODE} option.

Review comment:
       This paragraph should be added to the `enableHaLeadershipControl` method 
on the builder, I guess.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to