This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new e3001207c8 [Feature][Connector-V2][Iceberg] Support Iceberg Kerberos (#7246) e3001207c8 is described below commit e3001207c887f73cd44648b4adb79173572bb4a9 Author: 卢宗柱 <luzong...@sina.com> AuthorDate: Wed Aug 14 22:34:47 2024 +0800 [Feature][Connector-V2][Iceberg] Support Iceberg Kerberos (#7246) --- .../seatunnel/api/kerberos/KerberosConfig.java | 43 ++++++++++++++ .../common/exception/CommonErrorCode.java | 3 +- .../seatunnel/file/config/BaseSinkConfig.java | 22 +------ .../seatunnel/iceberg/IcebergCatalogLoader.java | 67 +++++++++++++++++++++- .../iceberg/catalog/IcebergCatalogFactory.java | 7 ++- .../seatunnel/iceberg/config/CommonConfig.java | 18 +++++- .../seatunnel/iceberg/data/IcebergTypeMapper.java | 1 - .../seatunnel/iceberg/sink/IcebergSinkFactory.java | 3 + .../source/enumerator/AbstractSplitEnumerator.java | 2 +- .../iceberg/source/reader/IcebergSourceReader.java | 2 +- .../seatunnel/iceberg/utils/SchemaUtils.java | 11 ++-- .../iceberg/catalog/IcebergCatalogTest.java | 7 +++ 12 files changed, 150 insertions(+), 36 deletions(-) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/kerberos/KerberosConfig.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/kerberos/KerberosConfig.java new file mode 100644 index 0000000000..d501a3ea49 --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/kerberos/KerberosConfig.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.kerberos; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +public class KerberosConfig { + + public static final Option<String> KERBEROS_PRINCIPAL = + Options.key("kerberos_principal") + .stringType() + .noDefaultValue() + .withDescription("When use kerberos, we should set kerberos user principal"); + + public static final Option<String> KRB5_PATH = + Options.key("krb5_path") + .stringType() + .defaultValue("/etc/krb5.conf") + .withDescription( + "When use kerberos, we should set krb5 path file path such as '/seatunnel/krb5.conf' or use the default path '/etc/krb5.conf'"); + + public static final Option<String> KERBEROS_KEYTAB_PATH = + Options.key("kerberos_keytab_path") + .stringType() + .noDefaultValue() + .withDescription("When using kerberos, We should specify the keytab path"); +} diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java index 79621c4216..99cb7353cf 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java @@ -49,11 +49,9 @@ public enum CommonErrorCode implements SeaTunnelErrorCode { WRITE_SEATUNNEL_ROW_ERROR( "COMMON-23", "<connector> write SeaTunnelRow failed, the SeaTunnelRow value is '<seaTunnelRow>'."), - SQL_TEMPLATE_HANDLED_ERROR( "COMMON-24", "The table of <tableName> has no <keyName>, but the template \n <template> \n which has the place holder named <placeholder>. Please use the option named <optionName> to specify sql template"), - VERSION_NOT_SUPPORTED("COMMON-25", "<identifier> <version> is unsupported."), OPERATION_NOT_SUPPORTED("COMMON-26", "<identifier> <operation> is unsupported."), CONVERT_TO_SEATUNNEL_PROPS_BLANK_ERROR( @@ -78,6 +76,7 @@ public enum CommonErrorCode implements SeaTunnelErrorCode { "COMMON-33", "The datetime format '<datetime>' of field '<field>' is not supported. Please check the datetime format."), UNSUPPORTED_METHOD("COMMON-34", "'<identifier>' unsupported the method '<methodName>'"), + KERBEROS_AUTHORIZED_FAILED("COMMON-35", "Kerberos authorized failed"), ; private final String code; diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java index 0759baf9e4..35f1e4ba16 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java @@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.file.config; import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.kerberos.KerberosConfig; import org.apache.seatunnel.api.sink.DataSaveMode; import org.apache.seatunnel.api.sink.SchemaSaveMode; import org.apache.seatunnel.common.utils.DateTimeUtils; @@ -34,7 +35,7 @@ import static org.apache.seatunnel.api.sink.DataSaveMode.APPEND_DATA; import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA; import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS; -public class BaseSinkConfig { +public class BaseSinkConfig extends KerberosConfig { public static final String SEATUNNEL = "seatunnel"; public static final String NON_PARTITION = "NON_PARTITION"; public static final String TRANSACTION_ID_SPLIT = "_"; @@ -228,25 +229,6 @@ public class BaseSinkConfig { .noDefaultValue() .withDescription("The remote user name of hdfs"); - public static final Option<String> KRB5_PATH = - Options.key("krb5_path") - .stringType() - .defaultValue("/etc/krb5.conf") - .withDescription( - "When use kerberos, we should set krb5 path file path such as '/seatunnel/krb5.conf' or use the default path '/etc/krb5.conf"); - - public static final Option<String> KERBEROS_PRINCIPAL = - Options.key("kerberos_principal") - .stringType() - .noDefaultValue() - .withDescription("Kerberos principal"); - - public static final Option<String> KERBEROS_KEYTAB_PATH = - Options.key("kerberos_keytab_path") - .stringType() - .noDefaultValue() - .withDescription("Kerberos keytab file path"); - public static final Option<Integer> MAX_ROWS_IN_MEMORY = Options.key("max_rows_in_memory") .intType() diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java index 63596c88be..0f4610783a 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogLoader.java @@ -19,14 +19,21 @@ package org.apache.seatunnel.connectors.seatunnel.iceberg; import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig; +import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.common.DynClasses; import org.apache.iceberg.common.DynMethods; import lombok.extern.slf4j.Slf4j; +import sun.security.krb5.KrbException; import java.io.IOException; import java.io.Serializable; @@ -62,7 +69,7 @@ public class IcebergCatalogLoader implements Serializable { * @param config * @return */ - private Object loadHadoopConfig(CommonConfig config) { + public Object loadHadoopConfig(CommonConfig config) { Class<?> configClass = DynClasses.builder() .impl("org.apache.hadoop.hdfs.HdfsConfiguration") @@ -80,7 +87,6 @@ public class IcebergCatalogLoader implements Serializable { log.info("Hadoop not found on classpath, not creating Hadoop config"); return null; } - try { Object result = configClass.getDeclaredConstructor().newInstance(); DynMethods.BoundMethod addResourceMethod = @@ -109,6 +115,8 @@ public class IcebergCatalogLoader implements Serializable { }); } config.getHadoopProps().forEach(setMethod::invoke); + // kerberos authentication + doKerberosLogin((Configuration) result); log.info("Hadoop config initialized: {}", configClass.getName()); return result; } catch (InstantiationException @@ -121,4 +129,59 @@ public class IcebergCatalogLoader implements Serializable { } return null; } + + /** + * kerberos authentication + * + * @param configuration Configuration + */ + private Configuration doKerberosLogin(Configuration configuration) { + String kerberosKrb5ConfPath = config.getKerberosKrb5ConfPath(); + String kerberosKeytabPath = config.getKerberosKeytabPath(); + String kerberosPrincipal = config.getKerberosPrincipal(); + + if (StringUtils.isNotEmpty(kerberosPrincipal) + && StringUtils.isNotEmpty(kerberosKrb5ConfPath) + && StringUtils.isNotEmpty(kerberosKeytabPath)) { + try { + System.setProperty("java.security.krb5.conf", kerberosKrb5ConfPath); + System.setProperty("krb.principal", kerberosPrincipal); + doKerberosAuthentication(configuration, kerberosPrincipal, kerberosKeytabPath); + } catch (Exception e) { + throw new IcebergConnectorException( + CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, + String.format("Kerberos authentication failed: %s", e.getMessage())); + } + } else { + log.warn( + "Kerberos authentication is not configured, it will skip kerberos authentication"); + } + + return configuration; + } + + public static void doKerberosAuthentication( + Configuration configuration, String principal, String keytabPath) { + if (StringUtils.isBlank(principal) || StringUtils.isBlank(keytabPath)) { + log.warn( + "Principal [{}] or keytabPath [{}] is empty, it will skip kerberos authentication", + principal, + keytabPath); + } else { + configuration.set("hadoop.security.authentication", "kerberos"); + UserGroupInformation.setConfiguration(configuration); + try { + log.info( + "Start Kerberos authentication using principal {} and keytab {}", + principal, + keytabPath); + sun.security.krb5.Config.refresh(); + UserGroupInformation.loginUserFromKeytab(principal, keytabPath); + UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); + log.info("Kerberos authentication successful,UGI {}", loginUser); + } catch (IOException | KrbException e) { + throw new SeaTunnelException("check connectivity failed, " + e.getMessage(), e); + } + } + } } diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogFactory.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogFactory.java index 1259699068..182286e75f 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogFactory.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogFactory.java @@ -48,7 +48,12 @@ public class IcebergCatalogFactory implements CatalogFactory { CommonConfig.KEY_NAMESPACE, CommonConfig.KEY_TABLE, CommonConfig.CATALOG_PROPS) - .optional(CommonConfig.HADOOP_PROPS, KEY_CASE_SENSITIVE) + .optional( + CommonConfig.HADOOP_PROPS, + CommonConfig.KERBEROS_PRINCIPAL, + CommonConfig.KERBEROS_KEYTAB_PATH, + CommonConfig.KRB5_PATH, + KEY_CASE_SENSITIVE) .build(); } } diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java index a7503e6e30..ce76f5a512 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java @@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.iceberg.config; import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.kerberos.KerberosConfig; import org.apache.seatunnel.common.config.ConfigRuntimeException; import lombok.Getter; @@ -33,7 +34,7 @@ import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.ch @Getter @ToString -public class CommonConfig implements Serializable { +public class CommonConfig extends KerberosConfig implements Serializable { private static final long serialVersionUID = 239821141534421580L; public static final Option<String> KEY_CATALOG_NAME = @@ -89,6 +90,12 @@ public class CommonConfig implements Serializable { private Map<String, String> hadoopProps; private String hadoopConfPath; + // kerberos + + private String kerberosPrincipal; + private String kerberosKeytabPath; + private String kerberosKrb5ConfPath; + public CommonConfig(ReadonlyConfig pluginConfig) { this.catalogName = checkArgumentNotNull(pluginConfig.get(KEY_CATALOG_NAME)); this.namespace = pluginConfig.get(KEY_NAMESPACE); @@ -99,6 +106,15 @@ public class CommonConfig implements Serializable { if (pluginConfig.toConfig().hasPath(KEY_CASE_SENSITIVE.key())) { this.caseSensitive = pluginConfig.get(KEY_CASE_SENSITIVE); } + if (pluginConfig.getOptional(KERBEROS_PRINCIPAL).isPresent()) { + this.kerberosPrincipal = pluginConfig.getOptional(KERBEROS_PRINCIPAL).get(); + } + if (pluginConfig.getOptional(KRB5_PATH).isPresent()) { + this.kerberosKrb5ConfPath = pluginConfig.getOptional(KRB5_PATH).get(); + } + if (pluginConfig.getOptional(KERBEROS_KEYTAB_PATH).isPresent()) { + this.kerberosKeytabPath = pluginConfig.getOptional(KERBEROS_KEYTAB_PATH).get(); + } validate(); } diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java index e1635919d6..4f3f57e415 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java @@ -37,7 +37,6 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; public class IcebergTypeMapper { - public static SeaTunnelDataType<?> mapping(String field, @NonNull Type icebergType) { switch (icebergType.typeId()) { case BOOLEAN: diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java index 212bb6371d..47cfa33108 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java @@ -52,6 +52,9 @@ public class IcebergSinkFactory implements TableSinkFactory { .optional( SinkConfig.TABLE_PROPS, SinkConfig.HADOOP_PROPS, + SinkConfig.KERBEROS_PRINCIPAL, + SinkConfig.KERBEROS_KEYTAB_PATH, + SinkConfig.KRB5_PATH, SinkConfig.WRITE_PROPS, SinkConfig.AUTO_CREATE_PROPS, SinkConfig.TABLE_PRIMARY_KEYS, diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java index ef3beacd26..73cb71f45f 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java @@ -42,7 +42,7 @@ import java.util.Set; public abstract class AbstractSplitEnumerator implements SourceSplitEnumerator<IcebergFileScanTaskSplit, IcebergSplitEnumeratorState> { - protected final SourceSplitEnumerator.Context<IcebergFileScanTaskSplit> context; + protected final Context<IcebergFileScanTaskSplit> context; protected final SourceConfig sourceConfig; protected final Map<Integer, List<IcebergFileScanTaskSplit>> pendingSplits; diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java index ebc75bae9b..83f42879d0 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergSourceReader.java @@ -46,7 +46,7 @@ public class IcebergSourceReader implements SourceReader<SeaTunnelRow, IcebergFi private static final long POLL_WAIT_MS = 1000; - private final SourceReader.Context context; + private final Context context; private final Queue<IcebergFileScanTaskSplit> pendingSplits; private final Deserializer deserializer; private final Schema tableSchema; diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java index 9d56072c92..6c99eb409c 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java @@ -274,7 +274,7 @@ public class SchemaUtils { log.info("Schema for table {} updated with new columns", table.name()); } - private static boolean columnExists(org.apache.iceberg.Schema schema, SchemaAddColumn update) { + private static boolean columnExists(Schema schema, SchemaAddColumn update) { Types.StructType struct = update.parentName() == null ? schema.asStruct() @@ -282,13 +282,11 @@ public class SchemaUtils { return struct.field(update.name()) != null; } - private static boolean typeMatches( - org.apache.iceberg.Schema schema, SchemaModifyColumn update) { + private static boolean typeMatches(Schema schema, SchemaModifyColumn update) { return schema.findType(update.name()).typeId() == update.type().typeId(); } - private static boolean findColumns( - org.apache.iceberg.Schema schema, SchemaDeleteColumn deleteColumn) { + private static boolean findColumns(Schema schema, SchemaDeleteColumn deleteColumn) { return schema.findField(deleteColumn.name()) != null; } @@ -300,8 +298,7 @@ public class SchemaUtils { return IcebergTypeMapper.toIcebergType(rowType); } - public static PartitionSpec createPartitionSpec( - org.apache.iceberg.Schema schema, List<String> partitionBy) { + public static PartitionSpec createPartitionSpec(Schema schema, List<String> partitionBy) { if (partitionBy.isEmpty()) { return PartitionSpec.unpartitioned(); } diff --git a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java index 777f5a38e9..54aa3326de 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java @@ -75,6 +75,13 @@ class IcebergCatalogTest { configs.put(CommonConfig.KEY_CATALOG_NAME.key(), CATALOG_NAME); configs.put(CommonConfig.CATALOG_PROPS.key(), catalogProps); configs.put(SinkConfig.TABLE_DEFAULT_PARTITION_KEYS.key(), "dt_col"); + // hadoop config directory + configs.put(CommonConfig.HADOOP_CONF_PATH_PROP.key(), "/tmp/hadoop/conf"); + // hadoop kerberos config + configs.put(CommonConfig.KERBEROS_PRINCIPAL.key(), "hive/x...@xxxx.com"); + configs.put( + CommonConfig.KERBEROS_KEYTAB_PATH.key(), "/tmp/hadoop/conf/hive.service.keytab"); + configs.put(CommonConfig.KRB5_PATH.key(), "/tmp/hadoop/conf/krb5.conf"); icebergCatalog = new IcebergCatalog(CATALOG_NAME, ReadonlyConfig.fromMap(configs)); icebergCatalog.open(); }