This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c23a577f34 [Hotfix][Hive Connector] Fix error where Hive hdfs-site.xml and hive-site.xml are not loaded (#7069)
c23a577f34 is described below

commit c23a577f3442a65fc8e396a6feaaf1ce4e261ca9
Author: Eric <gaojun2...@gmail.com>
AuthorDate: Mon Jul 1 14:33:55 2024 +0800

    [Hotfix][Hive Connector] Fix error where Hive hdfs-site.xml and hive-site.xml are not loaded (#7069)
---
 docs/en/connector-v2/sink/Hive.md                  |   2 +
 docs/en/connector-v2/source/Hive.md                | 164 +++++++++++++++++++--
 .../seatunnel/file/config/HadoopConf.java          |   3 +-
 .../hive/exception/HiveConnectorErrorCode.java     |   1 +
 .../seatunnel/hive/storage/AbstractStorage.java    |  95 ++++++------
 .../seatunnel/hive/utils/HiveMetaStoreProxy.java   |  58 ++++++--
 6 files changed, 245 insertions(+), 78 deletions(-)

diff --git a/docs/en/connector-v2/sink/Hive.md b/docs/en/connector-v2/sink/Hive.md
index dac4a814c2..023bb38ddb 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -176,6 +176,7 @@ sink {
     metastore_uri = "thrift://ctyun7:9083"
     hive.hadoop.conf = {
       bucket = "s3a://mybucket"
+      fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
     }
 }
 ```
@@ -258,6 +259,7 @@ sink {
     hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
     hive.hadoop.conf = {
        bucket="s3://ws-package"
+       fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
     }
   }
 }
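
For readers of the documentation change above: keys under `hive.hadoop.conf` are copied one-for-one onto the Hadoop `Configuration` that the connector builds, which is how a property such as `fs.s3a.aws.credentials.provider` reaches the S3A client. A minimal standalone Java sketch of that behavior, using example values only (the bucket and provider class come from the docs and are not required by the connector):

```java
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

public class HiveHadoopConfExample {
    public static void main(String[] args) {
        // Example entries mirroring the hive.hadoop.conf block in the Hive sink docs.
        Map<String, String> hiveHadoopConf = new LinkedHashMap<>();
        hiveHadoopConf.put("bucket", "s3a://mybucket");
        hiveHadoopConf.put(
                "fs.s3a.aws.credentials.provider",
                "com.amazonaws.auth.InstanceProfileCredentialsProvider");

        // The connector applies each entry with Configuration.set, so it overrides XML defaults.
        Configuration configuration = new Configuration();
        hiveHadoopConf.forEach(configuration::set);

        System.out.println(configuration.get("fs.s3a.aws.credentials.provider"));
    }
}
```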
diff --git a/docs/en/connector-v2/source/Hive.md b/docs/en/connector-v2/source/Hive.md
index bb7b851409..da70cf7aa3 100644
--- a/docs/en/connector-v2/source/Hive.md
+++ b/docs/en/connector-v2/source/Hive.md
@@ -33,19 +33,21 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 
 ## Options
 
-|         name         |  type  | required | default value  |
-|----------------------|--------|----------|----------------|
-| table_name           | string | yes      | -              |
-| metastore_uri        | string | yes      | -              |
-| krb5_path            | string | no       | /etc/krb5.conf |
-| kerberos_principal   | string | no       | -              |
-| kerberos_keytab_path | string | no       | -              |
-| hdfs_site_path       | string | no       | -              |
-| hive_site_path       | string | no       | -              |
-| read_partitions      | list   | no       | -              |
-| read_columns         | list   | no       | -              |
-| compress_codec       | string | no       | none           |
-| common-options       |        | no       | -              |
+|         name          |  type  | required | default value  |
+|-----------------------|--------|----------|----------------|
+| table_name            | string | yes      | -              |
+| metastore_uri         | string | yes      | -              |
+| krb5_path             | string | no       | /etc/krb5.conf |
+| kerberos_principal    | string | no       | -              |
+| kerberos_keytab_path  | string | no       | -              |
+| hdfs_site_path        | string | no       | -              |
+| hive_site_path        | string | no       | -              |
+| hive.hadoop.conf      | Map    | no       | -              |
+| hive.hadoop.conf-path | string | no       | -              |
+| read_partitions       | list   | no       | -              |
+| read_columns          | list   | no       | -              |
+| compress_codec        | string | no       | none           |
+| common-options        |        | no       | -              |
 
 ### table_name [string]
 
@@ -59,6 +61,14 @@ Hive metastore uri
 
 The path of `hdfs-site.xml`, used to load ha configuration of namenodes
 
+### hive.hadoop.conf [map]
+
+Properties in hadoop conf('core-site.xml', 'hdfs-site.xml', 'hive-site.xml')
+
+### hive.hadoop.conf-path [string]
+
+The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files
+
 ### read_partitions [list]
 
The target partitions that user want to read from hive table, if user does not set this parameter, it will read all the data from hive table.
@@ -128,6 +138,134 @@ Source plugin common parameters, please refer to [Source Common Options](common-
 
 ```
 
+## Hive on s3
+
+### Step 1
+
+Create the lib dir for hive of emr.
+
+```shell
+mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
+```
+
+### Step 2
+
+Get the jars from maven center to the lib.
+
+```shell
+cd ${SEATUNNEL_HOME}/plugins/Hive/lib
+wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.5/hadoop-aws-2.6.5.jar
+wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
+```
+
+### Step 3
+
+Copy the jars from your environment on emr to the lib dir.
+
+```shell
+cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.60.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
+cp /usr/share/aws/emr/hadoop-state-pusher/lib/hadoop-common-3.3.6-amzn-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
+cp /usr/share/aws/emr/hadoop-state-pusher/lib/javax.inject-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
+cp /usr/share/aws/emr/hadoop-state-pusher/lib/aopalliance-1.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
+```
+
+### Step 4
+
+Run the case.
+
+```shell
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Hive {
+    table_name = "test_hive.test_hive_sink_on_s3"
+    metastore_uri = "thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
+    hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
+    hive.hadoop.conf = {
+       bucket="s3://ws-package"
+       fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
+    }
+    read_columns = ["pk_id", "name", "score"]
+  }
+}
+
+sink {
+  Hive {
+    table_name = "test_hive.test_hive_sink_on_s3_sink"
+    metastore_uri = "thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
+    hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
+    hive.hadoop.conf = {
+       bucket="s3://ws-package"
+       fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
+    }
+  }
+}
+```
+
+## Hive on oss
+
+### Step 1
+
+Create the lib dir for hive of emr.
+
+```shell
+mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
+```
+
+### Step 2
+
+Get the jars from maven center to the lib.
+
+```shell
+cd ${SEATUNNEL_HOME}/plugins/Hive/lib
+wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
+```
+
+### Step 3
+
+Copy the jars from your environment on emr to the lib dir and delete the conflicting jar.
+
+```shell
+cp -r /opt/apps/JINDOSDK/jindosdk-current/lib/jindo-*.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
+rm -f ${SEATUNNEL_HOME}/lib/hadoop-aliyun-*.jar
+```
+
+### Step 4
+
+Run the case.
+
+```shell
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Hive {
+    table_name = "test_hive.test_hive_sink_on_oss"
+    metastore_uri = "thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
+    hive.hadoop.conf-path = "/tmp/hadoop"
+    hive.hadoop.conf = {
+        bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
+    }
+  }
+}
+
+sink {
+  Hive {
+    table_name = "test_hive.test_hive_sink_on_oss_sink"
+    metastore_uri = "thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
+    hive.hadoop.conf-path = "/tmp/hadoop"
+    hive.hadoop.conf = {
+        bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
+    }
+  }
+}
+```
+
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
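
The new `hive.hadoop.conf-path` option documented above points at a directory holding `core-site.xml`, `hdfs-site.xml` and `hive-site.xml`; each file that exists there is added to the Hadoop configuration as an extra resource (see the `AbstractStorage` change further below). A rough standalone sketch of that lookup, with a hypothetical directory path:

```java
import java.net.MalformedURLException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;

public class HadoopConfPathExample {
    // File names probed under hive.hadoop.conf-path, per the option description above.
    private static final List<String> HADOOP_CONF_FILES =
            Arrays.asList("core-site.xml", "hdfs-site.xml", "hive-site.xml");

    public static void main(String[] args) throws MalformedURLException {
        String hadoopConfPath = "/home/ec2-user/hadoop-conf"; // example path from the docs
        Configuration configuration = new Configuration();
        for (String confFile : HADOOP_CONF_FILES) {
            Path path = Paths.get(hadoopConfPath, confFile);
            if (Files.exists(path)) {
                // Each existing file becomes an additional configuration resource.
                configuration.addResource(path.toUri().toURL());
            }
        }
        System.out.println("Configuration resources loaded from " + hadoopConfPath);
    }
}
```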
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
index 3a88fa3b3d..c7a2182753 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.config;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.Path;
@@ -62,7 +63,7 @@ public class HadoopConf implements Serializable {
             removeUnwantedOverwritingProps(extraOptions);
             extraOptions.forEach(configuration::set);
         }
-        if (hdfsSitePath != null) {
+        if (StringUtils.isNotBlank(hdfsSitePath)) {
             Configuration hdfsSiteConfiguration = new Configuration();
             hdfsSiteConfiguration.addResource(new Path(hdfsSitePath));
             unsetUnwantedOverwritingProps(hdfsSiteConfiguration);
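
The `HadoopConf` change above replaces a plain null check with `StringUtils.isNotBlank`, so an empty or whitespace-only `hdfs_site_path` no longer triggers an attempt to load a non-existent resource. A small illustrative sketch of the guarded load (the path value is only an example):

```java
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class HdfsSitePathGuardExample {
    public static void main(String[] args) {
        // A blank value is non-null, so the old null check would still have tried to load it.
        String hdfsSitePath = "";
        Configuration configuration = new Configuration();
        if (StringUtils.isNotBlank(hdfsSitePath)) {
            // Only a real, non-blank path is registered as an hdfs-site.xml resource.
            configuration.addResource(new Path(hdfsSitePath));
        }
        System.out.println("hdfs-site.xml added: " + StringUtils.isNotBlank(hdfsSitePath));
    }
}
```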
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java
index a0923acc1b..bf3268bdbe 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java
@@ -25,6 +25,7 @@ public enum HiveConnectorErrorCode implements SeaTunnelErrorCode {
     GET_HIVE_TABLE_INFORMATION_FAILED(
             "HIVE-03", "Get hive table information from hive metastore service 
failed"),
     HIVE_TABLE_NAME_ERROR("HIVE-04", "Hive table name is invalid"),
+    LOAD_HIVE_BASE_HADOOP_CONFIG_FAILED("HIVE-05", "Load hive base hadoop config failed"),
     ;
 
     private final String code;
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/AbstractStorage.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/AbstractStorage.java
index 22f9e61880..7f453c936b 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/AbstractStorage.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/AbstractStorage.java
@@ -23,8 +23,11 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
 import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
+import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -63,57 +66,49 @@ public abstract class AbstractStorage implements Storage {
      * @return
      */
     protected Configuration loadHiveBaseHadoopConfig(ReadonlyConfig readonlyConfig) {
-        Configuration configuration = new Configuration();
-        // Try to load from hadoop_conf_path(The Bucket configuration is typically in core-site.xml)
-        Optional<String> hadoopConfPath = readonlyConfig.getOptional(HiveConfig.HADOOP_CONF_PATH);
-        if (hadoopConfPath.isPresent()) {
-            HADOOP_CONF_FILES.forEach(
-                    confFile -> {
-                        java.nio.file.Path path = Paths.get(hadoopConfPath.get(), confFile);
-                        if (Files.exists(path)) {
-                            try {
-                                configuration.addResource(path.toUri().toURL());
-                            } catch (IOException e) {
-                                log.warn(
-                                        "Error adding Hadoop resource {}, resource was not added",
-                                        path,
-                                        e);
-                            }
-                        }
-                    });
-        }
-        readonlyConfig
-                .getOptional(BaseSinkConfig.HDFS_SITE_PATH)
-                .ifPresent(
-                        hdfsSitePath -> {
-                            try {
-                                configuration.addResource(new File(hdfsSitePath).toURI().toURL());
-                            } catch (IOException e) {
-                                log.warn(
-                                        "Error adding Hadoop resource {}, resource was not added",
-                                        hdfsSitePath,
-                                        e);
+        try {
+            Configuration configuration = new Configuration();
+            // Try to load from hadoop_conf_path(The Bucket configuration is typically in
+            // core-site.xml)
+            Optional<String> hadoopConfPath =
+                    readonlyConfig.getOptional(HiveConfig.HADOOP_CONF_PATH);
+            if (hadoopConfPath.isPresent()) {
+                HADOOP_CONF_FILES.forEach(
+                        confFile -> {
+                            java.nio.file.Path path = Paths.get(hadoopConfPath.get(), confFile);
+                            if (Files.exists(path)) {
+                                try {
+                                    configuration.addResource(path.toUri().toURL());
+                                } catch (IOException e) {
+                                    log.warn(
+                                            "Error adding Hadoop resource {}, resource was not added",
+                                            path,
+                                            e);
+                                }
                             }
                         });
-        readonlyConfig
-                .getOptional(HiveConfig.HIVE_SITE_PATH)
-                .ifPresent(
-                        hiveSitePath -> {
-                            try {
-                                configuration.addResource(new File(hiveSitePath).toURI().toURL());
-                            } catch (IOException e) {
-                                log.warn(
-                                        "Error adding Hadoop resource {}, resource was not added",
-                                        hiveSitePath,
-                                        e);
-                            }
-                        });
-        // Try to load from hadoopConf
-        Optional<Map<String, String>> hadoopConf =
-                readonlyConfig.getOptional(HiveConfig.HADOOP_CONF);
-        if (hadoopConf.isPresent()) {
-            hadoopConf.get().forEach((k, v) -> configuration.set(k, v));
+            }
+            String hiveSitePath = readonlyConfig.get(HiveConfig.HIVE_SITE_PATH);
+            String hdfsSitePath = readonlyConfig.get(HdfsSourceConfigOptions.HDFS_SITE_PATH);
+            if (StringUtils.isNotBlank(hdfsSitePath)) {
+                configuration.addResource(new File(hdfsSitePath).toURI().toURL());
+            }
+
+            if (StringUtils.isNotBlank(hiveSitePath)) {
+                configuration.addResource(new File(hiveSitePath).toURI().toURL());
+            }
+            // Try to load from hadoopConf
+            Optional<Map<String, String>> hadoopConf =
+                    readonlyConfig.getOptional(HiveConfig.HADOOP_CONF);
+            if (hadoopConf.isPresent()) {
+                hadoopConf.get().forEach((k, v) -> configuration.set(k, v));
+            }
+            return configuration;
+        } catch (Exception e) {
+            String errorMsg = String.format("Failed to load hadoop configuration, please check it");
+            log.error(errorMsg + ":" + ExceptionUtils.getMessage(e));
+            throw new HiveConnectorException(
+                    HiveConnectorErrorCode.LOAD_HIVE_BASE_HADOOP_CONFIG_FAILED, e);
         }
-        return configuration;
     }
 }
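
In summary, the reworked `loadHiveBaseHadoopConfig` layers resources in the order conf-path files, then `hdfs_site_path`, then `hive_site_path`, applies inline `hive.hadoop.conf` entries last, and wraps any failure in a `HiveConnectorException` with the new `LOAD_HIVE_BASE_HADOOP_CONFIG_FAILED` code instead of ignoring it. A condensed, illustrative sketch of that flow, with plain strings standing in for the `ReadonlyConfig` lookups:

```java
import java.io.File;
import java.util.Collections;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;

public class LoadHiveHadoopConfigSketch {
    public static void main(String[] args) {
        // Hypothetical stand-ins for readonlyConfig.get(...) values.
        String hdfsSitePath = "/etc/hadoop/conf/hdfs-site.xml";
        String hiveSitePath = "/etc/hive/conf/hive-site.xml";
        Map<String, String> hadoopConf = Collections.singletonMap("bucket", "s3a://mybucket");

        try {
            Configuration configuration = new Configuration();
            if (StringUtils.isNotBlank(hdfsSitePath)) {
                configuration.addResource(new File(hdfsSitePath).toURI().toURL());
            }
            if (StringUtils.isNotBlank(hiveSitePath)) {
                configuration.addResource(new File(hiveSitePath).toURI().toURL());
            }
            // Explicitly set hive.hadoop.conf entries take precedence over the XML resources.
            hadoopConf.forEach(configuration::set);
        } catch (Exception e) {
            // The connector rethrows this as
            // HiveConnectorException(LOAD_HIVE_BASE_HADOOP_CONFIG_FAILED, e).
            throw new RuntimeException("Failed to load hadoop configuration, please check it", e);
        }
    }
}
```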
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
index d4d8ca3b7f..62d917ca0d 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
@@ -17,9 +17,12 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.utils;
 
+import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList;
+
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopLoginFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
 import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;
@@ -37,7 +40,10 @@ import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.MalformedURLException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.List;
 import java.util.Objects;
 
@@ -45,36 +51,60 @@ import java.util.Objects;
 public class HiveMetaStoreProxy {
     private HiveMetaStoreClient hiveMetaStoreClient;
     private static volatile HiveMetaStoreProxy INSTANCE = null;
+    private static final List<String> HADOOP_CONF_FILES = ImmutableList.of("hive-site.xml");
 
     private HiveMetaStoreProxy(ReadonlyConfig readonlyConfig) {
         String metastoreUri = readonlyConfig.get(HiveSourceOptions.METASTORE_URI);
+        String hiveHadoopConfigPath = readonlyConfig.get(HiveConfig.HADOOP_CONF_PATH);
+        String hiveSitePath = readonlyConfig.get(HiveConfig.HIVE_SITE_PATH);
         HiveConf hiveConf = new HiveConf();
         hiveConf.set("hive.metastore.uris", metastoreUri);
         try {
-            if (StringUtils.isNotEmpty(readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH))) {
-                String hiveSitePath = readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH);
+            if (StringUtils.isNotBlank(hiveHadoopConfigPath)) {
+                HADOOP_CONF_FILES.forEach(
+                        confFile -> {
+                            java.nio.file.Path path = Paths.get(hiveHadoopConfigPath, confFile);
+                            if (Files.exists(path)) {
+                                try {
+                                    hiveConf.addResource(path.toUri().toURL());
+                                } catch (IOException e) {
+                                    log.warn(
+                                            "Error adding Hadoop resource {}, resource was not added",
+                                            path,
+                                            e);
+                                }
+                            }
+                        });
+            }
+
+            if (StringUtils.isNotBlank(hiveSitePath)) {
                 hiveConf.addResource(new File(hiveSitePath).toURI().toURL());
             }
+
+            log.info("hive client conf:{}", hiveConf);
             if (HiveMetaStoreProxyUtils.enableKerberos(readonlyConfig)) {
-                Configuration hadoopConfig = new Configuration();
-                hadoopConfig.set("hadoop.security.authentication", "kerberos");
+                // login Kerberos
+                Configuration authConf = new Configuration();
+                authConf.set("hadoop.security.authentication", "kerberos");
                 this.hiveMetaStoreClient =
                         HadoopLoginFactory.loginWithKerberos(
-                                hadoopConfig,
-                                readonlyConfig.get(BaseSourceConfigOptions.KRB5_PATH),
-                                readonlyConfig.get(BaseSourceConfigOptions.KERBEROS_PRINCIPAL),
-                                readonlyConfig.get(BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH),
-                                (configuration, userGroupInformation) ->
-                                        new HiveMetaStoreClient(hiveConf));
+                                authConf,
+                                readonlyConfig.get(HdfsSourceConfigOptions.KRB5_PATH),
+                                readonlyConfig.get(HdfsSourceConfigOptions.KERBEROS_PRINCIPAL),
+                                readonlyConfig.get(HdfsSourceConfigOptions.KERBEROS_KEYTAB_PATH),
+                                (conf, userGroupInformation) -> {
+                                    return new HiveMetaStoreClient(hiveConf);
+                                });
                 return;
             }
             if (HiveMetaStoreProxyUtils.enableRemoteUser(readonlyConfig)) {
                 this.hiveMetaStoreClient =
                         HadoopLoginFactory.loginWithRemoteUser(
                                 new Configuration(),
-                                readonlyConfig.get(BaseSourceConfigOptions.REMOTE_USER),
-                                (configuration, userGroupInformation) ->
-                                        new HiveMetaStoreClient(hiveConf));
+                                readonlyConfig.get(HdfsSourceConfigOptions.REMOTE_USER),
+                                (conf, userGroupInformation) -> {
+                                    return new HiveMetaStoreClient(hiveConf);
+                                });
                 return;
             }
             this.hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
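
With the `HiveMetaStoreProxy` change above, `hive-site.xml` found under `hive.hadoop.conf-path` (in addition to an explicit `hive_site_path`) is added to the `HiveConf` before the metastore client is created. A minimal hedged sketch of assembling such a client outside the connector; the URI and path are placeholders:

```java
import java.io.File;
import java.net.MalformedURLException;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;

public class HiveMetaStoreClientSketch {
    public static void main(String[] args) throws MalformedURLException, MetaException {
        String metastoreUri = "thrift://localhost:9083"; // placeholder metastore URI
        String hiveSitePath = "/etc/hive/conf/hive-site.xml"; // placeholder hive-site.xml location

        HiveConf hiveConf = new HiveConf();
        hiveConf.set("hive.metastore.uris", metastoreUri);
        if (StringUtils.isNotBlank(hiveSitePath)) {
            // Extra properties from hive-site.xml are merged in; explicit set() values still win.
            hiveConf.addResource(new File(hiveSitePath).toURI().toURL());
        }

        // Without Kerberos or a remote user, the client is built directly from the HiveConf.
        HiveMetaStoreClient client = new HiveMetaStoreClient(hiveConf);
        client.close();
    }
}
```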
