This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
     new c23a577f34 [Hotfix][Hive Connector] Fix Hive hdfs-site.xml and hive-site.xml not be load error (#7069)
c23a577f34 is described below

commit c23a577f3442a65fc8e396a6feaaf1ce4e261ca9
Author: Eric <gaojun2...@gmail.com>
AuthorDate: Mon Jul 1 14:33:55 2024 +0800

    [Hotfix][Hive Connector] Fix Hive hdfs-site.xml and hive-site.xml not be load error (#7069)
---
 docs/en/connector-v2/sink/Hive.md                  |   2 +
 docs/en/connector-v2/source/Hive.md                | 164 +++++++++++++++++++--
 .../seatunnel/file/config/HadoopConf.java          |   3 +-
 .../hive/exception/HiveConnectorErrorCode.java     |   1 +
 .../seatunnel/hive/storage/AbstractStorage.java    |  95 ++++++------
 .../seatunnel/hive/utils/HiveMetaStoreProxy.java   |  58 ++++++--
 6 files changed, 245 insertions(+), 78 deletions(-)

diff --git a/docs/en/connector-v2/sink/Hive.md b/docs/en/connector-v2/sink/Hive.md
index dac4a814c2..023bb38ddb 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -176,6 +176,7 @@ sink {
     metastore_uri = "thrift://ctyun7:9083"
     hive.hadoop.conf = {
       bucket = "s3a://mybucket"
+      fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
     }
 }
 ```
@@ -258,6 +259,7 @@ sink {
     hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
     hive.hadoop.conf = {
       bucket="s3://ws-package"
+      fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
     }
   }
 }
diff --git a/docs/en/connector-v2/source/Hive.md b/docs/en/connector-v2/source/Hive.md
index bb7b851409..da70cf7aa3 100644
--- a/docs/en/connector-v2/source/Hive.md
+++ b/docs/en/connector-v2/source/Hive.md
@@ -33,19 +33,21 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 
 ## Options
 
-| name                 | type   | required | default value  |
-|----------------------|--------|----------|----------------|
-| table_name           | string | yes      | -              |
-| metastore_uri        | string | yes      | -              |
-| krb5_path            | string | no       | /etc/krb5.conf |
-| kerberos_principal   | string | no       | -              |
-| kerberos_keytab_path | string | no       | -              |
-| hdfs_site_path       | string | no       | -              |
-| hive_site_path       | string | no       | -              |
-| read_partitions      | list   | no       | -              |
-| read_columns         | list   | no       | -              |
-| compress_codec       | string | no       | none           |
-| common-options       |        | no       | -              |
+| name                  | type   | required | default value  |
+|-----------------------|--------|----------|----------------|
+| table_name            | string | yes      | -              |
+| metastore_uri         | string | yes      | -              |
+| krb5_path             | string | no       | /etc/krb5.conf |
+| kerberos_principal    | string | no       | -              |
+| kerberos_keytab_path  | string | no       | -              |
+| hdfs_site_path        | string | no       | -              |
+| hive_site_path        | string | no       | -              |
+| hive.hadoop.conf      | Map    | no       | -              |
+| hive.hadoop.conf-path | string | no       | -              |
+| read_partitions       | list   | no       | -              |
+| read_columns          | list   | no       | -              |
+| compress_codec        | string | no       | none           |
+| common-options        |        | no       | -              |
 
 ### table_name [string]
 
@@ -59,6 +61,14 @@ Hive metastore uri
 
 The path of `hdfs-site.xml`, used to load ha configuration of namenodes
 
+### hive.hadoop.conf [map]
+
+Properties in hadoop conf('core-site.xml', 'hdfs-site.xml', 'hive-site.xml')
+
+### hive.hadoop.conf-path [string]
+
+The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files
+
 ### read_partitions [list]
 
 The target partitions that user want to read from hive table, if user does not set this parameter, it will read all the data from hive table.
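The two options added above are meant to be combined: `hive.hadoop.conf-path` points at a directory containing `core-site.xml`, `hdfs-site.xml`, and `hive-site.xml`, while the `hive.hadoop.conf` map is applied last and sets or overrides individual properties on top of whatever those files provide. A minimal sketch of a source block using both (the table name, metastore host, paths, and property value below are placeholders, not values from this commit):

```hocon
source {
  Hive {
    # placeholder table and metastore; replace with your own
    table_name    = "default.example_table"
    metastore_uri = "thrift://metastore-host:9083"
    # directory that holds core-site.xml / hdfs-site.xml / hive-site.xml
    hive.hadoop.conf-path = "/path/to/hadoop-conf"
    # properties set here are applied after the files above and override them
    hive.hadoop.conf = {
      fs.defaultFS = "hdfs://example-nameservice"
    }
  }
}
```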
@@ -128,6 +138,134 @@ Source plugin common parameters, please refer to [Source Common Options](common-
 
 ```
 
+## Hive on s3
+
+### Step 1
+
+Create the lib dir for hive of emr.
+
+```shell
+mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
+```
+
+### Step 2
+
+Get the jars from maven center to the lib.
+
+```shell
+cd ${SEATUNNEL_HOME}/plugins/Hive/lib
+wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.5/hadoop-aws-2.6.5.jar
+wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
+```
+
+### Step 3
+
+Copy the jars from your environment on emr to the lib dir.
+
+```shell
+cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.60.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
+cp /usr/share/aws/emr/hadoop-state-pusher/lib/hadoop-common-3.3.6-amzn-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
+cp /usr/share/aws/emr/hadoop-state-pusher/lib/javax.inject-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
+cp /usr/share/aws/emr/hadoop-state-pusher/lib/aopalliance-1.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
+```
+
+### Step 4
+
+Run the case.
+
+```shell
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Hive {
+    table_name = "test_hive.test_hive_sink_on_s3"
+    metastore_uri = "thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
+    hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
+    hive.hadoop.conf = {
+       bucket="s3://ws-package"
+       fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
+    }
+    read_columns = ["pk_id", "name", "score"]
+  }
+}
+
+sink {
+  Hive {
+    table_name = "test_hive.test_hive_sink_on_s3_sink"
+    metastore_uri = "thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
+    hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
+    hive.hadoop.conf = {
+       bucket="s3://ws-package"
+       fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
+    }
+  }
+}
+```
+
+## Hive on oss
+
+### Step 1
+
+Create the lib dir for hive of emr.
+
+```shell
+mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
+```
+
+### Step 2
+
+Get the jars from maven center to the lib.
+
+```shell
+cd ${SEATUNNEL_HOME}/plugins/Hive/lib
+wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
+```
+
+### Step 3
+
+Copy the jars from your environment on emr to the lib dir and delete the conflicting jar.
+
+```shell
+cp -r /opt/apps/JINDOSDK/jindosdk-current/lib/jindo-*.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
+rm -f ${SEATUNNEL_HOME}/lib/hadoop-aliyun-*.jar
+```
+
+### Step 4
+
+Run the case.
+
+```shell
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Hive {
+    table_name = "test_hive.test_hive_sink_on_oss"
+    metastore_uri = "thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
+    hive.hadoop.conf-path = "/tmp/hadoop"
+    hive.hadoop.conf = {
+       bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
+    }
+  }
+}
+
+sink {
+  Hive {
+    table_name = "test_hive.test_hive_sink_on_oss_sink"
+    metastore_uri = "thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
+    hive.hadoop.conf-path = "/tmp/hadoop"
+    hive.hadoop.conf = {
+       bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
+    }
+  }
+}
+```
+
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
index 3a88fa3b3d..c7a2182753 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.config;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.Path;
@@ -62,7 +63,7 @@ public class HadoopConf implements Serializable {
             removeUnwantedOverwritingProps(extraOptions);
             extraOptions.forEach(configuration::set);
         }
-        if (hdfsSitePath != null) {
+        if (StringUtils.isNotBlank(hdfsSitePath)) {
             Configuration hdfsSiteConfiguration = new Configuration();
             hdfsSiteConfiguration.addResource(new Path(hdfsSitePath));
             unsetUnwantedOverwritingProps(hdfsSiteConfiguration);
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java
index a0923acc1b..bf3268bdbe 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java
@@ -25,6 +25,7 @@ public enum HiveConnectorErrorCode implements SeaTunnelErrorCode {
     GET_HIVE_TABLE_INFORMATION_FAILED(
             "HIVE-03", "Get hive table information from hive metastore service failed"),
     HIVE_TABLE_NAME_ERROR("HIVE-04", "Hive table name is invalid"),
+    LOAD_HIVE_BASE_HADOOP_CONFIG_FAILED("HIVE-05", "Load hive base hadoop config failed"),
     ;
 
     private final String code;
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/AbstractStorage.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/AbstractStorage.java
index 22f9e61880..7f453c936b 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/AbstractStorage.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/AbstractStorage.java @@ -23,8 +23,11 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory; import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; import org.apache.seatunnel.api.configuration.ReadonlyConfig; -import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig; +import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions; import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig; +import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -63,57 +66,49 @@ public abstract class AbstractStorage implements Storage { * @return */ protected Configuration loadHiveBaseHadoopConfig(ReadonlyConfig readonlyConfig) { - Configuration configuration = new Configuration(); - // Try to load from hadoop_conf_path(The Bucket configuration is typically in core-site.xml) - Optional<String> hadoopConfPath = readonlyConfig.getOptional(HiveConfig.HADOOP_CONF_PATH); - if (hadoopConfPath.isPresent()) { - HADOOP_CONF_FILES.forEach( - confFile -> { - java.nio.file.Path path = Paths.get(hadoopConfPath.get(), confFile); - if (Files.exists(path)) { - try { - configuration.addResource(path.toUri().toURL()); - } catch (IOException e) { - log.warn( - "Error adding Hadoop resource {}, resource was not added", - path, - e); - } - } - }); - } - readonlyConfig - .getOptional(BaseSinkConfig.HDFS_SITE_PATH) - .ifPresent( - hdfsSitePath -> { - try { - configuration.addResource(new File(hdfsSitePath).toURI().toURL()); - } catch (IOException e) { - log.warn( - "Error adding Hadoop resource {}, resource was not added", - hdfsSitePath, - e); + try { + Configuration configuration = new Configuration(); + // Try to load from hadoop_conf_path(The Bucket configuration is typically in + // core-site.xml) + Optional<String> hadoopConfPath = + readonlyConfig.getOptional(HiveConfig.HADOOP_CONF_PATH); + if (hadoopConfPath.isPresent()) { + HADOOP_CONF_FILES.forEach( + confFile -> { + java.nio.file.Path path = Paths.get(hadoopConfPath.get(), confFile); + if (Files.exists(path)) { + try { + configuration.addResource(path.toUri().toURL()); + } catch (IOException e) { + log.warn( + "Error adding Hadoop resource {}, resource was not added", + path, + e); + } } }); - readonlyConfig - .getOptional(HiveConfig.HIVE_SITE_PATH) - .ifPresent( - hiveSitePath -> { - try { - configuration.addResource(new File(hiveSitePath).toURI().toURL()); - } catch (IOException e) { - log.warn( - "Error adding Hadoop resource {}, resource was not added", - hiveSitePath, - e); - } - }); - // Try to load from hadoopConf - Optional<Map<String, String>> hadoopConf = - readonlyConfig.getOptional(HiveConfig.HADOOP_CONF); - if (hadoopConf.isPresent()) { - hadoopConf.get().forEach((k, v) -> configuration.set(k, v)); + } + String hiveSitePath = readonlyConfig.get(HiveConfig.HIVE_SITE_PATH); + String hdfsSitePath = readonlyConfig.get(HdfsSourceConfigOptions.HDFS_SITE_PATH); + if (StringUtils.isNotBlank(hdfsSitePath)) { + configuration.addResource(new File(hdfsSitePath).toURI().toURL()); + } + + if (StringUtils.isNotBlank(hiveSitePath)) { + 
configuration.addResource(new File(hiveSitePath).toURI().toURL()); + } + // Try to load from hadoopConf + Optional<Map<String, String>> hadoopConf = + readonlyConfig.getOptional(HiveConfig.HADOOP_CONF); + if (hadoopConf.isPresent()) { + hadoopConf.get().forEach((k, v) -> configuration.set(k, v)); + } + return configuration; + } catch (Exception e) { + String errorMsg = String.format("Failed to load hadoop configuration, please check it"); + log.error(errorMsg + ":" + ExceptionUtils.getMessage(e)); + throw new HiveConnectorException( + HiveConnectorErrorCode.LOAD_HIVE_BASE_HADOOP_CONFIG_FAILED, e); } - return configuration; } } diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java index d4d8ca3b7f..62d917ca0d 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java @@ -17,9 +17,12 @@ package org.apache.seatunnel.connectors.seatunnel.hive.utils; +import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList; + import org.apache.seatunnel.api.configuration.ReadonlyConfig; -import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions; import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopLoginFactory; +import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions; +import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig; import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException; import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions; @@ -37,7 +40,10 @@ import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import java.io.File; +import java.io.IOException; import java.net.MalformedURLException; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.List; import java.util.Objects; @@ -45,36 +51,60 @@ import java.util.Objects; public class HiveMetaStoreProxy { private HiveMetaStoreClient hiveMetaStoreClient; private static volatile HiveMetaStoreProxy INSTANCE = null; + private static final List<String> HADOOP_CONF_FILES = ImmutableList.of("hive-site.xml"); private HiveMetaStoreProxy(ReadonlyConfig readonlyConfig) { String metastoreUri = readonlyConfig.get(HiveSourceOptions.METASTORE_URI); + String hiveHadoopConfigPath = readonlyConfig.get(HiveConfig.HADOOP_CONF_PATH); + String hiveSitePath = readonlyConfig.get(HiveConfig.HIVE_SITE_PATH); HiveConf hiveConf = new HiveConf(); hiveConf.set("hive.metastore.uris", metastoreUri); try { - if (StringUtils.isNotEmpty(readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH))) { - String hiveSitePath = readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH); + if (StringUtils.isNotBlank(hiveHadoopConfigPath)) { + HADOOP_CONF_FILES.forEach( + confFile -> { + java.nio.file.Path path = Paths.get(hiveHadoopConfigPath, confFile); + if (Files.exists(path)) { + try { + hiveConf.addResource(path.toUri().toURL()); + } catch (IOException e) { + log.warn( + "Error adding Hadoop resource {}, resource was not added", + path, + e); + } + } + }); + } + + if 
(StringUtils.isNotBlank(hiveSitePath)) { hiveConf.addResource(new File(hiveSitePath).toURI().toURL()); } + + log.info("hive client conf:{}", hiveConf); if (HiveMetaStoreProxyUtils.enableKerberos(readonlyConfig)) { - Configuration hadoopConfig = new Configuration(); - hadoopConfig.set("hadoop.security.authentication", "kerberos"); + // login Kerberos + Configuration authConf = new Configuration(); + authConf.set("hadoop.security.authentication", "kerberos"); this.hiveMetaStoreClient = HadoopLoginFactory.loginWithKerberos( - hadoopConfig, - readonlyConfig.get(BaseSourceConfigOptions.KRB5_PATH), - readonlyConfig.get(BaseSourceConfigOptions.KERBEROS_PRINCIPAL), - readonlyConfig.get(BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH), - (configuration, userGroupInformation) -> - new HiveMetaStoreClient(hiveConf)); + authConf, + readonlyConfig.get(HdfsSourceConfigOptions.KRB5_PATH), + readonlyConfig.get(HdfsSourceConfigOptions.KERBEROS_PRINCIPAL), + readonlyConfig.get(HdfsSourceConfigOptions.KERBEROS_KEYTAB_PATH), + (conf, userGroupInformation) -> { + return new HiveMetaStoreClient(hiveConf); + }); return; } if (HiveMetaStoreProxyUtils.enableRemoteUser(readonlyConfig)) { this.hiveMetaStoreClient = HadoopLoginFactory.loginWithRemoteUser( new Configuration(), - readonlyConfig.get(BaseSourceConfigOptions.REMOTE_USER), - (configuration, userGroupInformation) -> - new HiveMetaStoreClient(hiveConf)); + readonlyConfig.get(HdfsSourceConfigOptions.REMOTE_USER), + (conf, userGroupInformation) -> { + return new HiveMetaStoreClient(hiveConf); + }); return; } this.hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
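The HiveMetaStoreProxy changes above first load `hive-site.xml` from `hive.hadoop.conf-path` (or from `hive_site_path` directly) and then, when the Kerberos options are set, perform the login before creating the metastore client. A minimal sketch of a source block that exercises that Kerberos path, using the options from the table earlier in this commit (the table name, metastore host, principal, and file paths below are placeholders):

```hocon
source {
  Hive {
    # placeholder table and metastore host
    table_name    = "default.example_table"
    metastore_uri = "thrift://metastore-host:9083"
    # directory containing hive-site.xml (and core-site.xml / hdfs-site.xml)
    hive.hadoop.conf-path = "/path/to/hadoop-conf"
    # Kerberos login options; values are placeholders
    krb5_path            = "/etc/krb5.conf"
    kerberos_principal   = "hive/metastore-host@EXAMPLE.COM"
    kerberos_keytab_path = "/path/to/hive.keytab"
  }
}
```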