This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d8ca8d  [HUDI-3104] Kafka-connect support of hadoop config environments and properties (#4451)
0d8ca8d is described below

commit 0d8ca8da4e0f6651bc1f06dba5e7e37881225fdc
Author: Thinking Chen <cdmikec...@hotmail.com>
AuthorDate: Sun Jan 9 15:10:17 2022 +0800

    [HUDI-3104] Kafka-connect support of hadoop config environments and properties (#4451)
---
 .../hudi/connect/utils/KafkaConnectUtils.java      | 68 +++++++++++++++++++++
 .../hudi/connect/writers/KafkaConnectConfigs.java  | 29 +++++++++
 .../apache/hudi/connect/TestHdfsConfiguration.java | 69 ++++++++++++++++++++++
 .../src/test/resources/hadoop_conf/core-site.xml   | 33 +++++++++++
 .../src/test/resources/hadoop_conf/hdfs-site.xml   | 30 ++++++++++
 .../resources/hadoop_home/etc/hadoop/core-site.xml | 33 +++++++++++
 .../resources/hadoop_home/etc/hadoop/hdfs-site.xml | 30 ++++++++++
 7 files changed, 292 insertions(+)
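
In short, the connector can now load core-site.xml / hdfs-site.xml either from the HADOOP_CONF_DIR / HADOOP_HOME environment variables or from two new connector properties, hadoop.conf.dir and hadoop.home. A minimal sketch of the intended wiring (the path is a placeholder; the builder and util methods are defined in the diff below):

    KafkaConnectConfigs configs = KafkaConnectConfigs.newBuilder()
        .withHadoopConfDir("/etc/hadoop/conf")  // placeholder path; sets "hadoop.conf.dir"
        .build();
    // Every *.xml found under the directory is added as a Configuration resource.
    org.apache.hadoop.conf.Configuration hadoopConf =
        KafkaConnectUtils.getDefaultHadoopConf(configs);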

diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
index cf60b9e..cc37de2 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
@@ -49,9 +49,14 @@ import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.FileVisitOption;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -65,6 +70,52 @@ public class KafkaConnectUtils {
 
   private static final Logger LOG = LogManager.getLogger(KafkaConnectUtils.class);
   private static final String HOODIE_CONF_PREFIX = "hoodie.";
+  public static final String HADOOP_CONF_DIR = "HADOOP_CONF_DIR";
+  public static final String HADOOP_HOME = "HADOOP_HOME";
+  private static final List<Path> DEFAULT_HADOOP_CONF_FILES;
+
+  static {
+    DEFAULT_HADOOP_CONF_FILES = new ArrayList<>();
+    try {
+      String hadoopConfigPath = System.getenv(HADOOP_CONF_DIR);
+      String hadoopHomePath = System.getenv(HADOOP_HOME);
+      DEFAULT_HADOOP_CONF_FILES.addAll(getHadoopConfigFiles(hadoopConfigPath, hadoopHomePath));
+      if (!DEFAULT_HADOOP_CONF_FILES.isEmpty()) {
+        LOG.info(String.format("Found Hadoop default config files %s", DEFAULT_HADOOP_CONF_FILES));
+      }
+    } catch (IOException e) {
+      LOG.error("An error occurred while getting the default Hadoop configuration. "
+              + "Please use hadoop.conf.dir or hadoop.home to configure the Hadoop environment", e);
+    }
+  }
+
+  /**
+   * Get Hadoop config files from HADOOP_CONF_DIR or HADOOP_HOME.
+   */
+  public static List<Path> getHadoopConfigFiles(String hadoopConfigPath, String hadoopHomePath)
+          throws IOException {
+    List<Path> hadoopConfigFiles = new ArrayList<>();
+    if (!StringUtils.isNullOrEmpty(hadoopConfigPath)) {
+      hadoopConfigFiles.addAll(walkTreeForXml(Paths.get(hadoopConfigPath)));
+    }
+    if (hadoopConfigFiles.isEmpty() && !StringUtils.isNullOrEmpty(hadoopHomePath)) {
+      hadoopConfigFiles.addAll(walkTreeForXml(Paths.get(hadoopHomePath, "etc", "hadoop")));
+    }
+    return hadoopConfigFiles;
+  }
+
+  /**
+   * Walk the file tree to find XML config files.
+   */
+  private static List<Path> walkTreeForXml(Path basePath) throws IOException {
+    if (Files.notExists(basePath)) {
+      return new ArrayList<>();
+    }
+    return Files.walk(basePath, FileVisitOption.FOLLOW_LINKS)
+            .filter(path -> path.toFile().isFile())
+            .filter(path -> path.toString().endsWith(".xml"))
+            .collect(Collectors.toList());
+  }
 
   public static int getLatestNumPartitions(String bootstrapServers, String topicName) {
     Properties props = new Properties();
@@ -89,6 +140,23 @@ public class KafkaConnectUtils {
    */
   public static Configuration getDefaultHadoopConf(KafkaConnectConfigs connectConfigs) {
     Configuration hadoopConf = new Configuration();
+
+    // add hadoop config files
+    if (!StringUtils.isNullOrEmpty(connectConfigs.getHadoopConfDir())
+            || !StringUtils.isNullOrEmpty(connectConfigs.getHadoopConfHome())) {
+      try {
+        List<Path> configFiles = getHadoopConfigFiles(connectConfigs.getHadoopConfDir(),
+                connectConfigs.getHadoopConfHome());
+        configFiles.forEach(f ->
+                hadoopConf.addResource(new org.apache.hadoop.fs.Path(f.toAbsolutePath().toUri())));
+      } catch (Exception e) {
+        throw new HoodieException("Failed to read hadoop configuration!", e);
+      }
+    } else {
+      DEFAULT_HADOOP_CONF_FILES.forEach(f ->
+              hadoopConf.addResource(new org.apache.hadoop.fs.Path(f.toAbsolutePath().toUri())));
+    }
+
     connectConfigs.getProps().keySet().stream().filter(prop -> {
      // In order to prevent printing unnecessary warn logs, here filter out the hoodie
       // configuration items before passing to hadoop/hive configs
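
One caveat in the hunk above: Files.walk returns a Stream backed by open directory handles, so it should ideally be closed. A behavior-equivalent sketch of walkTreeForXml using try-with-resources (an alternative, not what this commit does):

    private static List<Path> walkTreeForXml(Path basePath) throws IOException {
      if (Files.notExists(basePath)) {
        return new ArrayList<>();
      }
      // Closing the Stream releases the directory handles Files.walk holds open.
      try (java.util.stream.Stream<Path> stream =
              Files.walk(basePath, FileVisitOption.FOLLOW_LINKS)) {
        return stream
            .filter(path -> path.toFile().isFile())
            .filter(path -> path.toString().endsWith(".xml"))
            .collect(Collectors.toList());
      }
    }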
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
index ec03451..1200779 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java
@@ -93,6 +93,17 @@ public class KafkaConnectConfigs extends HoodieConfig {
       .defaultValue(true)
       .withDocumentation("Commit even when some records failed to be written");
 
+  // Reference https://docs.confluent.io/kafka-connect-hdfs/current/configuration_options.html#hdfs
+  public static final ConfigProperty<String> HADOOP_CONF_DIR = ConfigProperty
+          .key("hadoop.conf.dir")
+          .noDefaultValue()
+          .withDocumentation("The Hadoop configuration directory.");
+
+  public static final ConfigProperty<String> HADOOP_HOME = ConfigProperty
+          .key("hadoop.home")
+          .noDefaultValue()
+          .withDocumentation("The Hadoop home directory.");
+
   protected KafkaConnectConfigs() {
     super();
   }
@@ -145,6 +156,14 @@ public class KafkaConnectConfigs extends HoodieConfig {
     return getBoolean(ALLOW_COMMIT_ON_ERRORS);
   }
 
+  public String getHadoopConfDir() {
+    return getString(HADOOP_CONF_DIR);
+  }
+
+  public String getHadoopConfHome() {
+    return getString(HADOOP_HOME);
+  }
+
   public static class Builder {
 
    protected final KafkaConnectConfigs connectConfigs = new KafkaConnectConfigs();
@@ -185,6 +204,16 @@ public class KafkaConnectConfigs extends HoodieConfig {
       return this;
     }
 
+    public Builder withHadoopConfDir(String hadoopConfDir) {
+      connectConfigs.setValue(HADOOP_CONF_DIR, String.valueOf(hadoopConfDir));
+      return this;
+    }
+
+    public Builder withHadoopHome(String hadoopHome) {
+      connectConfigs.setValue(HADOOP_HOME, String.valueOf(hadoopHome));
+      return this;
+    }
+
     protected void setDefaults() {
       // Check for mandatory properties
       connectConfigs.setDefaults(KafkaConnectConfigs.class.getName());
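
Combined with getDefaultHadoopConf above, the lookup order is: the explicit hadoop.conf.dir / hadoop.home properties first, falling back to the HADOOP_CONF_DIR / HADOOP_HOME environment variables discovered once at class load. A sketch of that fallback path (assuming the Connect worker's environment exports HADOOP_CONF_DIR):

    // With neither property set, this is effectively what the static
    // initializer in KafkaConnectUtils runs once at class-load time.
    List<Path> files = KafkaConnectUtils.getHadoopConfigFiles(
        System.getenv("HADOOP_CONF_DIR"), System.getenv("HADOOP_HOME"));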
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestHdfsConfiguration.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestHdfsConfiguration.java
new file mode 100644
index 0000000..dca8f57
--- /dev/null
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestHdfsConfiguration.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect;
+
+import org.apache.hudi.connect.utils.KafkaConnectUtils;
+import org.apache.hudi.connect.writers.KafkaConnectConfigs;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.file.Path;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHdfsConfiguration {
+
+  private boolean checkFiles(List<Path> paths) {
+    paths.removeIf(p -> {
+      String fileName = p.toFile().getName();
+      return fileName.equals("core-site.xml") || fileName.equals("hdfs-site.xml");
+    });
+    return paths.isEmpty();
+  }
+
+  @Test
+  public void testHadoopConfigEnvs() throws Exception {
+    List<Path> paths = KafkaConnectUtils.getHadoopConfigFiles(
+        "src/test/resources/hadoop_conf", "");
+    assertEquals(2, paths.size());
+    assertTrue(checkFiles(paths));
+  }
+
+  @Test
+  public void testHadoopHomeEnvs() throws Exception {
+    List<Path> paths = KafkaConnectUtils.getHadoopConfigFiles(
+        "","src/test/resources/hadoop_home");
+    assertEquals(paths.size(), 2);
+    assertTrue(checkFiles(paths));
+  }
+
+  @Test
+  public void testKafkaConfig() throws Exception {
+    KafkaConnectConfigs connectConfigs = KafkaConnectConfigs.newBuilder()
+        .withHadoopHome("src/test/resources/hadoop_home")
+        .build();
+    List<Path> paths = KafkaConnectUtils.getHadoopConfigFiles(
+        connectConfigs.getHadoopConfDir(),
+        connectConfigs.getHadoopConfHome()
+    );
+    assertEquals(2, paths.size());
+    assertTrue(checkFiles(paths));
+  }
+}
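
The three tests cover each source in isolation. A hypothetical fourth case (not in this commit) would pin down precedence when both paths are set, since getHadoopConfigFiles only consults the home path when the conf-dir scan returns nothing:

    @Test
    public void testConfDirTakesPrecedence() throws Exception {
      // Both set: only hadoop_conf should be scanned, because the
      // hadoop_home fallback triggers only on an empty conf-dir result.
      List<Path> paths = KafkaConnectUtils.getHadoopConfigFiles(
          "src/test/resources/hadoop_conf", "src/test/resources/hadoop_home");
      assertEquals(2, paths.size());
      assertTrue(checkFiles(paths));
    }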
diff --git a/hudi-kafka-connect/src/test/resources/hadoop_conf/core-site.xml b/hudi-kafka-connect/src/test/resources/hadoop_conf/core-site.xml
new file mode 100644
index 0000000..26efcea
--- /dev/null
+++ b/hudi-kafka-connect/src/test/resources/hadoop_conf/core-site.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<configuration>
+
+    <property>
+        <name>fs.defaultFS</name>
+        <value>hdfs://test-hudi-path:9000</value>
+        <description>The name of the default file system.  A URI whose
+            scheme and authority determine the FileSystem implementation.  The
+            uri's scheme determines the config property (fs.SCHEME.impl) naming
+            the FileSystem implementation class.  The uri's authority is used to
+            determine the host, port, etc. for a filesystem.</description>
+    </property>
+
+</configuration>
diff --git a/hudi-kafka-connect/src/test/resources/hadoop_conf/hdfs-site.xml b/hudi-kafka-connect/src/test/resources/hadoop_conf/hdfs-site.xml
new file mode 100644
index 0000000..0e5daec
--- /dev/null
+++ b/hudi-kafka-connect/src/test/resources/hadoop_conf/hdfs-site.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+
+    <property>
+        <name>dfs.namenode.http-address</name>
+        <value>http://test-hudi-path:50070</value>
+        <description>
+            The address and the base port where the dfs namenode web ui will listen on.
+        </description>
+    </property>
+
+</configuration>
diff --git a/hudi-kafka-connect/src/test/resources/hadoop_home/etc/hadoop/core-site.xml b/hudi-kafka-connect/src/test/resources/hadoop_home/etc/hadoop/core-site.xml
new file mode 100644
index 0000000..26efcea
--- /dev/null
+++ b/hudi-kafka-connect/src/test/resources/hadoop_home/etc/hadoop/core-site.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<configuration>
+
+    <property>
+        <name>fs.defaultFS</name>
+        <value>hdfs://test-hudi-path:9000</value>
+        <description>The name of the default file system.  A URI whose
+            scheme and authority determine the FileSystem implementation.  The
+            uri's scheme determines the config property (fs.SCHEME.impl) naming
+            the FileSystem implementation class.  The uri's authority is used to
+            determine the host, port, etc. for a filesystem.</description>
+    </property>
+
+</configuration>
diff --git a/hudi-kafka-connect/src/test/resources/hadoop_home/etc/hadoop/hdfs-site.xml b/hudi-kafka-connect/src/test/resources/hadoop_home/etc/hadoop/hdfs-site.xml
new file mode 100644
index 0000000..0e5daec
--- /dev/null
+++ b/hudi-kafka-connect/src/test/resources/hadoop_home/etc/hadoop/hdfs-site.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+
+    <property>
+        <name>dfs.namenode.http-address</name>
+        <value>http://test-hudi-path:50070</value>
+        <description>
+            The address and the base port where the dfs namenode web ui will listen on.
+        </description>
+    </property>
+
+</configuration>
