zhengcanbin commented on a change in pull request #11415: [FLINK-15667][k8s] Support to mount custom Hadoop configurations
URL: https://github.com/apache/flink/pull/11415#discussion_r396219238
 
 

 ##########
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/HadoopConfMountDecorator.java
 ##########
 @@ -0,0 +1,163 @@
+package org.apache.flink.kubernetes.kubeclient.decorators;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
+import org.apache.flink.kubernetes.utils.Constants;
+
+import org.apache.flink.shaded.guava18.com.google.common.io.Files;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KeyToPath;
+import io.fabric8.kubernetes.api.model.KeyToPathBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.Volume;
+import io.fabric8.kubernetes.api.model.VolumeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Mounts the custom Hadoop configuration into the JobManager(s)/TaskManagers. We provide two options:
+ * 1. Mount an existing ConfigMap containing the custom Hadoop configuration.
+ * 2. Create and mount a dedicated ConfigMap containing the custom Hadoop configuration from a local
+ *    directory specified via the HADOOP_CONF_DIR or HADOOP_HOME environment variable.
+ */
+public class HadoopConfMountDecorator extends AbstractKubernetesStepDecorator {
+
+       private static final Logger LOG = LoggerFactory.getLogger(HadoopConfMountDecorator.class);
+
+       private final AbstractKubernetesParameters kubernetesParameters;
+
+       public HadoopConfMountDecorator(AbstractKubernetesParameters kubernetesParameters) {
+               this.kubernetesParameters = checkNotNull(kubernetesParameters);
+       }
+
+       @Override
+       public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
+               Volume hadoopConfVolume;
+
+               final Optional<String> existingConfigMap = kubernetesParameters.getExistingHadoopConfigurationConfigMap();
+               if (existingConfigMap.isPresent()) {
+                       hadoopConfVolume = new VolumeBuilder()
+                               .withName(Constants.HADOOP_CONF_VOLUME)
+                               .withNewConfigMap()
+                                       .withName(existingConfigMap.get())
+                                       .endConfigMap()
+                               .build();
+               } else {
+                       final Optional<String> localHadoopConfigurationDirectory = kubernetesParameters.getLocalHadoopConfigurationDirectory();
+                       if (!localHadoopConfigurationDirectory.isPresent()) {
+                               return flinkPod;
+                       }
+
+                       final List<File> hadoopConfigurationFileItems = getHadoopConfigurationFileItems(localHadoopConfigurationDirectory.get());
+                       if (hadoopConfigurationFileItems.isEmpty()) {
+                               LOG.warn("Found 0 files in directory {}, skip 
to mount the Hadoop Configuration ConfigMap.",
+                                       
localHadoopConfigurationDirectory.get());
+                               return flinkPod;
+                       }
+
+                       final List<KeyToPath> keyToPaths = hadoopConfigurationFileItems.stream()
+                               .map(file -> new KeyToPathBuilder()
+                                       .withKey(file.getName())
+                                       .withPath(file.getName())
+                                       .build())
+                               .collect(Collectors.toList());
+
+                       hadoopConfVolume = new VolumeBuilder()
+                               .withName(Constants.HADOOP_CONF_VOLUME)
+                               .withNewConfigMap()
+                                       .withName(getHadoopConfConfigMapName(kubernetesParameters.getClusterId()))
+                                       .withItems(keyToPaths)
+                                       .endConfigMap()
+                               .build();
+               }
+
+               final Pod podWithHadoopConf = new PodBuilder(flinkPod.getPod())
+                       .editOrNewSpec()
+                               .addNewVolumeLike(hadoopConfVolume)
+                                       .endVolume()
+                               .endSpec()
+                       .build();
+
+               final Container containerWithHadoopConf = new ContainerBuilder(flinkPod.getMainContainer())
+                       .addNewVolumeMount()
+                               .withName(Constants.HADOOP_CONF_VOLUME)
+                               .withMountPath(Constants.HADOOP_CONF_DIR_IN_POD)
+                               .endVolumeMount()
+                       .addNewEnv()
+                               .withName(Constants.ENV_HADOOP_CONF_DIR)
+                               .withValue(Constants.HADOOP_CONF_DIR_IN_POD)
+                               .endEnv()
+                       .build();
+
+               return new FlinkPod.Builder(flinkPod)
+                       .withPod(podWithHadoopConf)
+                       .withMainContainer(containerWithHadoopConf)
+                       .build();
+       }
+
+       @Override
+       public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
+               if (kubernetesParameters.getExistingHadoopConfigurationConfigMap().isPresent()) {
+                       return Collections.emptyList();
+               }
+
+               final Optional<String> localHadoopConfigurationDirectory = kubernetesParameters.getLocalHadoopConfigurationDirectory();
+               if (!localHadoopConfigurationDirectory.isPresent()) {
+                       return Collections.emptyList();
+               }
+
+               final List<File> hadoopConfigurationFileItems = getHadoopConfigurationFileItems(localHadoopConfigurationDirectory.get());
+               if (hadoopConfigurationFileItems.isEmpty()) {
+                       LOG.warn("Found 0 files in directory {}, skip to create 
the Hadoop Configuration ConfigMap.", localHadoopConfigurationDirectory.get());
+                       return Collections.emptyList();
+               }
+
+               final Map<String, String> data = new HashMap<>();
+               for (File file: hadoopConfigurationFileItems) {
+                       data.put(file.getName(), Files.toString(file, StandardCharsets.UTF_8));
+               }
+
+               final ConfigMap hadoopConfigMap = new ConfigMapBuilder()
+                       .withApiVersion(Constants.API_VERSION)
+                       .withNewMetadata()
+                               .withName(getHadoopConfConfigMapName(kubernetesParameters.getClusterId()))
+                               .withLabels(kubernetesParameters.getCommonLabels())
+                               .endMetadata()
+                       .addToData(data)
+                       .build();
+
+               return Collections.singletonList(hadoopConfigMap);
+       }
+
+       private List<File> getHadoopConfigurationFileItems(String localHadoopConfigurationDirectory) {
+               final File file = new File(localHadoopConfigurationDirectory);
+               if (file.exists() && file.isDirectory()) {
+                       return Arrays.stream(file.listFiles()).filter(File::isFile).collect(Collectors.toList());
 
 Review comment:
   Sounds reasonable! Remove the default files.
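   For readers following the diff, here is a minimal usage sketch of the two entry points this decorator exposes. The enclosing example class, the `basePod` argument, and the concrete `parameters` instance are hypothetical scaffolding for illustration; only the two decorator calls themselves come from this change.

   ```java
   import java.io.IOException;
   import java.util.List;

   import io.fabric8.kubernetes.api.model.HasMetadata;

   import org.apache.flink.kubernetes.kubeclient.FlinkPod;
   import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator;
   import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;

   /** Hypothetical caller; in practice this would be chained with the other step decorators. */
   public class HadoopConfMountExample {

       static FlinkPod applyHadoopConf(AbstractKubernetesParameters parameters, FlinkPod basePod) throws IOException {
           final HadoopConfMountDecorator decorator = new HadoopConfMountDecorator(parameters);

           // Creates the dedicated ConfigMap when a local HADOOP_CONF_DIR/HADOOP_HOME is
           // resolved (option 2); returns an empty list when an existing ConfigMap is
           // referenced (option 1) or no Hadoop configuration files are found. The client
           // is expected to create these resources before the pod starts.
           final List<HasMetadata> accompanyingResources = decorator.buildAccompanyingKubernetesResources();

           // Adds the Hadoop conf volume (Constants.HADOOP_CONF_VOLUME), the corresponding
           // volume mount, and the HADOOP_CONF_DIR environment variable to the main container.
           return decorator.decorateFlinkPod(basePod);
       }
   }
   ```

   Whichever option is taken, the decorated pod ends up with `HADOOP_CONF_DIR` pointing at the mount path, so Hadoop client code inside the container picks up the configuration without further changes; `getExistingHadoopConfigurationConfigMap()` is what selects option 1 over option 2.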

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
