This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3d3f7ba370 [Feature][E2E][Kerberos] Support for Kerberos in e2e  
(#8108)
3d3f7ba370 is described below

commit 3d3f7ba370090a75ba562d6fce11727c6715b599
Author: Jast <shengh...@apache.org>
AuthorDate: Mon Nov 25 14:35:12 2024 +0800

    [Feature][E2E][Kerberos] Support for Kerberos in e2e  (#8108)
---
 docs/en/connector-v2/sink/Hive.md                  |  23 ++
 docs/en/connector-v2/source/Hive.md                |  24 ++
 .../e2e/connector/hive/HiveContainer.java          |  51 +++-
 .../seatunnel/e2e/connector/hive/HiveIT.java       |   1 +
 .../e2e/connector/hive/HiveKerberosIT.java         | 339 +++++++++++++++++++++
 .../fake_to_hive_on_hdfs_with_kerberos.conf        |  62 ++++
 .../hive_on_hdfs_to_assert_with_kerberos.conf      |  77 +++++
 .../src/test/resources/kerberos/core-site.xml      |  29 ++
 .../src/test/resources/kerberos/hive-site.xml      |  84 +++++
 .../src/test/resources/kerberos/krb5.conf          |  33 ++
 .../src/test/resources/kerberos/krb5_local.conf    |  33 ++
 .../e2e/common/container/TestContainer.java        |   2 +
 .../flink/AbstractTestFlinkContainer.java          |   6 +
 .../ConnectorPackageServiceContainer.java          |   5 +
 .../container/seatunnel/SeaTunnelContainer.java    |  20 ++
 .../spark/AbstractTestSparkContainer.java          |   6 +
 16 files changed, 789 insertions(+), 6 deletions(-)

diff --git a/docs/en/connector-v2/sink/Hive.md 
b/docs/en/connector-v2/sink/Hive.md
index 147fd766a9..df5b493884 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -182,6 +182,29 @@ sink {
 }
 ```
 
+### example2: Kerberos
+
+```bash
+sink {
+  Hive {
+    table_name = "default.test_hive_sink_on_hdfs_with_kerberos"
+    metastore_uri = "thrift://metastore:9083"
+    hive_site_path = "/tmp/hive-site.xml"
+    kerberos_principal = "hive/metastore.seatunnel@EXAMPLE.COM"
+    kerberos_keytab_path = "/tmp/hive.keytab"
+    krb5_path = "/tmp/krb5.conf"
+  }
+}
+```
+
+Description:
+
+- `hive_site_path`: The path to the `hive-site.xml` file.
+- `kerberos_principal`: The principal for Kerberos authentication.
+- `kerberos_keytab_path`: The keytab file path for Kerberos authentication.
+- `krb5_path`: The path to the `krb5.conf` file used for Kerberos 
authentication.
+
+
 ## Hive on s3
 
 ### Step 1
diff --git a/docs/en/connector-v2/source/Hive.md 
b/docs/en/connector-v2/source/Hive.md
index 5669906c3b..6667ccc8ee 100644
--- a/docs/en/connector-v2/source/Hive.md
+++ b/docs/en/connector-v2/source/Hive.md
@@ -138,6 +138,30 @@ Source plugin common parameters, please refer to [Source 
Common Options](../sour
 
 ```
 
+### Example3 : Kerberos
+
+```bash
+source {
+  Hive {
+    table_name = "default.test_hive_sink_on_hdfs_with_kerberos"
+    metastore_uri = "thrift://metastore:9083"
+    hive.hadoop.conf-path = "/tmp/hadoop"
+    result_table_name = hive_source
+    hive_site_path = "/tmp/hive-site.xml"
+    kerberos_principal = "hive/metastore.seatunnel@EXAMPLE.COM"
+    kerberos_keytab_path = "/tmp/hive.keytab"
+    krb5_path = "/tmp/krb5.conf"
+  }
+}
+```
+
+Description:
+
+- `hive_site_path`: The path to the `hive-site.xml` file.
+- `kerberos_principal`: The principal for Kerberos authentication.
+- `kerberos_keytab_path`: The keytab file path for Kerberos authentication.
+- `krb5_path`: The path to the `krb5.conf` file used for Kerberos 
authentication.
+
 ## Hive on s3
 
 ### Step 1
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveContainer.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveContainer.java
index 486ad0b8b6..cadf95c110 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveContainer.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveContainer.java
@@ -17,9 +17,13 @@
 
 package org.apache.seatunnel.e2e.connector.hive;
 
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.security.UserGroupInformation;
 
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -28,6 +32,7 @@ import 
org.testcontainers.containers.wait.strategy.WaitStrategy;
 import org.testcontainers.utility.DockerImageName;
 import org.testcontainers.utility.DockerLoggerFactory;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.Driver;
 import java.sql.SQLException;
@@ -70,24 +75,58 @@ public class HiveContainer extends 
GenericContainer<HiveContainer> {
         return String.format("thrift://%s:%s", getHost(), 
getMappedPort(HMS_PORT));
     }
 
-    public String getHiveJdbcUri() {
-        return String.format(
-                "jdbc:hive2://%s:%s/default", getHost(), 
getMappedPort(HIVE_SERVER_PORT));
+    public String getHiveJdbcUri(boolean enableKerberos) {
+        if (enableKerberos) {
+            return String.format(
+                    
"jdbc:hive2://%s:%s/default;principal=hive/metastore.seatunnel@EXAMPLE.COM",
+                    getHost(), getMappedPort(HIVE_SERVER_PORT));
+        } else {
+            return String.format(
+                    "jdbc:hive2://%s:%s/default", getHost(), 
getMappedPort(HIVE_SERVER_PORT));
+        }
     }
 
     public HiveMetaStoreClient createMetaStoreClient() throws MetaException {
+        return this.createMetaStoreClient(false);
+    }
+
+    public HiveMetaStoreClient createMetaStoreClient(boolean enableKerberos) 
throws MetaException {
         HiveConf conf = new HiveConf();
         conf.set("hive.metastore.uris", getMetastoreUri());
-
+        if (enableKerberos) {
+            conf.addResource("kerberos/hive-site.xml");
+        }
         return new HiveMetaStoreClient(conf);
     }
 
     public Connection getConnection()
             throws ClassNotFoundException, InstantiationException, 
IllegalAccessException,
                     SQLException {
-        Driver driver = loadHiveJdbcDriver();
+        return getConnection(false);
+    }
 
-        return driver.connect(getHiveJdbcUri(), getJdbcConnectionConfig());
+    public Connection getConnection(boolean enableKerberos)
+            throws ClassNotFoundException, InstantiationException, 
IllegalAccessException,
+                    SQLException {
+        Driver driver = loadHiveJdbcDriver();
+        if (!enableKerberos) {
+            return driver.connect(getHiveJdbcUri(false), 
getJdbcConnectionConfig());
+        }
+        Configuration authConf = new Configuration();
+        authConf.set("hadoop.security.authentication", "kerberos");
+        Configuration configuration = new Configuration();
+        System.setProperty(
+                "java.security.krb5.conf",
+                
ContainerUtil.getResourcesFile("/kerberos/krb5_local.conf").getPath());
+        configuration.set("hadoop.security.authentication", "KERBEROS");
+        try {
+            UserGroupInformation.setConfiguration(configuration);
+            UserGroupInformation.loginUserFromKeytab(
+                    "hive/metastore.seatunnel@EXAMPLE.COM", 
"/tmp/hive.keytab");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return driver.connect(getHiveJdbcUri(true), getJdbcConnectionConfig());
     }
 
     public Driver loadHiveJdbcDriver()
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
index 5973d69758..bfa83dfb3b 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
@@ -180,6 +180,7 @@ public class HiveIT extends TestSuiteBase implements 
TestResource {
                 .await()
                 .atMost(360, TimeUnit.SECONDS)
                 .pollDelay(Duration.ofSeconds(10L))
+                .pollInterval(Duration.ofSeconds(3L))
                 .untilAsserted(this::initializeConnection);
         prepareTable();
     }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveKerberosIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveKerberosIT.java
new file mode 100644
index 0000000000..c2fca452fa
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveKerberosIT.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.hive;
+
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.seatunnel.SeaTunnelContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK})
+@Slf4j
+public class HiveKerberosIT extends SeaTunnelContainer {
+
+    // It is necessary to set up a separate network with a fixed name, 
otherwise network issues may
+    // cause Kerberos authentication failure
+    Network NETWORK =
+            Network.builder()
+                    .createNetworkCmdModifier(cmd -> cmd.withName("SEATUNNEL"))
+                    .enableIpv6(false)
+                    .build();
+
+    private static final String CREATE_SQL =
+            "CREATE TABLE test_hive_sink_on_hdfs_with_kerberos"
+                    + "("
+                    + "    pk_id  BIGINT,"
+                    + "    name   STRING,"
+                    + "    score  INT"
+                    + ")";
+
+    private static final String HMS_HOST = "metastore";
+    private static final String HIVE_SERVER_HOST = "hiveserver2";
+    private GenericContainer<?> kerberosContainer;
+    private static final String KERBEROS_IMAGE_NAME = 
"zhangshenghang/kerberos-server:1.0";
+
+    private String hiveExeUrl() {
+        return 
"https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.3/hive-exec-3.1.3.jar";
+    }
+
+    private String libFb303Url() {
+        return 
"https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar";
+    }
+
+    private String hadoopAwsUrl() {
+        return 
"https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.1.4/hadoop-aws-3.1.4.jar";
+    }
+
+    private String aliyunSdkOssUrl() {
+        return 
"https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.4.1/aliyun-sdk-oss-3.4.1.jar";
+    }
+
+    private String jdomUrl() {
        return "https://repo1.maven.org/maven2/org/jdom/jdom/1.1/jdom-1.1.jar";
+    }
+
+    private String hadoopAliyunUrl() {
+        return 
"https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.1.4/hadoop-aliyun-3.1.4.jar";
+    }
+
+    private String hadoopCosUrl() {
+        return 
"https://repo1.maven.org/maven2/com/qcloud/cos/hadoop-cos/2.6.5-8.0.2/hadoop-cos-2.6.5-8.0.2.jar";
+    }
+
+    private HiveContainer hiveServerContainer;
+    private HiveContainer hmsContainer;
+    private Connection hiveConnection;
+    private String pluginHiveDir = "/tmp/seatunnel/plugins/Hive/lib";
+
+    protected void downloadHivePluginJar() throws IOException, 
InterruptedException {
+        Container.ExecResult downloadHiveExeCommands =
+                server.execInContainer(
+                        "sh",
+                        "-c",
+                        "mkdir -p "
+                                + pluginHiveDir
+                                + " && cd "
+                                + pluginHiveDir
+                                + " && wget "
+                                + hiveExeUrl());
+        Assertions.assertEquals(
+                0, downloadHiveExeCommands.getExitCode(), 
downloadHiveExeCommands.getStderr());
+        Container.ExecResult downloadLibFb303Commands =
+                server.execInContainer(
+                        "sh", "-c", "cd " + pluginHiveDir + " && wget " + 
libFb303Url());
+        Assertions.assertEquals(
+                0, downloadLibFb303Commands.getExitCode(), 
downloadLibFb303Commands.getStderr());
+        // The jar of s3
+        Container.ExecResult downloadS3Commands =
+                server.execInContainer(
+                        "sh", "-c", "cd " + pluginHiveDir + " && wget " + 
hadoopAwsUrl());
+        Assertions.assertEquals(
+                0, downloadS3Commands.getExitCode(), 
downloadS3Commands.getStderr());
+        // The jar of oss
+        Container.ExecResult downloadOssCommands =
+                server.execInContainer(
+                        "sh",
+                        "-c",
+                        "cd "
+                                + pluginHiveDir
+                                + " && wget "
+                                + aliyunSdkOssUrl()
+                                + " && wget "
+                                + jdomUrl()
+                                + " && wget "
+                                + hadoopAliyunUrl());
+        Assertions.assertEquals(
+                0, downloadOssCommands.getExitCode(), 
downloadOssCommands.getStderr());
+        // The jar of cos
+        Container.ExecResult downloadCosCommands =
+                server.execInContainer(
+                        "sh", "-c", "cd " + pluginHiveDir + " && wget " + 
hadoopCosUrl());
+        Assertions.assertEquals(
+                0, downloadCosCommands.getExitCode(), 
downloadCosCommands.getStderr());
+    };
+
+    @BeforeEach
+    @Override
+    public void startUp() throws Exception {
+
+        kerberosContainer =
+                new GenericContainer<>(KERBEROS_IMAGE_NAME)
+                        .withNetwork(NETWORK)
+                        .withExposedPorts(88, 749)
+                        .withCreateContainerCmdModifier(cmd -> 
cmd.withHostName("kerberos"))
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        
DockerLoggerFactory.getLogger(KERBEROS_IMAGE_NAME)));
+        kerberosContainer.setPortBindings(Arrays.asList("88/udp:88/udp", 
"749:749"));
+        Startables.deepStart(Stream.of(kerberosContainer)).join();
+        log.info("Kerberos just started");
+
+        // Copy the keytab file from kerberos container to local
+        given().ignoreExceptions()
+                .await()
+                .atMost(30, TimeUnit.SECONDS)
+                .pollDelay(Duration.ofSeconds(1L))
+                .untilAsserted(
+                        () ->
+                                kerberosContainer.copyFileFromContainer(
+                                        "/tmp/hive.keytab", 
"/tmp/hive.keytab"));
+
+        hmsContainer =
+                HiveContainer.hmsStandalone()
+                        .withCreateContainerCmdModifier(cmd -> 
cmd.withName(HMS_HOST))
+                        .withNetwork(NETWORK)
+                        .withFileSystemBind(
+                                
ContainerUtil.getResourcesFile("/kerberos/krb5.conf").getPath(),
+                                "/etc/krb5.conf")
+                        .withFileSystemBind("/tmp/hive.keytab", 
"/tmp/hive.keytab")
+                        .withFileSystemBind(
+                                
ContainerUtil.getResourcesFile("/kerberos/hive-site.xml").getPath(),
+                                "/opt/hive/conf/hive-site.xml")
+                        .withFileSystemBind(
+                                
ContainerUtil.getResourcesFile("/kerberos/core-site.xml").getPath(),
+                                "/opt/hive/conf/core-site.xml")
+                        .withNetworkAliases(HMS_HOST);
+        hmsContainer.setPortBindings(Collections.singletonList("9083:9083"));
+
+        Startables.deepStart(Stream.of(hmsContainer)).join();
+        log.info("HMS just started");
+
+        hiveServerContainer =
+                HiveContainer.hiveServer()
+                        .withNetwork(NETWORK)
+                        .withCreateContainerCmdModifier(cmd -> 
cmd.withName(HIVE_SERVER_HOST))
+                        .withNetworkAliases(HIVE_SERVER_HOST)
+                        .withFileSystemBind(
+                                
ContainerUtil.getResourcesFile("/kerberos/krb5.conf").getPath(),
+                                "/etc/krb5.conf")
+                        .withFileSystemBind("/tmp/hive.keytab", 
"/tmp/hive.keytab")
+                        .withFileSystemBind(
+                                
ContainerUtil.getResourcesFile("/kerberos/hive-site.xml").getPath(),
+                                "/opt/hive/conf/hive-site.xml")
+                        .withFileSystemBind(
+                                
ContainerUtil.getResourcesFile("/kerberos/core-site.xml").getPath(),
+                                "/opt/hive/conf/core-site.xml")
+                        .withFileSystemBind("/tmp/data", "/opt/hive/data")
+                        //  If there are any issues, you can open the kerberos 
debug log to view
+                        // more information: -Dsun.security.krb5.debug=true
+                        .withEnv("SERVICE_OPTS", 
"-Dhive.metastore.uris=thrift://metastore:9083")
+                        .withEnv("IS_RESUME", "true")
+                        .dependsOn(hmsContainer);
+        
hiveServerContainer.setPortBindings(Collections.singletonList("10000:10000"));
+
+        Startables.deepStart(Stream.of(hiveServerContainer)).join();
+
+        log.info("HiveServer2 just started");
+
+        given().ignoreExceptions()
+                .await()
+                .atMost(3600, TimeUnit.SECONDS)
+                .pollDelay(Duration.ofSeconds(10L))
+                .pollInterval(Duration.ofSeconds(3L))
+                .untilAsserted(this::initializeConnection);
+
+        prepareTable();
+
+        // Set the fixed network to SeatunnelContainer
+        super.startUp(this.NETWORK);
+        // Load the hive plugin jar
+        this.downloadHivePluginJar();
+    }
+
+    @AfterEach
+    @Override
+    public void tearDown() throws Exception {
+        if (hmsContainer != null) {
+            log.info(hmsContainer.execInContainer("cat", 
"/tmp/hive/hive.log").getStdout());
+            hmsContainer.close();
+        }
+        if (hiveServerContainer != null) {
+            log.info(hiveServerContainer.execInContainer("cat", 
"/tmp/hive/hive.log").getStdout());
+            hiveServerContainer.close();
+        }
+    }
+
+    private void initializeConnection()
+            throws ClassNotFoundException, InstantiationException, 
IllegalAccessException,
+                    SQLException {
+        this.hiveConnection = this.hiveServerContainer.getConnection(true);
+    }
+
+    private void prepareTable() throws Exception {
+        log.info(
+                String.format(
+                        "Databases are %s",
+                        
this.hmsContainer.createMetaStoreClient(true).getAllDatabases()));
+        try (Statement statement = this.hiveConnection.createStatement()) {
+            statement.execute(CREATE_SQL);
+        } catch (Exception exception) {
+            log.error(ExceptionUtils.getMessage(exception));
+            throw exception;
+        }
+    }
+
+    private void executeJob(TestContainer container, String job1, String job2)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob(job1);
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        Container.ExecResult readResult = container.executeJob(job2);
+        Assertions.assertEquals(0, readResult.getExitCode());
+    }
+
+    @Test
+    public void testFakeSinkHiveOnHDFS() throws Exception {
+        copyAbsolutePathToContainer("/tmp/hive.keytab", "/tmp/hive.keytab");
+        copyFileToContainer("/kerberos/krb5.conf", "/tmp/krb5.conf");
+        copyFileToContainer("/kerberos/hive-site.xml", "/tmp/hive-site.xml");
+
+        Container.ExecResult fakeToHiveWithKerberosResult =
+                executeJob("/fake_to_hive_on_hdfs_with_kerberos.conf");
+        Assertions.assertEquals(0, fakeToHiveWithKerberosResult.getExitCode());
+
+        Container.ExecResult hiveToAssertWithKerberosResult =
+                executeJob("/hive_on_hdfs_to_assert_with_kerberos.conf");
+        Assertions.assertEquals(0, 
hiveToAssertWithKerberosResult.getExitCode());
+
+        Container.ExecResult fakeToHiveResult = 
executeJob("/fake_to_hive_on_hdfs.conf");
+        Assertions.assertEquals(1, fakeToHiveResult.getExitCode());
+        Assertions.assertTrue(
+                fakeToHiveResult
+                        .getStderr()
+                        .contains("Get hive table information from hive 
metastore service failed"));
+
+        Container.ExecResult hiveToAssertResult = 
executeJob("/hive_on_hdfs_to_assert.conf");
+        Assertions.assertEquals(1, hiveToAssertResult.getExitCode());
+        Assertions.assertTrue(
+                hiveToAssertResult
+                        .getStderr()
+                        .contains("Get hive table information from hive 
metastore service failed"));
+    }
+
+    @TestTemplate
+    @Disabled(
+            "[HDFS/COS/OSS/S3] is not available in CI, if you want to run this 
test, please set up your own environment in the test case file, 
hadoop_hive_conf_path_local and ip below}")
+    public void testFakeSinkHiveOnS3(TestContainer container) throws Exception 
{
+        executeJob(container, "/fake_to_hive_on_s3.conf", 
"/hive_on_s3_to_assert.conf");
+    }
+
+    @TestTemplate
+    @Disabled(
+            "[HDFS/COS/OSS/S3] is not available in CI, if you want to run this 
test, please set up your own environment in the test case file, 
hadoop_hive_conf_path_local and ip below}")
+    public void testFakeSinkHiveOnOSS(TestContainer container) throws 
Exception {
+        executeJob(container, "/fake_to_hive_on_oss.conf", 
"/hive_on_oss_to_assert.conf");
+    }
+
+    @TestTemplate
+    @Disabled(
+            "[HDFS/COS/OSS/S3] is not available in CI, if you want to run this 
test, please set up your own environment in the test case file, 
hadoop_hive_conf_path_local and ip below}")
+    public void testFakeSinkHiveOnCos(TestContainer container) throws 
Exception {
+        executeJob(container, "/fake_to_hive_on_cos.conf", 
"/hive_on_cos_to_assert.conf");
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_hdfs_with_kerberos.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_hdfs_with_kerberos.conf
new file mode 100644
index 0000000000..d74b396a62
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_hdfs_with_kerberos.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", 100]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      }
+    ]
+  }
+}
+
+sink {
+  Hive {
+    table_name = "default.test_hive_sink_on_hdfs_with_kerberos"
+    metastore_uri = "thrift://metastore:9083"
+    hive_site_path = "/tmp/hive-site.xml"
+    kerberos_principal = "hive/metastore.seatunnel@EXAMPLE.COM"
+    kerberos_keytab_path = "/tmp/hive.keytab"
+    krb5_path = "/tmp/krb5.conf"
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_hdfs_to_assert_with_kerberos.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_hdfs_to_assert_with_kerberos.conf
new file mode 100644
index 0000000000..59c768e4fb
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_hdfs_to_assert_with_kerberos.conf
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Hive {
+    table_name = "default.test_hive_sink_on_hdfs_with_kerberos"
+    metastore_uri = "thrift://metastore:9083"
+    hive.hadoop.conf-path = "/tmp/hadoop"
+    result_table_name = hive_source
+    hive_site_path = "/tmp/hive-site.xml"
+    kerberos_principal = "hive/metastore.seatunnel@EXAMPLE.COM"
+    kerberos_keytab_path = "/tmp/hive.keytab"
+    krb5_path = "/tmp/krb5.conf"
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = hive_source
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 3
+        }
+      ],
+      field_rules = [
+        {
+          field_name = pk_id
+          field_type = bigint
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = score
+          field_type = int
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/core-site.xml
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/core-site.xml
new file mode 100644
index 0000000000..ed5df9b0f5
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/core-site.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<configuration>
+    <property>
+        <name>hadoop.security.authorization</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>hadoop.security.authentication</name>
+        <value>kerberos</value>
+    </property>
+</configuration>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/hive-site.xml
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/hive-site.xml
new file mode 100644
index 0000000000..2dd37b5256
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/hive-site.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <property>
+        <name>hive.server2.authentication</name>
+        <value>KERBEROS</value>
+    </property>
+    <property>
+        <name>hive.server2.authentication.kerberos.principal</name>
+        <value>hive/metastore.seatunnel@EXAMPLE.COM</value>
+    </property>
+    <property>
+        <name>hive.server2.authentication.kerberos.keytab</name>
+        <value>/tmp/hive.keytab</value>
+    </property>
+    <property>
+        <name>hive.security.authenticator.manager</name>
+        
<value>org.apache.hadoop.hive.ql.security.SessionStateUserAuthenticator</value>
+    </property>
+    <property>
+        <name>hive.metastore.sasl.enabled</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>hive.metastore.kerberos.keytab.file</name>
+        <value>/tmp/hive.keytab</value>
+    </property>
+    <property>
+        <name>hive.metastore.kerberos.principal</name>
+        <value>hive/metastore.seatunnel@EXAMPLE.COM</value>
+    </property>
+    <property>
+        <name>hive.exec.scratchdir</name>
+        <value>/opt/hive/scratch_dir</value>
+    </property>
+    <property>
+        <name>hive.user.install.directory</name>
+        <value>/opt/hive/install_dir</value>
+    </property>
+    <property>
+        <name>tez.runtime.optimize.local.fetch</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>hive.exec.submit.local.task.via.child</name>
+        <value>false</value>
+    </property>
+    <property>
+        <name>mapreduce.framework.name</name>
+        <value>local</value>
+    </property>
+    <property>
+        <name>tez.local.mode</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>hive.execution.engine</name>
+        <value>tez</value>
+    </property>
+    <property>
+        <name>metastore.warehouse.dir</name>
+        <value>/opt/hive/data/warehouse</value>
+    </property>
+    <property>
+        <name>metastore.metastore.event.db.notification.api.auth</name>
+        <value>false</value>
+    </property>
+</configuration>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/krb5.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/krb5.conf
new file mode 100755
index 0000000000..2b09b1c3e5
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/krb5.conf
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+[libdefaults]
+    default_realm = EXAMPLE.COM
+    dns_lookup_realm = true
+    dns_lookup_kdc = true
+    ticket_lifetime = 24h
+    forwardable = true
+
+[realms]
+    EXAMPLE.COM = {
+        kdc = kerberos:88
+        admin_server = kerberos:749
+    }
+
+[domain_realm]
+    .example.com = EXAMPLE.COM
+    example.com = EXAMPLE.COM
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/krb5_local.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/krb5_local.conf
new file mode 100755
index 0000000000..bd136e9e8d
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/krb5_local.conf
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+[libdefaults]
+    default_realm = EXAMPLE.COM
+    dns_lookup_realm = true
+    dns_lookup_kdc = true
+    ticket_lifetime = 24h
+    forwardable = true
+
+[realms]
+    EXAMPLE.COM = {
+        kdc = localhost:88
+        admin_server = localhost:749
+    }
+
+[domain_realm]
+    .example.com = EXAMPLE.COM
+    example.com = EXAMPLE.COM
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
index fd49c7b46e..f815ecb6b2 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
@@ -80,4 +80,6 @@ public interface TestContainer extends TestResource {
     String getServerLogs();
 
     void copyFileToContainer(String path, String targetPath);
+
+    void copyAbsolutePathToContainer(String path, String targetPath);
 }
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
index 47b3de5ff5..007a5de4c7 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
@@ -34,6 +34,7 @@ import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
@@ -180,4 +181,9 @@ public abstract class AbstractTestFlinkContainer extends 
AbstractTestContainer {
         ContainerUtil.copyFileIntoContainers(
                 ContainerUtil.getResourcesFile(path).toPath(), targetPath, 
jobManager);
     }
+
+    @Override
+    public void copyAbsolutePathToContainer(String path, String targetPath) {
+        ContainerUtil.copyFileIntoContainers(Paths.get(path), targetPath, 
jobManager);
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
index ea8bcd8788..54804d1057 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
@@ -245,4 +245,9 @@ public class ConnectorPackageServiceContainer extends 
AbstractTestContainer {
         ContainerUtil.copyFileIntoContainers(
                 ContainerUtil.getResourcesFile(path).toPath(), targetPath, 
server1);
     }
+
+    @Override
+    public void copyAbsolutePathToContainer(String path, String targetPath) {
+        ContainerUtil.copyFileIntoContainers(Paths.get(path), targetPath, 
server1);
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index b9ff54b6c3..51947c9e9d 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -39,6 +39,7 @@ import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Assertions;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.utility.DockerLoggerFactory;
@@ -86,7 +87,21 @@ public class SeaTunnelContainer extends 
AbstractTestContainer {
         server = createSeaTunnelServer();
     }
 
+    /**
+     * Start up the seatunnel server with the given network.
+     *
+     * @param NETWORK the network to use
+     */
+    public void startUp(Network NETWORK) throws Exception {
+        server = createSeaTunnelServer(NETWORK);
+    }
+
     private GenericContainer<?> createSeaTunnelServer() throws IOException, 
InterruptedException {
+        return createSeaTunnelServer(NETWORK);
+    }
+
+    private GenericContainer<?> createSeaTunnelServer(Network NETWORK)
+            throws IOException, InterruptedException {
         GenericContainer<?> server =
                 new GenericContainer<>(getDockerImage())
                         .withNetwork(NETWORK)
@@ -523,4 +538,9 @@ public class SeaTunnelContainer extends 
AbstractTestContainer {
         ContainerUtil.copyFileIntoContainers(
                 ContainerUtil.getResourcesFile(path).toPath(), targetPath, 
server);
     }
+
+    @Override
+    public void copyAbsolutePathToContainer(String path, String targetPath) {
+        ContainerUtil.copyFileIntoContainers(Paths.get(path), targetPath, 
server);
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
index b13851582c..d6c08f1231 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
@@ -31,6 +31,7 @@ import org.testcontainers.utility.DockerLoggerFactory;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
@@ -131,4 +132,9 @@ public abstract class AbstractTestSparkContainer extends 
AbstractTestContainer {
         ContainerUtil.copyFileIntoContainers(
                 ContainerUtil.getResourcesFile(path).toPath(), targetPath, 
master);
     }
+
+    @Override
+    public void copyAbsolutePathToContainer(String path, String targetPath) {
+        ContainerUtil.copyFileIntoContainers(Paths.get(path), targetPath, 
master);
+    }
 }


Reply via email to