This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git

The following commit(s) were added to refs/heads/master by this push:
     new 0d8ca8d  [HUDI-3104] Kafka-connect support of hadoop config environments and properties (#4451)
0d8ca8d is described below

commit 0d8ca8da4e0f6651bc1f06dba5e7e37881225fdc
Author: Thinking Chen <cdmikec...@hotmail.com>
AuthorDate: Sun Jan 9 15:10:17 2022 +0800

    [HUDI-3104] Kafka-connect support of hadoop config environments and properties (#4451)
---
 .../hudi/connect/utils/KafkaConnectUtils.java      | 68 +++++++++++++++++++++
 .../hudi/connect/writers/KafkaConnectConfigs.java  | 29 +++++++++
 .../apache/hudi/connect/TestHdfsConfiguration.java | 69 ++++++++++++++++++++++
 .../src/test/resources/hadoop_conf/core-site.xml   | 33 +++++++++++
 .../src/test/resources/hadoop_conf/hdfs-site.xml   | 30 ++++++++++
 .../resources/hadoop_home/etc/hadoop/core-site.xml | 33 +++++++++++
 .../resources/hadoop_home/etc/hadoop/hdfs-site.xml | 30 ++++++++++
 7 files changed, 292 insertions(+)

diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
index cf60b9e..cc37de2 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java
@@ -49,9 +49,14 @@ import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.FileVisitOption;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -65,6 +70,52 @@ public class KafkaConnectUtils {
 
   private static final Logger LOG = LogManager.getLogger(KafkaConnectUtils.class);
   private static final String HOODIE_CONF_PREFIX = "hoodie.";
+  public static final String HADOOP_CONF_DIR = "HADOOP_CONF_DIR";
+  public static final String HADOOP_HOME = "HADOOP_HOME";
+  private static final List<Path> DEFAULT_HADOOP_CONF_FILES;
+
+  static {
+    DEFAULT_HADOOP_CONF_FILES = new ArrayList<>();
+    try {
+      String hadoopConfigPath = System.getenv(HADOOP_CONF_DIR);
+      String hadoopHomePath = System.getenv(HADOOP_HOME);
+      DEFAULT_HADOOP_CONF_FILES.addAll(getHadoopConfigFiles(hadoopConfigPath, hadoopHomePath));
+      if (!DEFAULT_HADOOP_CONF_FILES.isEmpty()) {
+        LOG.info(String.format("Found Hadoop default config files %s", DEFAULT_HADOOP_CONF_FILES));
+      }
+    } catch (IOException e) {
+      LOG.error("An error occurred while getting the default Hadoop configuration. "
+          + "Please use hadoop.conf.dir or hadoop.home to configure Hadoop environment variables", e);
+    }
+  }
+
+  /**
+   * Get hadoop config files by HADOOP_CONF_DIR or HADOOP_HOME
+   */
+  public static List<Path> getHadoopConfigFiles(String hadoopConfigPath, String hadoopHomePath)
+      throws IOException {
+    List<Path> hadoopConfigFiles = new ArrayList<>();
+    if (!StringUtils.isNullOrEmpty(hadoopConfigPath)) {
+      hadoopConfigFiles.addAll(walkTreeForXml(Paths.get(hadoopConfigPath)));
+    }
+    if (hadoopConfigFiles.isEmpty() && !StringUtils.isNullOrEmpty(hadoopHomePath)) {
+      hadoopConfigFiles.addAll(walkTreeForXml(Paths.get(hadoopHomePath, "etc", "hadoop")));
+    }
+    return hadoopConfigFiles;
+  }
+
+  /**
+   * Files walk to find xml
+   */
+  private static List<Path> walkTreeForXml(Path basePath) throws IOException {
+    if (Files.notExists(basePath)) {
+      return new ArrayList<>();
+    }
+    return Files.walk(basePath, FileVisitOption.FOLLOW_LINKS)
+        .filter(path -> path.toFile().isFile())
+        .filter(path -> path.toString().endsWith(".xml"))
+        .collect(Collectors.toList());
+  }
 
   public static int getLatestNumPartitions(String bootstrapServers, String topicName) {
     Properties props = new Properties();
@@ -89,6 +140,23 @@ public class KafkaConnectUtils {
    */
   public static Configuration getDefaultHadoopConf(KafkaConnectConfigs connectConfigs) {
     Configuration hadoopConf = new Configuration();
+
+    // add hadoop config files
+    if (!StringUtils.isNullOrEmpty(connectConfigs.getHadoopConfDir())
+        || !StringUtils.isNullOrEmpty(connectConfigs.getHadoopConfHome())) {
+      try {
+        List<Path> configFiles = getHadoopConfigFiles(connectConfigs.getHadoopConfDir(),
+            connectConfigs.getHadoopConfHome());
+        configFiles.forEach(f ->
+            hadoopConf.addResource(new org.apache.hadoop.fs.Path(f.toAbsolutePath().toUri())));
+      } catch (Exception e) {
+        throw new HoodieException("Failed to read hadoop configuration!", e);
+      }
+    } else {
+      DEFAULT_HADOOP_CONF_FILES.forEach(f ->
+          hadoopConf.addResource(new org.apache.hadoop.fs.Path(f.toAbsolutePath().toUri())));
+    }
+
     connectConfigs.getProps().keySet().stream().filter(prop -> {
       // In order to prevent printing unnecessary warn logs, here filter out the hoodie
       // configuration items before passing to hadoop/hive configs
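
For illustration (not part of the patch): a minimal sketch of the lookup precedence introduced above, assuming the test resources added later in this commit. Explicit hadoop.conf.dir/hadoop.home settings take priority over the HADOOP_CONF_DIR/HADOOP_HOME environment variables, and within getHadoopConfigFiles the conf dir is searched first, with <home>/etc/hadoop consulted only when the conf dir yields no XML files.

    // Configuration here is org.apache.hadoop.conf.Configuration.
    KafkaConnectConfigs configs = KafkaConnectConfigs.newBuilder()
        .withHadoopConfDir("src/test/resources/hadoop_conf")
        .withHadoopHome("src/test/resources/hadoop_home")  // ignored here: the conf dir already has XML files
        .build();
    Configuration hadoopConf = KafkaConnectUtils.getDefaultHadoopConf(configs);
    // hadoop_conf/core-site.xml is now a Configuration resource, so:
    // hadoopConf.get("fs.defaultFS") -> "hdfs://test-hudi-path:9000"
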
" + + "Please use hadoop.conf.dir or hadoop.home to configure Hadoop environment variables", e); + } + } + + /** + * Get hadoop config files by HADOOP_CONF_DIR or HADOOP_HOME + */ + public static List<Path> getHadoopConfigFiles(String hadoopConfigPath, String hadoopHomePath) + throws IOException { + List<Path> hadoopConfigFiles = new ArrayList<>(); + if (!StringUtils.isNullOrEmpty(hadoopConfigPath)) { + hadoopConfigFiles.addAll(walkTreeForXml(Paths.get(hadoopConfigPath))); + } + if (hadoopConfigFiles.isEmpty() && !StringUtils.isNullOrEmpty(hadoopHomePath)) { + hadoopConfigFiles.addAll(walkTreeForXml(Paths.get(hadoopHomePath, "etc", "hadoop"))); + } + return hadoopConfigFiles; + } + + /** + * Files walk to find xml + */ + private static List<Path> walkTreeForXml(Path basePath) throws IOException { + if (Files.notExists(basePath)) { + return new ArrayList<>(); + } + return Files.walk(basePath, FileVisitOption.FOLLOW_LINKS) + .filter(path -> path.toFile().isFile()) + .filter(path -> path.toString().endsWith(".xml")) + .collect(Collectors.toList()); + } public static int getLatestNumPartitions(String bootstrapServers, String topicName) { Properties props = new Properties(); @@ -89,6 +140,23 @@ public class KafkaConnectUtils { */ public static Configuration getDefaultHadoopConf(KafkaConnectConfigs connectConfigs) { Configuration hadoopConf = new Configuration(); + + // add hadoop config files + if (!StringUtils.isNullOrEmpty(connectConfigs.getHadoopConfDir()) + || !StringUtils.isNullOrEmpty(connectConfigs.getHadoopConfHome())) { + try { + List<Path> configFiles = getHadoopConfigFiles(connectConfigs.getHadoopConfDir(), + connectConfigs.getHadoopConfHome()); + configFiles.forEach(f -> + hadoopConf.addResource(new org.apache.hadoop.fs.Path(f.toAbsolutePath().toUri()))); + } catch (Exception e) { + throw new HoodieException("Failed to read hadoop configuration!", e); + } + } else { + DEFAULT_HADOOP_CONF_FILES.forEach(f -> + hadoopConf.addResource(new org.apache.hadoop.fs.Path(f.toAbsolutePath().toUri()))); + } + connectConfigs.getProps().keySet().stream().filter(prop -> { // In order to prevent printing unnecessary warn logs, here filter out the hoodie // configuration items before passing to hadoop/hive configs diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java index ec03451..1200779 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectConfigs.java @@ -93,6 +93,17 @@ public class KafkaConnectConfigs extends HoodieConfig { .defaultValue(true) .withDocumentation("Commit even when some records failed to be written"); + // Reference https://docs.confluent.io/kafka-connect-hdfs/current/configuration_options.html#hdfs + public static final ConfigProperty<String> HADOOP_CONF_DIR = ConfigProperty + .key("hadoop.conf.dir") + .noDefaultValue() + .withDocumentation("The Hadoop configuration directory."); + + public static final ConfigProperty<String> HADOOP_HOME = ConfigProperty + .key("hadoop.home") + .noDefaultValue() + .withDocumentation("The Hadoop home directory."); + protected KafkaConnectConfigs() { super(); } @@ -145,6 +156,14 @@ public class KafkaConnectConfigs extends HoodieConfig { return getBoolean(ALLOW_COMMIT_ON_ERRORS); } + public String getHadoopConfDir() { + return getString(HADOOP_CONF_DIR); + } + + public String 
diff --git a/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestHdfsConfiguration.java b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestHdfsConfiguration.java
new file mode 100644
index 0000000..dca8f57
--- /dev/null
+++ b/hudi-kafka-connect/src/test/java/org/apache/hudi/connect/TestHdfsConfiguration.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.connect;
+
+import org.apache.hudi.connect.utils.KafkaConnectUtils;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.hudi.connect.writers.KafkaConnectConfigs;
+import org.junit.jupiter.api.Test;
+
+import java.nio.file.Path;
+import java.util.List;
+
+public class TestHdfsConfiguration {
+
+  private boolean checkFiles(List<Path> paths) {
+    paths.removeIf(p -> {
+      String fileName = p.toFile().getName();
+      return fileName.equals("core-site.xml") || fileName.equals("hdfs-site.xml");
+    });
+    return paths.isEmpty();
+  }
+
+  @Test
+  public void testHadoopConfigEnvs() throws Exception {
+    List<Path> paths = KafkaConnectUtils.getHadoopConfigFiles(
+        "src/test/resources/hadoop_conf", "");
+    assertEquals(2, paths.size());
+    assertTrue(checkFiles(paths));
+  }
+
+  @Test
+  public void testHadoopHomeEnvs() throws Exception {
+    List<Path> paths = KafkaConnectUtils.getHadoopConfigFiles(
+        "", "src/test/resources/hadoop_home");
+    assertEquals(2, paths.size());
+    assertTrue(checkFiles(paths));
+  }
+
+  @Test
+  public void testKafkaConfig() throws Exception {
+    KafkaConnectConfigs connectConfigs = KafkaConnectConfigs.newBuilder()
+        .withHadoopHome("src/test/resources/hadoop_home")
+        .build();
+    List<Path> paths = KafkaConnectUtils.getHadoopConfigFiles(
+        connectConfigs.getHadoopConfDir(),
+        connectConfigs.getHadoopConfHome()
+    );
+    assertEquals(2, paths.size());
+    assertTrue(checkFiles(paths));
+  }
+}
diff --git a/hudi-kafka-connect/src/test/resources/hadoop_conf/core-site.xml b/hudi-kafka-connect/src/test/resources/hadoop_conf/core-site.xml
new file mode 100644
index 0000000..26efcea
--- /dev/null
+++ b/hudi-kafka-connect/src/test/resources/hadoop_conf/core-site.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<configuration>
+
+  <property>
+    <name>fs.defaultFS</name>
+    <value>hdfs://test-hudi-path:9000</value>
+    <description>The name of the default file system. A URI whose
+      scheme and authority determine the FileSystem implementation. The
+      uri's scheme determines the config property (fs.SCHEME.impl) naming
+      the FileSystem implementation class. The uri's authority is used to
+      determine the host, port, etc. for a filesystem.</description>
+  </property>
+
+</configuration>
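
For illustration (not part of the patch): the tests above are self-contained, relying only on the XML resources shipped with this commit rather than a live Hadoop installation, so they should be runnable with standard Maven Surefire test selection, e.g. mvn test -pl hudi-kafka-connect -Dtest=TestHdfsConfiguration.
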
diff --git a/hudi-kafka-connect/src/test/resources/hadoop_conf/hdfs-site.xml b/hudi-kafka-connect/src/test/resources/hadoop_conf/hdfs-site.xml
new file mode 100644
index 0000000..0e5daec
--- /dev/null
+++ b/hudi-kafka-connect/src/test/resources/hadoop_conf/hdfs-site.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+
+  <property>
+    <name>dfs.namenode.http-address</name>
+    <value>http://test-hudi-path:50070</value>
+    <description>
+      The address and the base port where the dfs namenode web ui will listen on.
+    </description>
+  </property>
+
+</configuration>
diff --git a/hudi-kafka-connect/src/test/resources/hadoop_home/etc/hadoop/core-site.xml b/hudi-kafka-connect/src/test/resources/hadoop_home/etc/hadoop/core-site.xml
new file mode 100644
index 0000000..26efcea
--- /dev/null
+++ b/hudi-kafka-connect/src/test/resources/hadoop_home/etc/hadoop/core-site.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<configuration>
+
+  <property>
+    <name>fs.defaultFS</name>
+    <value>hdfs://test-hudi-path:9000</value>
+    <description>The name of the default file system. A URI whose
+      scheme and authority determine the FileSystem implementation. The
+      uri's scheme determines the config property (fs.SCHEME.impl) naming
+      the FileSystem implementation class. The uri's authority is used to
+      determine the host, port, etc. for a filesystem.</description>
+  </property>
+
+</configuration>
diff --git a/hudi-kafka-connect/src/test/resources/hadoop_home/etc/hadoop/hdfs-site.xml b/hudi-kafka-connect/src/test/resources/hadoop_home/etc/hadoop/hdfs-site.xml
new file mode 100644
index 0000000..0e5daec
--- /dev/null
+++ b/hudi-kafka-connect/src/test/resources/hadoop_home/etc/hadoop/hdfs-site.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+
+  <property>
+    <name>dfs.namenode.http-address</name>
+    <value>http://test-hudi-path:50070</value>
+    <description>
+      The address and the base port where the dfs namenode web ui will listen on.
+    </description>
+  </property>
+
+</configuration>