tillrohrmann commented on a change in pull request #14629:
URL: https://github.com/apache/flink/pull/14629#discussion_r584529334



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java
##########
@@ -49,64 +50,104 @@
 public class InitJobManagerDecorator extends AbstractKubernetesStepDecorator {
 
     private final KubernetesJobManagerParameters 
kubernetesJobManagerParameters;
+    private final Configuration flinkConfig;
 
     public InitJobManagerDecorator(KubernetesJobManagerParameters 
kubernetesJobManagerParameters) {
         this.kubernetesJobManagerParameters = 
checkNotNull(kubernetesJobManagerParameters);
+        this.flinkConfig = 
checkNotNull(kubernetesJobManagerParameters.getFlinkConfiguration());
     }
 
     @Override
     public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
-        final Pod basicPod =
-                new PodBuilder(flinkPod.getPod())
-                        .withApiVersion(API_VERSION)
-                        .editOrNewMetadata()
-                        .withLabels(kubernetesJobManagerParameters.getLabels())
-                        
.withAnnotations(kubernetesJobManagerParameters.getAnnotations())
-                        .endMetadata()
-                        .editOrNewSpec()
-                        
.withServiceAccountName(kubernetesJobManagerParameters.getServiceAccount())
-                        
.withImagePullSecrets(kubernetesJobManagerParameters.getImagePullSecrets())
-                        
.withNodeSelector(kubernetesJobManagerParameters.getNodeSelector())
-                        .withTolerations(
-                                
kubernetesJobManagerParameters.getTolerations().stream()
-                                        .map(
-                                                e ->
-                                                        
KubernetesToleration.fromMap(e)
-                                                                
.getInternalResource())
-                                        .collect(Collectors.toList()))
-                        .endSpec()
-                        .build();
+        final PodBuilder basicPodBuilder = new 
PodBuilder(flinkPod.getPodWithoutMainContainer());
+
+        // Overwrite fields
+        final String serviceAccountName =
+                KubernetesUtils.resolveUserDefinedValue(
+                        flinkConfig,
+                        KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT,
+                        kubernetesJobManagerParameters.getServiceAccount(),
+                        
flinkPod.getPodWithoutMainContainer().getSpec().getServiceAccount(),
+                        "service account name");
+        if (flinkPod.getPodWithoutMainContainer().getSpec().getRestartPolicy() 
!= null) {
+            logger.info(
+                    "The restart policy of JobManager pod will be overwritten 
to 'always' "
+                            + "since it is controlled by the Kubernetes 
deployment.");

Review comment:
       Alright, thanks for the clarification.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to