tillrohrmann commented on a change in pull request #14629:
URL: https://github.com/apache/flink/pull/14629#discussion_r581240978



##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkPod.java
##########
@@ -48,6 +48,13 @@ public Container getMainContainer() {
         return mainContainer;
     }
 
+    @Override
+    public FlinkPod clone() {

Review comment:
       Do we really want to override `Object.clone` here? Java's clone contract
is very hard to adhere to. If the goal is only a method that creates a copy of
`FlinkPod`, I would suggest adding a `copy` method.
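
A minimal sketch of what a `copy` method could look like. `PodSketch` is a
hypothetical, simplified stand-in (plain strings instead of the fabric8 `Pod`
and `Container` objects), not the real `FlinkPod`:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for FlinkPod; the real class wraps fabric8 objects. */
final class PodSketch {
    private final String name;
    private final List<String> containers;

    PodSketch(String name, List<String> containers) {
        this.name = name;
        // Defensive copy so the caller's list never aliases our state.
        this.containers = new ArrayList<>(containers);
    }

    /** Explicit copy method: no Cloneable and no Object.clone contract to honor. */
    PodSketch copy() {
        // The constructor copies the list again, so the two instances are independent.
        return new PodSketch(name, containers);
    }

    String getName() {
        return name;
    }

    List<String> getContainers() {
        return containers;
    }
}
```

With this shape the copy semantics are spelled out in one place, and callers
do not need a cast or a `CloneNotSupportedException` handler.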

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
##########
@@ -367,6 +369,54 @@
                                             
code("FlinkKubeClient#checkAndUpdateConfigMap"))
                                     .build());
 
+    public static final ConfigOption<String> JOB_MANAGER_POD_TEMPLATE =
+            key("kubernetes.pod-template-file.jobmanager")
+                    .stringType()
+                    .noDefaultValue()
+                    .withFallbackKeys(KUBERNETES_POD_TEMPLATE_FILE_KEY)
+                    .withDescription(
+                            "Specify a local file that contains the jobmanager 
pod template definition. "
+                                    + "It will be used to initialize the 
jobmanager pod. "
+                                    + "The main container should be defined 
with name '"
+                                    + Constants.MAIN_CONTAINER_NAME
+                                    + "'. If not explicitly configured, config 
option '"
+                                    + KUBERNETES_POD_TEMPLATE_FILE_KEY
+                                    + "' will be used.");
+
+    public static final ConfigOption<String> TASK_MANAGER_POD_TEMPLATE =
+            key("kubernetes.pod-template-file.taskmanager")

Review comment:
       Same here.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -339,6 +339,11 @@ public void close() {
         this.internalClient.close();
     }
 
+    @Override
+    public KubernetesPod loadPodFromTemplateFile(String file) {
+        return new KubernetesPod(this.internalClient.pods().load(file).get());

Review comment:
       What happens if `file` is an invalid file path?
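
One possible way to make the failure mode explicit, sketched under the
assumption that we want to fail fast before handing the path to the client.
`TemplateFileCheck` and `requireReadableFile` are hypothetical names, and I
have not checked what the fabric8 `load` call itself does for a bad path:

```java
import java.io.File;
import java.io.FileNotFoundException;

/** Hypothetical helper, not part of the PR. */
final class TemplateFileCheck {

    /** Returns the file if it is a readable regular file, otherwise throws. */
    static File requireReadableFile(String path) throws FileNotFoundException {
        final File file = new File(path);
        if (!file.isFile() || !file.canRead()) {
            throw new FileNotFoundException(
                    "Pod template file "
                            + file.getAbsolutePath()
                            + " does not exist or is not readable.");
        }
        return file;
    }
}
```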

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesPodTemplateTestUtils.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import io.fabric8.kubernetes.api.model.PodSpec;
+import io.fabric8.kubernetes.api.model.Volume;
+import io.fabric8.kubernetes.api.model.VolumeBuilder;
+import io.fabric8.kubernetes.api.model.VolumeMount;
+import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
+
+import java.net.URL;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/** Utilities for the Kubernetes pod template tests. */
+public class KubernetesPodTemplateTestUtils {
+
+    public static final String TESTING_MAIN_CONTAINER_NAME = 
"testing-main-container";
+    public static final String TESTING_SIDE_CAR_CONTAINER_NAME = 
"sidecar-log-collector";
+
+    private static final String TESTING_TEMPLATE_FILE_NAME = 
"testing-pod-template.yaml";
+
+    public static String getPodTemplatePath() {
+        final URL podTemplateUrl =
+                KubernetesPodTemplateTestUtils.class
+                        .getClassLoader()
+                        .getResource(TESTING_TEMPLATE_FILE_NAME);
+        assertThat(podTemplateUrl, not(nullValue()));
+        return podTemplateUrl.getPath();
+    }
+
+    public static Container getInitContainerInPodTemplate() {

Review comment:
       Maybe `createInitContainerForPodTemplate` or `createInitContainer`.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java
##########
@@ -243,4 +220,63 @@ public void testDecoratedFlinkContainer() {
         assertEquals(Constants.FLINK_CONF_VOLUME, volumeMount.getName());
         assertEquals(FLINK_CONF_DIR_IN_POD, volumeMount.getMountPath());
     }
+
+    @Test
+    public void testConfigMapWithTaskManagerPodTemplate() throws IOException {

Review comment:
       Maybe call it 
`testBuildAccompanyingKubernetesResourcesAddsPodTemplateAsConfigMap`

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesFactoryWithPodTemplateTestBase.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.factory;
+
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.KubernetesPodTemplateTestUtils;
+import org.apache.flink.kubernetes.KubernetesTestBase;
+import 
org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal;
+import 
org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint;
+import org.apache.flink.kubernetes.utils.Constants;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.PodSpec;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test base for the {@link KubernetesJobManagerFactory} and {@link 
KubernetesTaskManagerFactory}
+ * with pod template. These tests will ensure that annotations, labels, 
imagePullSecrets,
+ * nodeSelector, tolerations, env, init container, sidecar container, volumes 
from pod template
+ * should be kept after all decorators.
+ */
+public abstract class KubernetesFactoryWithPodTemplateTestBase extends 
KubernetesTestBase {
+
+    private static final String ENTRY_POINT_CLASS =
+            KubernetesSessionClusterEntrypoint.class.getCanonicalName();
+    private static final int RESOURCE_MEMORY = 1456;
+
+    /**
+     * The result pod variables should be set via {@link #setResultPod()} by 
implementation class.
+     */
+    protected ObjectMeta resultPodMeta;
+
+    protected PodSpec resultPodSpec;
+
+    @Override
+    protected void setupFlinkConfig() {
+        super.setupFlinkConfig();
+        flinkConfig.set(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, 
ENTRY_POINT_CLASS);
+
+        // Set fixed ports
+        flinkConfig.set(RestOptions.PORT, Constants.REST_PORT);
+        flinkConfig.set(BlobServerOptions.PORT, 
Integer.toString(Constants.BLOB_SERVER_PORT));
+        flinkConfig.setString(
+                TaskManagerOptions.RPC_PORT, 
String.valueOf(Constants.TASK_MANAGER_RPC_PORT));
+
+        flinkConfig.set(
+                TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(RESOURCE_MEMORY));
+    }
+
+    @Override
+    public final void onSetup() throws Exception {
+        setResultPod();
+    }
+
+    protected abstract void setResultPod() throws Exception;
+
+    @Test
+    public void testInitContainerFromPodTemplate() {
+        assertThat(resultPodSpec.getInitContainers().size(), is(1));
+        assertThat(
+                resultPodSpec.getInitContainers().get(0),
+                
is(KubernetesPodTemplateTestUtils.getInitContainerInPodTemplate()));
+    }
+
+    @Test
+    public void testSideCarContainerFromPodTemplate() {
+        final Container sideCarContainer =
+                KubernetesPodTemplateTestUtils.getContainerWithName(
+                        resultPodSpec,
+                        
KubernetesPodTemplateTestUtils.TESTING_SIDE_CAR_CONTAINER_NAME);
+        assertThat(
+                sideCarContainer,
+                
is(KubernetesPodTemplateTestUtils.getSideCarContainerInPodTemplate()));
+    }
+
+    @Test
+    public void testVolumesFromPodTemplate() {
+        assertThat(
+                resultPodSpec.getVolumes(),
+                
hasItems(KubernetesPodTemplateTestUtils.getVolumesInPodTemplate()));
+    }
+
+    @Test
+    public void testMainContainerVolumeMountsFromPodTemplate() {
+        final Container mainContainer =
+                KubernetesPodTemplateTestUtils.getContainerWithName(
+                        resultPodSpec, Constants.MAIN_CONTAINER_NAME);
+        assertThat(
+                mainContainer.getVolumeMounts(),
+                
hasItems(KubernetesPodTemplateTestUtils.getVolumeMountInPodTemplate()));
+    }

Review comment:
       Looking at the assertions in these tests it is not clear to me why the 
expected values are what they are. Only when looking at the subclasses of this 
class one sees that `KubernetesPodTemplateTestUtils.getPodTemplatePath()` is 
used. This contract is not clear and can easily break. It is much easier to 
reason about tests if one sees how the test subject is set up and the 
assertions in one method.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
##########
@@ -97,4 +97,7 @@
     public static final String CHECKPOINT_COUNTER_KEY = "counter";
     public static final String CHECKPOINT_ID_KEY_PREFIX = "checkpointID-";
     public static final String COMPLETED_CHECKPOINT_FILE_SUFFIX = 
"completedCheckpoint";
+
+    public static final String TASK_MANAGER_POD_TEMPLATE_FILE_NAME =
+            "taskmanager-pod-template.yaml";

Review comment:
       Maybe add a comment that this is the path to the mounted task manager
pod template in the JobManager pod, if one was specified.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
##########
@@ -396,6 +401,42 @@ public static String getCommonStartCommand(
                 .collect(Collectors.toList());
     }
 
+    public static FlinkPod loadPodFromTemplateFile(
+            FlinkKubeClient kubeClient, String path, String mainContainerName) 
{
+        if (!new File(path).exists()) {
+            return new FlinkPod.Builder().build();

Review comment:
       Maybe add a log statement that we default to the standard `FlinkPod` 
because `path` could not be read.
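
Roughly what I have in mind, as a sketch only. `java.util.logging` stands in
for the class's existing logger so that the snippet is self-contained, and
`TemplateFallbackSketch` and `templateExistsOrLog` are hypothetical names:

```java
import java.io.File;
import java.util.logging.Logger;

/** Hypothetical sketch; the real code would use the existing LOG of KubernetesUtils. */
final class TemplateFallbackSketch {
    private static final Logger LOG =
            Logger.getLogger(TemplateFallbackSketch.class.getName());

    /** Returns true if the template exists; otherwise logs why the default pod is used. */
    static boolean templateExistsOrLog(String path) {
        if (new File(path).exists()) {
            return true;
        }
        LOG.info(
                "Pod template file "
                        + path
                        + " does not exist; falling back to the default FlinkPod.");
        return false;
    }
}
```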

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
##########
@@ -367,6 +369,54 @@
                                             
code("FlinkKubeClient#checkAndUpdateConfigMap"))
                                     .build());
 
+    public static final ConfigOption<String> JOB_MANAGER_POD_TEMPLATE =
+            key("kubernetes.pod-template-file.jobmanager")

Review comment:
       nit: you could use `KUBERNETES_POD_TEMPLATE_FILE_KEY + ".jobmanager"`
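
For illustration, a sketch of the derived keys. The value of
`KUBERNETES_POD_TEMPLATE_FILE_KEY` is an assumption inferred from the option
keys in the diff, and `PodTemplateKeys` is a hypothetical holder class:

```java
/** Hypothetical holder class; only the concatenation pattern matters here. */
final class PodTemplateKeys {
    // Assumed value, inferred from the keys shown in the diff.
    static final String KUBERNETES_POD_TEMPLATE_FILE_KEY = "kubernetes.pod-template-file";

    // Deriving both keys from the shared prefix keeps them in sync with the fallback key.
    static final String JOB_MANAGER_KEY = KUBERNETES_POD_TEMPLATE_FILE_KEY + ".jobmanager";
    static final String TASK_MANAGER_KEY = KUBERNETES_POD_TEMPLATE_FILE_KEY + ".taskmanager";
}
```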

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
##########
@@ -396,6 +401,42 @@ public static String getCommonStartCommand(
                 .collect(Collectors.toList());
     }
 
+    public static FlinkPod loadPodFromTemplateFile(
+            FlinkKubeClient kubeClient, String path, String mainContainerName) 
{
+        if (!new File(path).exists()) {
+            return new FlinkPod.Builder().build();
+        }
+
+        final KubernetesPod pod = kubeClient.loadPodFromTemplateFile(path);
+        final List<Container> otherContainers = new ArrayList<>();
+        Container mainContainer = null;
+
+        for (Container container : 
pod.getInternalResource().getSpec().getContainers()) {
+            if (mainContainerName.equals(container.getName())) {
+                mainContainer = container;
+            } else {
+                otherContainers.add(container);
+            }
+        }
+
+        if (mainContainer == null) {
+            LOG.warn(

Review comment:
       I would make this `info` because it is a valid use case that the user 
does not want to modify the main container, right? Or is it required that the 
user specifies a main container?

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
##########
@@ -396,6 +401,42 @@ public static String getCommonStartCommand(
                 .collect(Collectors.toList());
     }
 
+    public static FlinkPod loadPodFromTemplateFile(
+            FlinkKubeClient kubeClient, String path, String mainContainerName) 
{

Review comment:
       nit: changing `path` to `File` type would let us avoid converting between
`File -> String -> File` when using `getTaskManagerPodTemplatePathInPod`,
and it gives us a bit more type safety.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesPodTemplateTestUtils.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import io.fabric8.kubernetes.api.model.PodSpec;
+import io.fabric8.kubernetes.api.model.Volume;
+import io.fabric8.kubernetes.api.model.VolumeBuilder;
+import io.fabric8.kubernetes.api.model.VolumeMount;
+import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
+
+import java.net.URL;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/** Utilities for the Kubernetes pod template tests. */
+public class KubernetesPodTemplateTestUtils {
+
+    public static final String TESTING_MAIN_CONTAINER_NAME = 
"testing-main-container";
+    public static final String TESTING_SIDE_CAR_CONTAINER_NAME = 
"sidecar-log-collector";
+
+    private static final String TESTING_TEMPLATE_FILE_NAME = 
"testing-pod-template.yaml";
+
+    public static String getPodTemplatePath() {
+        final URL podTemplateUrl =
+                KubernetesPodTemplateTestUtils.class
+                        .getClassLoader()
+                        .getResource(TESTING_TEMPLATE_FILE_NAME);
+        assertThat(podTemplateUrl, not(nullValue()));
+        return podTemplateUrl.getPath();
+    }
+
+    public static Container getInitContainerInPodTemplate() {
+        return new ContainerBuilder()
+                .withName("artifacts-fetcher")
+                .withImage("testing-init-image")
+                .withCommand(
+                        "wget",
+                        "https://path/of/StateMachineExample.jar",
+                        "-O",
+                        "/flink-artifact/myjob.jar")
+                .withVolumeMounts(
+                        new VolumeMountBuilder()
+                                .withName("flink-artifact")
+                                .withMountPath("/flink-artifact")
+                                .build())
+                .build();
+    }
+
+    public static Container getSideCarContainerInPodTemplate() {

Review comment:
       `createSideCarContainerForPodTemplate` or `createSideCarContainer`.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesFactoryWithPodTemplateTestBase.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.factory;
+
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.KubernetesPodTemplateTestUtils;
+import org.apache.flink.kubernetes.KubernetesTestBase;
+import 
org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal;
+import 
org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint;
+import org.apache.flink.kubernetes.utils.Constants;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.PodSpec;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test base for the {@link KubernetesJobManagerFactory} and {@link 
KubernetesTaskManagerFactory}
+ * with pod template. These tests will ensure that annotations, labels, 
imagePullSecrets,
+ * nodeSelector, tolerations, env, init container, sidecar container, volumes 
from pod template
+ * should be kept after all decorators.
+ */
+public abstract class KubernetesFactoryWithPodTemplateTestBase extends 
KubernetesTestBase {

Review comment:
       In general I would be cautious with using inheritance for test classes. 
This makes it a lot harder for someone new to this part of the code to 
understand what is actually happening. In most cases, one can avoid test bases 
by using composition (e.g. creating a `KubernetesTestingResource` which sets up 
the main part and can be used by test classes which want to test something for
K8s).
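
A rough sketch of the composition idea. Everything here is hypothetical
(class names, the `with` method, the config being a plain map, the template
path); it only illustrates that the test holds the resource instead of
extending a base class:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical testing resource that owns the shared setup. */
final class KubernetesTestingResourceSketch {
    private final Map<String, String> flinkConfig = new HashMap<>();

    KubernetesTestingResourceSketch() {
        // Shared defaults that a test base class would otherwise set up implicitly.
        flinkConfig.put("kubernetes.cluster-id", "testing-cluster");
    }

    /** Lets each test state its own additional configuration explicitly. */
    KubernetesTestingResourceSketch with(String key, String value) {
        flinkConfig.put(key, value);
        return this;
    }

    Map<String, String> getFlinkConfig() {
        return flinkConfig;
    }
}

/** A test composes the resource, so setup and assertions sit in one method. */
final class JobManagerFactoryTestSketch {
    static String configuredPodTemplate(KubernetesTestingResourceSketch resource) {
        return resource.getFlinkConfig().get("kubernetes.pod-template-file.jobmanager");
    }
}
```

A test would then create the resource, add the pod template option it needs,
and assert on the result in the same method, which also makes the origin of
the expected values visible.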

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesPodTemplateTestUtils.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import io.fabric8.kubernetes.api.model.PodSpec;
+import io.fabric8.kubernetes.api.model.Volume;
+import io.fabric8.kubernetes.api.model.VolumeBuilder;
+import io.fabric8.kubernetes.api.model.VolumeMount;
+import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
+
+import java.net.URL;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/** Utilities for the Kubernetes pod template tests. */
+public class KubernetesPodTemplateTestUtils {
+
+    public static final String TESTING_MAIN_CONTAINER_NAME = 
"testing-main-container";
+    public static final String TESTING_SIDE_CAR_CONTAINER_NAME = 
"sidecar-log-collector";
+
+    private static final String TESTING_TEMPLATE_FILE_NAME = 
"testing-pod-template.yaml";
+
+    public static String getPodTemplatePath() {
+        final URL podTemplateUrl =
+                KubernetesPodTemplateTestUtils.class
+                        .getClassLoader()
+                        .getResource(TESTING_TEMPLATE_FILE_NAME);
+        assertThat(podTemplateUrl, not(nullValue()));
+        return podTemplateUrl.getPath();
+    }
+
+    public static Container getInitContainerInPodTemplate() {
+        return new ContainerBuilder()
+                .withName("artifacts-fetcher")
+                .withImage("testing-init-image")
+                .withCommand(
+                        "wget",
+                        "https://path/of/StateMachineExample.jar",
+                        "-O",
+                        "/flink-artifact/myjob.jar")
+                .withVolumeMounts(
+                        new VolumeMountBuilder()
+                                .withName("flink-artifact")
+                                .withMountPath("/flink-artifact")
+                                .build())
+                .build();
+    }
+
+    public static Container getSideCarContainerInPodTemplate() {
+        return new ContainerBuilder()
+                .withName("sidecar-log-collector")
+                .withImage("test-sidecar-image")
+                .withCommand("command-to-upload", "/flink-logs/jobmanager.log")
+                .withVolumeMounts(
+                        new VolumeMountBuilder()
+                                .withName("flink-logs")
+                                .withMountPath("/flink-logs")
+                                .build())
+                .build();
+    }
+
+    public static Volume[] getVolumesInPodTemplate() {
+        return new Volume[] {
+            new VolumeBuilder()
+                    .withName("flink-volume-hostpath")
+                    .withNewHostPath()
+                    .withNewPath("/tmp")
+                    .withType("Directory")
+                    .endHostPath()
+                    .build(),
+            new 
VolumeBuilder().withName("flink-artifact").withNewEmptyDir().endEmptyDir().build(),
+            new 
VolumeBuilder().withName("flink-logs").withNewEmptyDir().endEmptyDir().build()
+        };
+    }
+
+    public static VolumeMount[] getVolumeMountInPodTemplate() {

Review comment:
       Same with these names. The `InPodTemplate` does not make a lot of sense 
if we look at these methods on their own.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/utils/KubernetesUtilsTest.java
##########
@@ -75,6 +81,91 @@ public void testCheckWithFixedPort() {
         testCheckAndUpdatePortConfigOption("6123", "16123", "6123");
     }
 
+    @Test
+    public void testLoadPodFromTemplateWithNonExistPathShouldReturnEmptyPod() {
+        final FlinkPod flinkPod =
+                KubernetesUtils.loadPodFromTemplateFile(
+                        flinkKubeClient,
+                        "/path/of/non-exist.yaml",
+                        
KubernetesPodTemplateTestUtils.TESTING_MAIN_CONTAINER_NAME);
+        assertThat(flinkPod.getPod(), is(EMPTY_POD.getPod()));
+        assertThat(flinkPod.getMainContainer(), 
is(EMPTY_POD.getMainContainer()));
+    }
+
+    @Test
+    public void 
testLoadPodFromTemplateWithNoMainContainerShouldReturnEmptyPod() {
+        final FlinkPod flinkPod =
+                KubernetesUtils.loadPodFromTemplateFile(
+                        flinkKubeClient,
+                        KubernetesPodTemplateTestUtils.getPodTemplatePath(),
+                        "nonExistMainContainer");
+        assertThat(flinkPod.getMainContainer(), 
is(EMPTY_POD.getMainContainer()));
+        assertThat(flinkPod.getPod().getSpec().getContainers().size(), is(2));
+    }
+
+    @Test
+    public void testLoadPodFromTemplateAndCheckMetaData() {
+        final FlinkPod flinkPod =
+                KubernetesUtils.loadPodFromTemplateFile(
+                        flinkKubeClient,
+                        KubernetesPodTemplateTestUtils.getPodTemplatePath(),
+                        
KubernetesPodTemplateTestUtils.TESTING_MAIN_CONTAINER_NAME);
+        assertThat(flinkPod.getPod().getMetadata().getName(), 
is("pod-template"));

Review comment:
       It is not really clear to me why the metadata name is `pod-template` if
I only look at this test.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/utils/KubernetesUtilsTest.java
##########
@@ -75,6 +81,91 @@ public void testCheckWithFixedPort() {
         testCheckAndUpdatePortConfigOption("6123", "16123", "6123");
     }
 
+    @Test
+    public void testLoadPodFromTemplateWithNonExistPathShouldReturnEmptyPod() {
+        final FlinkPod flinkPod =
+                KubernetesUtils.loadPodFromTemplateFile(
+                        flinkKubeClient,
+                        "/path/of/non-exist.yaml",
+                        
KubernetesPodTemplateTestUtils.TESTING_MAIN_CONTAINER_NAME);
+        assertThat(flinkPod.getPod(), is(EMPTY_POD.getPod()));
+        assertThat(flinkPod.getMainContainer(), 
is(EMPTY_POD.getMainContainer()));
+    }
+
+    @Test
+    public void 
testLoadPodFromTemplateWithNoMainContainerShouldReturnEmptyPod() {

Review comment:
       
`testLoadPodFromTemplateWithNoMainContainerShouldReturnEmptyMainContainer`

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesFactoryWithPodTemplateTestBase.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.factory;
+
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.KubernetesPodTemplateTestUtils;
+import org.apache.flink.kubernetes.KubernetesTestBase;
+import 
org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal;
+import 
org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint;
+import org.apache.flink.kubernetes.utils.Constants;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.PodSpec;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test base for the {@link KubernetesJobManagerFactory} and {@link 
KubernetesTaskManagerFactory}
+ * with pod template. These tests will ensure that annotations, labels, 
imagePullSecrets,
+ * nodeSelector, tolerations, env, init container, sidecar container, volumes 
from pod template
+ * should be kept after all decorators.
+ */
+public abstract class KubernetesFactoryWithPodTemplateTestBase extends 
KubernetesTestBase {
+
+    private static final String ENTRY_POINT_CLASS =
+            KubernetesSessionClusterEntrypoint.class.getCanonicalName();
+    private static final int RESOURCE_MEMORY = 1456;
+
+    /**
+     * The result pod variables should be set via {@link #setResultPod()} by 
implementation class.
+     */
+    protected ObjectMeta resultPodMeta;
+
+    protected PodSpec resultPodSpec;

Review comment:
       This is quite an implicit contract which is documented via a comment. I
am not saying that we need to change it, but this is brittle in general.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -339,6 +339,11 @@ public void close() {
         this.internalClient.close();
     }
 
+    @Override
+    public KubernetesPod loadPodFromTemplateFile(String file) {
+        return new KubernetesPod(this.internalClient.pods().load(file).get());

Review comment:
       Can `get()` return `null`?
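
If it can, a guard along these lines might help. This is a sketch only:
`LoadedPodCheck` and `requireLoaded` are hypothetical, and the `Supplier`
stands in for the client call, whose null behavior I have not verified:

```java
import java.util.Objects;
import java.util.function.Supplier;

/** Hypothetical guard, not part of the PR. */
final class LoadedPodCheck {

    /** Fails fast with a descriptive message if the loader yields null. */
    static <T> T requireLoaded(Supplier<T> loader, String file) {
        return Objects.requireNonNull(
                loader.get(), "Could not load a pod from template file " + file);
    }
}
```

That way a broken template surfaces at load time with the file name in the
message instead of as a later `NullPointerException` inside `KubernetesPod`.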

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesPodTemplateTestUtils.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import io.fabric8.kubernetes.api.model.PodSpec;
+import io.fabric8.kubernetes.api.model.Volume;
+import io.fabric8.kubernetes.api.model.VolumeBuilder;
+import io.fabric8.kubernetes.api.model.VolumeMount;
+import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
+
+import java.net.URL;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/** Utilities for the Kubernetes pod template tests. */
+public class KubernetesPodTemplateTestUtils {
+
+    public static final String TESTING_MAIN_CONTAINER_NAME = "testing-main-container";
+    public static final String TESTING_SIDE_CAR_CONTAINER_NAME = "sidecar-log-collector";
+
+    private static final String TESTING_TEMPLATE_FILE_NAME = "testing-pod-template.yaml";
+
+    public static String getPodTemplatePath() {
+        final URL podTemplateUrl =
+                KubernetesPodTemplateTestUtils.class
+                        .getClassLoader()
+                        .getResource(TESTING_TEMPLATE_FILE_NAME);
+        assertThat(podTemplateUrl, not(nullValue()));
+        return podTemplateUrl.getPath();
+    }
+
+    public static Container getInitContainerInPodTemplate() {

Review comment:
       Maybe add a comment that these are the containers specified in the 
`test-resources/testing-pod-template.yaml`. Maybe this should go to the class 
JavaDocs.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
##########
@@ -396,6 +401,42 @@ public static String getCommonStartCommand(
                 .collect(Collectors.toList());
     }
 
+    public static FlinkPod loadPodFromTemplateFile(
+            FlinkKubeClient kubeClient, String path, String mainContainerName) {
+        if (!new File(path).exists()) {
+            return new FlinkPod.Builder().build();

Review comment:
       I am wondering whether we should rather fail hard here, because the 
user has specified a pod template which does not exist. Users might be 
surprised if they have specified a pod template but it is silently ignored in 
the end.
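
       To illustrate what failing hard could look like, here is a minimal 
sketch. `PodTemplateFiles` and `requireExistingTemplate` are hypothetical 
names, and `IllegalArgumentException` is just an assumption; the exception 
type that fits best in Flink may well be a different one.

```java
import java.io.File;

final class PodTemplateFiles {

    // Hypothetical helper: returns the template file, or fails if the path does not exist.
    static File requireExistingTemplate(String path) {
        final File templateFile = new File(path);
        if (!templateFile.exists()) {
            // Surface the misconfiguration instead of silently using an empty pod.
            throw new IllegalArgumentException(
                    "The configured pod template file " + path + " does not exist.");
        }
        return templateFile;
    }
}
```

       With something like this, a mistyped template path would show up at 
submission time instead of resulting in a pod without the expected settings.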

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java
##########
@@ -197,6 +197,11 @@ public void close() {
         closeConsumer.accept(null);
     }
 
+    @Override
+    public KubernetesPod loadPodFromTemplateFile(String file) {
+        return new KubernetesPod(new FlinkPod.Builder().build().getPod());
+    }

Review comment:
       Do any of our tests call this method? If not, then maybe throwing an 
`UnsupportedOperationException` might be better. My concern is that a future 
user of the `TestingFlinkKubeClient` who wants to load a template file might 
not see this implementation and wonder why it only creates a default 
`KubernetesPod`.
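
       Roughly what I have in mind, as a sketch only. `PodTemplateLoader` and 
`TestingPodTemplateLoader` are hypothetical stand-ins for `FlinkKubeClient` and 
`TestingFlinkKubeClient`, and the `Object` return type stands in for 
`KubernetesPod`.

```java
// Hypothetical stand-in for the client interface under discussion.
interface PodTemplateLoader {
    Object loadPodFromTemplateFile(String file);
}

// Hypothetical stand-in for the testing client.
final class TestingPodTemplateLoader implements PodTemplateLoader {

    @Override
    public Object loadPodFromTemplateFile(String file) {
        // Make it obvious that the testing client does not read template files.
        throw new UnsupportedOperationException(
                "Loading a pod template is not supported by the testing client.");
    }
}
```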

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/utils/KubernetesUtilsTest.java
##########
@@ -75,6 +81,91 @@ public void testCheckWithFixedPort() {
         testCheckAndUpdatePortConfigOption("6123", "16123", "6123");
     }
 
+    @Test
+    public void testLoadPodFromTemplateWithNonExistPathShouldReturnEmptyPod() {

Review comment:
       Maybe it is better to fail hard.





