This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e3001207c8 [Feature][Connector-V2][Iceberg] Support Iceberg Kerberos (#7246)
e3001207c8 is described below

commit e3001207c887f73cd44648b4adb79173572bb4a9
Author: 卢宗柱 <luzong...@sina.com>
AuthorDate: Wed Aug 14 22:34:47 2024 +0800

    [Feature][Connector-V2][Iceberg] Support Iceberg Kerberos (#7246)
---
 .../seatunnel/api/kerberos/KerberosConfig.java     | 43 ++++++++++++++
 .../common/exception/CommonErrorCode.java          |  3 +-
 .../seatunnel/file/config/BaseSinkConfig.java      | 22 +------
 .../seatunnel/iceberg/IcebergCatalogLoader.java    | 67 +++++++++++++++++++++-
 .../iceberg/catalog/IcebergCatalogFactory.java     |  7 ++-
 .../seatunnel/iceberg/config/CommonConfig.java     | 18 +++++-
 .../seatunnel/iceberg/data/IcebergTypeMapper.java  |  1 -
 .../seatunnel/iceberg/sink/IcebergSinkFactory.java |  3 +
 .../source/enumerator/AbstractSplitEnumerator.java |  2 +-
 .../iceberg/source/reader/IcebergSourceReader.java |  2 +-
 .../seatunnel/iceberg/utils/SchemaUtils.java       | 11 ++--
 .../iceberg/catalog/IcebergCatalogTest.java        |  7 +++
 12 files changed, 150 insertions(+), 36 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/kerberos/KerberosConfig.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/kerberos/KerberosConfig.java
new file mode 100644
index 0000000000..d501a3ea49
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/kerberos/KerberosConfig.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.kerberos;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class KerberosConfig {
+
+    public static final Option<String> KERBEROS_PRINCIPAL =
+            Options.key("kerberos_principal")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("When use kerberos, we should set 
kerberos user principal");
+
+    public static final Option<String> KRB5_PATH =
+            Options.key("krb5_path")
+                    .stringType()
+                    .defaultValue("/etc/krb5.conf")
+                    .withDescription(
+                            "When use kerberos, we should set krb5 path file 
path such as '/seatunnel/krb5.conf' or use the default path '/etc/krb5.conf'");
+
+    public static final Option<String> KERBEROS_KEYTAB_PATH =
+            Options.key("kerberos_keytab_path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("When using kerberos, We should specify 
the keytab path");
+}
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
index 79621c4216..99cb7353cf 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
@@ -49,11 +49,9 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
     WRITE_SEATUNNEL_ROW_ERROR(
             "COMMON-23",
             "<connector> write SeaTunnelRow failed, the SeaTunnelRow value is 
'<seaTunnelRow>'."),
-
     SQL_TEMPLATE_HANDLED_ERROR(
             "COMMON-24",
             "The table of <tableName> has no <keyName>, but the template \n 
<template> \n which has the place holder named <placeholder>. Please use the 
option named <optionName> to specify sql template"),
-
     VERSION_NOT_SUPPORTED("COMMON-25", "<identifier> <version> is 
unsupported."),
     OPERATION_NOT_SUPPORTED("COMMON-26", "<identifier> <operation> is 
unsupported."),
     CONVERT_TO_SEATUNNEL_PROPS_BLANK_ERROR(
@@ -78,6 +76,7 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
             "COMMON-33",
             "The datetime format '<datetime>' of field '<field>' is not 
supported. Please check the datetime format."),
     UNSUPPORTED_METHOD("COMMON-34", "'<identifier>' unsupported the method 
'<methodName>'"),
+    KERBEROS_AUTHORIZED_FAILED("COMMON-35", "Kerberos authorized failed"),
     ;
 
     private final String code;
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
index 0759baf9e4..35f1e4ba16 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.file.config;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.kerberos.KerberosConfig;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SchemaSaveMode;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
@@ -34,7 +35,7 @@ import static 
org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA;
 import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
 import static 
org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
 
-public class BaseSinkConfig {
+public class BaseSinkConfig extends KerberosConfig {
     public static final String SEATUNNEL = "seatunnel";
     public static final String NON_PARTITION = "NON_PARTITION";
     public static final String TRANSACTION_ID_SPLIT = "_";
@@ -228,25 +229,6 @@ public class BaseSinkConfig {
                     .noDefaultValue()
                     .withDescription("The remote user name of hdfs");
 
-    public static final Option<String> KRB5_PATH =
-            Options.key("krb5_path")
-                    .stringType()
-                    .defaultValue("/etc/krb5.conf")
-                    .withDescription(
-                            "When use kerberos, we should set krb5 path file 
path such as '/seatunnel/krb5.conf' or use the default path '/etc/krb5.conf");
-
-    public static final Option<String> KERBEROS_PRINCIPAL =
-            Options.key("kerberos_principal")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Kerberos principal");
-
-    public static final Option<String> KERBEROS_KEYTAB_PATH =
-            Options.key("kerberos_keytab_path")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Kerberos keytab file path");
-
     public static final Option<Integer> MAX_ROWS_IN_MEMORY =
             Options.key("max_rows_in_memory")
                     .intType()
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java
index 63596c88be..0f4610783a 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java
@@ -19,14 +19,21 @@ package org.apache.seatunnel.connectors.seatunnel.iceberg;
 
 import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList;
 
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.common.DynClasses;
 import org.apache.iceberg.common.DynMethods;
 
 import lombok.extern.slf4j.Slf4j;
+import sun.security.krb5.KrbException;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -62,7 +69,7 @@ public class IcebergCatalogLoader implements Serializable {
      * @param config
      * @return
      */
-    private Object loadHadoopConfig(CommonConfig config) {
+    public Object loadHadoopConfig(CommonConfig config) {
         Class<?> configClass =
                 DynClasses.builder()
                         .impl("org.apache.hadoop.hdfs.HdfsConfiguration")
@@ -80,7 +87,6 @@ public class IcebergCatalogLoader implements Serializable {
             log.info("Hadoop not found on classpath, not creating Hadoop 
config");
             return null;
         }
-
         try {
             Object result = configClass.getDeclaredConstructor().newInstance();
             DynMethods.BoundMethod addResourceMethod =
@@ -109,6 +115,8 @@ public class IcebergCatalogLoader implements Serializable {
                         });
             }
             config.getHadoopProps().forEach(setMethod::invoke);
+            // kerberos authentication
+            doKerberosLogin((Configuration) result);
             log.info("Hadoop config initialized: {}", configClass.getName());
             return result;
         } catch (InstantiationException
@@ -121,4 +129,59 @@ public class IcebergCatalogLoader implements Serializable {
         }
         return null;
     }
+
+    /**
+     * kerberos authentication
+     *
+     * @param configuration Configuration
+     */
+    private Configuration doKerberosLogin(Configuration configuration) {
+        String kerberosKrb5ConfPath = config.getKerberosKrb5ConfPath();
+        String kerberosKeytabPath = config.getKerberosKeytabPath();
+        String kerberosPrincipal = config.getKerberosPrincipal();
+
+        if (StringUtils.isNotEmpty(kerberosPrincipal)
+                && StringUtils.isNotEmpty(kerberosKrb5ConfPath)
+                && StringUtils.isNotEmpty(kerberosKeytabPath)) {
+            try {
+                System.setProperty("java.security.krb5.conf", 
kerberosKrb5ConfPath);
+                System.setProperty("krb.principal", kerberosPrincipal);
+                doKerberosAuthentication(configuration, kerberosPrincipal, 
kerberosKeytabPath);
+            } catch (Exception e) {
+                throw new IcebergConnectorException(
+                        CommonErrorCode.KERBEROS_AUTHORIZED_FAILED,
+                        String.format("Kerberos authentication failed: %s", 
e.getMessage()));
+            }
+        } else {
+            log.warn(
+                    "Kerberos authentication is not configured, it will skip 
kerberos authentication");
+        }
+
+        return configuration;
+    }
+
+    public static void doKerberosAuthentication(
+            Configuration configuration, String principal, String keytabPath) {
+        if (StringUtils.isBlank(principal) || StringUtils.isBlank(keytabPath)) 
{
+            log.warn(
+                    "Principal [{}] or keytabPath [{}] is empty, it will skip 
kerberos authentication",
+                    principal,
+                    keytabPath);
+        } else {
+            configuration.set("hadoop.security.authentication", "kerberos");
+            UserGroupInformation.setConfiguration(configuration);
+            try {
+                log.info(
+                        "Start Kerberos authentication using principal {} and 
keytab {}",
+                        principal,
+                        keytabPath);
+                sun.security.krb5.Config.refresh();
+                UserGroupInformation.loginUserFromKeytab(principal, 
keytabPath);
+                UserGroupInformation loginUser = 
UserGroupInformation.getLoginUser();
+                log.info("Kerberos authentication successful,UGI {}", 
loginUser);
+            } catch (IOException | KrbException e) {
+                throw new SeaTunnelException("check connectivity failed, " + 
e.getMessage(), e);
+            }
+        }
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogFactory.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogFactory.java
index 1259699068..182286e75f 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogFactory.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogFactory.java
@@ -48,7 +48,12 @@ public class IcebergCatalogFactory implements CatalogFactory 
{
                         CommonConfig.KEY_NAMESPACE,
                         CommonConfig.KEY_TABLE,
                         CommonConfig.CATALOG_PROPS)
-                .optional(CommonConfig.HADOOP_PROPS, KEY_CASE_SENSITIVE)
+                .optional(
+                        CommonConfig.HADOOP_PROPS,
+                        CommonConfig.KERBEROS_PRINCIPAL,
+                        CommonConfig.KERBEROS_KEYTAB_PATH,
+                        CommonConfig.KRB5_PATH,
+                        KEY_CASE_SENSITIVE)
                 .build();
     }
 }
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java
index a7503e6e30..ce76f5a512 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.iceberg.config;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.kerberos.KerberosConfig;
 import org.apache.seatunnel.common.config.ConfigRuntimeException;
 
 import lombok.Getter;
@@ -33,7 +34,7 @@ import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.ch
 
 @Getter
 @ToString
-public class CommonConfig implements Serializable {
+public class CommonConfig extends KerberosConfig implements Serializable {
     private static final long serialVersionUID = 239821141534421580L;
 
     public static final Option<String> KEY_CATALOG_NAME =
@@ -89,6 +90,12 @@ public class CommonConfig implements Serializable {
     private Map<String, String> hadoopProps;
     private String hadoopConfPath;
 
+    // kerberos
+
+    private String kerberosPrincipal;
+    private String kerberosKeytabPath;
+    private String kerberosKrb5ConfPath;
+
     public CommonConfig(ReadonlyConfig pluginConfig) {
         this.catalogName = 
checkArgumentNotNull(pluginConfig.get(KEY_CATALOG_NAME));
         this.namespace = pluginConfig.get(KEY_NAMESPACE);
@@ -99,6 +106,15 @@ public class CommonConfig implements Serializable {
         if (pluginConfig.toConfig().hasPath(KEY_CASE_SENSITIVE.key())) {
             this.caseSensitive = pluginConfig.get(KEY_CASE_SENSITIVE);
         }
+        if (pluginConfig.getOptional(KERBEROS_PRINCIPAL).isPresent()) {
+            this.kerberosPrincipal = 
pluginConfig.getOptional(KERBEROS_PRINCIPAL).get();
+        }
+        if (pluginConfig.getOptional(KRB5_PATH).isPresent()) {
+            this.kerberosKrb5ConfPath = 
pluginConfig.getOptional(KRB5_PATH).get();
+        }
+        if (pluginConfig.getOptional(KERBEROS_KEYTAB_PATH).isPresent()) {
+            this.kerberosKeytabPath = 
pluginConfig.getOptional(KERBEROS_KEYTAB_PATH).get();
+        }
         validate();
     }
 
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
index e1635919d6..4f3f57e415 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
@@ -37,7 +37,6 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class IcebergTypeMapper {
-
     public static SeaTunnelDataType<?> mapping(String field, @NonNull Type 
icebergType) {
         switch (icebergType.typeId()) {
             case BOOLEAN:
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java
index 212bb6371d..47cfa33108 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java
@@ -52,6 +52,9 @@ public class IcebergSinkFactory implements TableSinkFactory {
                 .optional(
                         SinkConfig.TABLE_PROPS,
                         SinkConfig.HADOOP_PROPS,
+                        SinkConfig.KERBEROS_PRINCIPAL,
+                        SinkConfig.KERBEROS_KEYTAB_PATH,
+                        SinkConfig.KRB5_PATH,
                         SinkConfig.WRITE_PROPS,
                         SinkConfig.AUTO_CREATE_PROPS,
                         SinkConfig.TABLE_PRIMARY_KEYS,
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
index ef3beacd26..73cb71f45f 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
@@ -42,7 +42,7 @@ import java.util.Set;
 public abstract class AbstractSplitEnumerator
         implements SourceSplitEnumerator<IcebergFileScanTaskSplit, 
IcebergSplitEnumeratorState> {
 
-    protected final SourceSplitEnumerator.Context<IcebergFileScanTaskSplit> 
context;
+    protected final Context<IcebergFileScanTaskSplit> context;
     protected final SourceConfig sourceConfig;
     protected final Map<Integer, List<IcebergFileScanTaskSplit>> pendingSplits;
 
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java
index ebc75bae9b..83f42879d0 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java
@@ -46,7 +46,7 @@ public class IcebergSourceReader implements 
SourceReader<SeaTunnelRow, IcebergFi
 
     private static final long POLL_WAIT_MS = 1000;
 
-    private final SourceReader.Context context;
+    private final Context context;
     private final Queue<IcebergFileScanTaskSplit> pendingSplits;
     private final Deserializer deserializer;
     private final Schema tableSchema;
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
index 9d56072c92..6c99eb409c 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
@@ -274,7 +274,7 @@ public class SchemaUtils {
         log.info("Schema for table {} updated with new columns", table.name());
     }
 
-    private static boolean columnExists(org.apache.iceberg.Schema schema, 
SchemaAddColumn update) {
+    private static boolean columnExists(Schema schema, SchemaAddColumn update) 
{
         Types.StructType struct =
                 update.parentName() == null
                         ? schema.asStruct()
@@ -282,13 +282,11 @@ public class SchemaUtils {
         return struct.field(update.name()) != null;
     }
 
-    private static boolean typeMatches(
-            org.apache.iceberg.Schema schema, SchemaModifyColumn update) {
+    private static boolean typeMatches(Schema schema, SchemaModifyColumn 
update) {
         return schema.findType(update.name()).typeId() == 
update.type().typeId();
     }
 
-    private static boolean findColumns(
-            org.apache.iceberg.Schema schema, SchemaDeleteColumn deleteColumn) 
{
+    private static boolean findColumns(Schema schema, SchemaDeleteColumn 
deleteColumn) {
         return schema.findField(deleteColumn.name()) != null;
     }
 
@@ -300,8 +298,7 @@ public class SchemaUtils {
         return IcebergTypeMapper.toIcebergType(rowType);
     }
 
-    public static PartitionSpec createPartitionSpec(
-            org.apache.iceberg.Schema schema, List<String> partitionBy) {
+    public static PartitionSpec createPartitionSpec(Schema schema, 
List<String> partitionBy) {
         if (partitionBy.isEmpty()) {
             return PartitionSpec.unpartitioned();
         }
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java
index 777f5a38e9..54aa3326de 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java
@@ -75,6 +75,13 @@ class IcebergCatalogTest {
         configs.put(CommonConfig.KEY_CATALOG_NAME.key(), CATALOG_NAME);
         configs.put(CommonConfig.CATALOG_PROPS.key(), catalogProps);
         configs.put(SinkConfig.TABLE_DEFAULT_PARTITION_KEYS.key(), "dt_col");
+        // hadoop config directory
+        configs.put(CommonConfig.HADOOP_CONF_PATH_PROP.key(), 
"/tmp/hadoop/conf");
+        // hadoop kerberos config
+        configs.put(CommonConfig.KERBEROS_PRINCIPAL.key(), 
"hive/x...@xxxx.com");
+        configs.put(
+                CommonConfig.KERBEROS_KEYTAB_PATH.key(), 
"/tmp/hadoop/conf/hive.service.keytab");
+        configs.put(CommonConfig.KRB5_PATH.key(), 
"/tmp/hadoop/conf/krb5.conf");
         icebergCatalog = new IcebergCatalog(CATALOG_NAME, 
ReadonlyConfig.fromMap(configs));
         icebergCatalog.open();
     }

Reply via email to