This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 84c0b8d660 [Improve][API] Unified tables_configs and table_list (#8100) 84c0b8d660 is described below commit 84c0b8d66075e6b921b550a01380c665937b60ba Author: zhangdonghao <39961809+hawk9...@users.noreply.github.com> AuthorDate: Mon Nov 25 14:39:09 2024 +0800 [Improve][API] Unified tables_configs and table_list (#8100) --- docs/en/concept/schema-feature.md | 40 +++++++++++++++ docs/en/connector-v2/source/Hive.md | 18 +++++++ docs/en/connector-v2/source/kafka.md | 59 ++++++++++++++++++++++ docs/zh/concept/schema-feature.md | 40 +++++++++++++++ docs/zh/connector-v2/source/Kafka.md | 59 ++++++++++++++++++++++ .../api/table/catalog/CatalogOptions.java | 10 ++++ .../table/catalog/schema/TableSchemaOptions.java | 8 +++ .../seatunnel/assertion/sink/AssertConfig.java | 8 --- .../seatunnel/assertion/sink/AssertSink.java | 2 +- .../seatunnel/fake/config/FakeOption.java | 6 --- .../fake/config/MultipleTableFakeSourceConfig.java | 5 +- .../seatunnel/fake/source/FakeSourceFactory.java | 3 +- .../config/BaseMultipleTableFileSourceConfig.java | 5 +- .../file/config/BaseSourceConfigOptions.java | 10 ---- .../file/local/source/LocalFileSourceFactory.java | 2 +- .../file/oss/source/OssFileSourceFactory.java | 4 +- .../{BaseHiveOptions.java => HiveOptions.java} | 2 +- .../connectors/seatunnel/hive/sink/HiveSink.java | 12 ++--- .../seatunnel/hive/sink/HiveSinkOptions.java | 4 +- .../seatunnel/hive/source/HiveSourceFactory.java | 5 +- .../hive/source/config/HiveSourceOptions.java | 36 ------------- .../config/MultipleTableHiveSourceConfig.java | 21 ++++++-- .../seatunnel/hive/utils/HiveMetaStoreProxy.java | 6 +-- .../seatunnel/hive/utils/HiveTableUtils.java | 4 +- .../connectors/seatunnel/kafka/config/Config.java | 7 --- .../seatunnel/kafka/source/KafkaSourceConfig.java | 14 +++-- .../seatunnel/kafka/source/KafkaSourceFactory.java | 5 +- .../seatunnel/kudu/config/KuduSourceConfig.java | 9 ---- .../kudu/config/KuduSourceTableConfig.java | 5 +- .../seatunnel/kudu/source/KuduSourceFactory.java | 2 +- 30 files changed, 295 insertions(+), 116 deletions(-) diff --git a/docs/en/concept/schema-feature.md b/docs/en/concept/schema-feature.md index 7f88b87d06..3a4e83e06e 100644 --- a/docs/en/concept/schema-feature.md +++ b/docs/en/concept/schema-feature.md @@ -172,6 +172,46 @@ constraintKeys = [ | INDEX_KEY | key | | UNIQUE_KEY | unique key | +## Multi table schemas + +``` +tables_configs = [ + { + schema { + table = "database.schema.table1" + schema_first = false + comment = "comment" + columns = [ + ... + ] + primaryKey { + ... + } + constraintKeys { + ... + } + } + }, + { + schema = { + table = "database.schema.table2" + schema_first = false + comment = "comment" + columns = [ + ... + ] + primaryKey { + ... + } + constraintKeys { + ... + } + } + } +] + +``` + ## How to use schema ### Recommended diff --git a/docs/en/connector-v2/source/Hive.md b/docs/en/connector-v2/source/Hive.md index 6667ccc8ee..af4edc4730 100644 --- a/docs/en/connector-v2/source/Hive.md +++ b/docs/en/connector-v2/source/Hive.md @@ -120,6 +120,24 @@ Source plugin common parameters, please refer to [Source Common Options](../sour ``` ### Example 2: Multiple tables +> Note: Hive is a structured data source and should be use 'table_list', and 'tables_configs' will be removed in the future. + +```bash + + Hive { + table_list = [ + { + table_name = "default.seatunnel_orc_1" + metastore_uri = "thrift://namenode001:9083" + }, + { + table_name = "default.seatunnel_orc_2" + metastore_uri = "thrift://namenode001:9083" + } + ] + } + +``` ```bash diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md index bcc659747b..dfc23a7572 100644 --- a/docs/en/connector-v2/source/kafka.md +++ b/docs/en/connector-v2/source/kafka.md @@ -189,6 +189,65 @@ source { > This is written to the same pg table according to different formats and > topics of parsing kafka Perform upsert operations based on the id +> Note: Kafka is an unstructured data source and should be use 'tables_configs', and 'table_list' will be removed in the future. + +```hocon + +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + Kafka { + bootstrap.servers = "kafka_e2e:9092" + tables_configs = [ + { + topic = "^test-ogg-sou.*" + pattern = "true" + consumer.group = "ogg_multi_group" + start_mode = earliest + schema = { + fields { + id = "int" + name = "string" + description = "string" + weight = "string" + } + }, + format = ogg_json + }, + { + topic = "test-cdc_mds" + start_mode = earliest + schema = { + fields { + id = "int" + name = "string" + description = "string" + weight = "string" + } + }, + format = canal_json + } + ] + } +} + +sink { + Jdbc { + driver = org.postgresql.Driver + url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF" + user = test + password = test + generate_sink_sql = true + database = test + table = public.sink + primary_keys = ["id"] + } +} +``` + ```hocon env { diff --git a/docs/zh/concept/schema-feature.md b/docs/zh/concept/schema-feature.md index e9aacb1703..b504d264f8 100644 --- a/docs/zh/concept/schema-feature.md +++ b/docs/zh/concept/schema-feature.md @@ -172,6 +172,46 @@ constraintKeys = [ | INDEX_KEY | 键 | | UNIQUE_KEY | 唯一键 | +## 多表Schema + +``` +tables_configs = [ + { + schema { + table = "database.schema.table1" + schema_first = false + comment = "comment" + columns = [ + ... + ] + primaryKey { + ... + } + constraintKeys { + ... + } + } + }, + { + schema = { + table = "database.schema.table2" + schema_first = false + comment = "comment" + columns = [ + ... + ] + primaryKey { + ... + } + constraintKeys { + ... + } + } + } +] + +``` + ## 如何使用schema ### 推荐 diff --git a/docs/zh/connector-v2/source/Kafka.md b/docs/zh/connector-v2/source/Kafka.md index c2ff4ee125..04820cc7c1 100644 --- a/docs/zh/connector-v2/source/Kafka.md +++ b/docs/zh/connector-v2/source/Kafka.md @@ -181,6 +181,65 @@ source { > 根据不同的 Kafka 主题和格式解析数据,并基于 ID 执行 upsert 操作。 +> 注意: Kafka是一个非结构化数据源,应该使用`tables_configs`,将来会删除`table_list` + +```hocon + +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + Kafka { + bootstrap.servers = "kafka_e2e:9092" + tables_configs = [ + { + topic = "^test-ogg-sou.*" + pattern = "true" + consumer.group = "ogg_multi_group" + start_mode = earliest + schema = { + fields { + id = "int" + name = "string" + description = "string" + weight = "string" + } + }, + format = ogg_json + }, + { + topic = "test-cdc_mds" + start_mode = earliest + schema = { + fields { + id = "int" + name = "string" + description = "string" + weight = "string" + } + }, + format = canal_json + } + ] + } +} + +sink { + Jdbc { + driver = org.postgresql.Driver + url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF" + user = test + password = test + generate_sink_sql = true + database = test + table = public.sink + primary_keys = ["id"] + } +} +``` + ```hocon env { execution.parallelism = 1 diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java index 2d1a3bc41b..046ac1dbed 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.api.table.catalog; +import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference; + import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; @@ -56,4 +58,12 @@ public interface CatalogOptions { .withDescription( "The table names RegEx of the database to capture." + "The table name needs to include the database name, for example: database_.*\\.table_.*"); + + Option<List<Map<String, Object>>> TABLE_LIST = + Options.key("table_list") + .type(new TypeReference<List<Map<String, Object>>>() {}) + .noDefaultValue() + .withDescription( + "SeaTunnel Multi Table Schema, acts on structed data sources. " + + "such as jdbc, paimon, doris, etc"); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java index 794dbe833c..34ca23ced4 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java @@ -55,6 +55,14 @@ public class TableSchemaOptions { .noDefaultValue() .withDescription("SeaTunnel Schema"); + public static final Option<List<Map<String, Object>>> TABLE_CONFIGS = + Options.key("tables_configs") + .type(new TypeReference<List<Map<String, Object>>>() {}) + .noDefaultValue() + .withDescription( + "SeaTunnel Multi Table Schema, acts on unstructed data sources. " + + "such as file, assert, mongodb, etc"); + // We should use ColumnOptions instead of FieldOptions @Deprecated public static class FieldOptions { diff --git a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java index d9fcea69ae..a35e91837f 100644 --- a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java +++ b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java @@ -22,7 +22,6 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference; import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; -import java.util.List; import java.util.Map; public class AssertConfig { @@ -85,13 +84,6 @@ public class AssertConfig { .withDescription( "Rule definition of user's available data. Each rule represents one field validation or row num validation."); - public static final Option<List<Map<String, Object>>> TABLE_CONFIGS = - Options.key("tables_configs") - .type(new TypeReference<List<Map<String, Object>>>() {}) - .noDefaultValue() - .withDescription( - "Table configuration for the sink. Each table configuration contains the table name and the rules for the table."); - public static final Option<String> TABLE_PATH = Options.key("table_path") .stringType() diff --git a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java index e84b6fbcb2..8da98df73e 100644 --- a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java +++ b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java @@ -42,11 +42,11 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import static org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions.TABLE_CONFIGS; import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.CATALOG_TABLE_RULES; import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.FIELD_RULES; import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.ROW_RULES; import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULES; -import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TABLE_CONFIGS; import static org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TABLE_PATH; public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void> diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java index fe956152a8..9c05c86bb6 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java @@ -27,12 +27,6 @@ import java.util.Map; public class FakeOption { - public static final Option<List<Map<String, Object>>> TABLES_CONFIGS = - Options.key("tables_configs") - .type(new TypeReference<List<Map<String, Object>>>() {}) - .noDefaultValue() - .withDescription("The multiple table config list of fake source"); - public static final Option<List<Map<String, Object>>> ROWS = Options.key("rows") .type(new TypeReference<List<Map<String, Object>>>() {}) diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfig.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfig.java index 051d88a88f..6459e46566 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfig.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfig.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.fake.config; import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; import org.apache.commons.collections4.CollectionUtils; @@ -36,7 +37,7 @@ public class MultipleTableFakeSourceConfig implements Serializable { @Getter private List<FakeConfig> fakeConfigs; public MultipleTableFakeSourceConfig(ReadonlyConfig fakeSourceRootConfig) { - if (fakeSourceRootConfig.getOptional(FakeOption.TABLES_CONFIGS).isPresent()) { + if (fakeSourceRootConfig.getOptional(TableSchemaOptions.TABLE_CONFIGS).isPresent()) { parseFromConfigs(fakeSourceRootConfig); } else { parseFromConfig(fakeSourceRootConfig); @@ -56,7 +57,7 @@ public class MultipleTableFakeSourceConfig implements Serializable { private void parseFromConfigs(ReadonlyConfig readonlyConfig) { List<ReadonlyConfig> readonlyConfigs = - readonlyConfig.getOptional(FakeOption.TABLES_CONFIGS).get().stream() + readonlyConfig.getOptional(TableSchemaOptions.TABLE_CONFIGS).get().stream() .map(ReadonlyConfig::fromMap) .collect(Collectors.toList()); // Use the config outside if it's not set in sub config diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java index 73af0b0cd5..4ea71dda5b 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java @@ -54,7 +54,6 @@ import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.S import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SPLIT_READ_INTERVAL; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_FAKE_MODE; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_TEMPLATE; -import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TABLES_CONFIGS; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_HOUR_TEMPLATE; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_MINUTE_TEMPLATE; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_SECOND_TEMPLATE; @@ -72,7 +71,7 @@ public class FakeSourceFactory implements TableSourceFactory { @Override public OptionRule optionRule() { return OptionRule.builder() - .optional(TABLES_CONFIGS) + .optional(TableSchemaOptions.TABLE_CONFIGS) .optional(TableSchemaOptions.SCHEMA) .optional(STRING_FAKE_MODE) .conditional(STRING_FAKE_MODE, FakeOption.FakeMode.TEMPLATE, STRING_TEMPLATE) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseMultipleTableFileSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseMultipleTableFileSourceConfig.java index 0cda71d091..f44e0d1f6f 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseMultipleTableFileSourceConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseMultipleTableFileSourceConfig.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.file.config; import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; import com.google.common.collect.Lists; import lombok.Getter; @@ -33,7 +34,7 @@ public abstract class BaseMultipleTableFileSourceConfig implements Serializable @Getter private List<BaseFileSourceConfig> fileSourceConfigs; public BaseMultipleTableFileSourceConfig(ReadonlyConfig fileSourceRootConfig) { - if (fileSourceRootConfig.getOptional(BaseSourceConfigOptions.TABLE_CONFIGS).isPresent()) { + if (fileSourceRootConfig.getOptional(TableSchemaOptions.TABLE_CONFIGS).isPresent()) { parseFromFileSourceConfigs(fileSourceRootConfig); } else { parseFromFileSourceConfig(fileSourceRootConfig); @@ -42,7 +43,7 @@ public abstract class BaseMultipleTableFileSourceConfig implements Serializable private void parseFromFileSourceConfigs(ReadonlyConfig fileSourceRootConfig) { this.fileSourceConfigs = - fileSourceRootConfig.get(BaseSourceConfigOptions.TABLE_CONFIGS).stream() + fileSourceRootConfig.get(TableSchemaOptions.TABLE_CONFIGS).stream() .map(ReadonlyConfig::fromMap) .map(this::getBaseSourceConfig) .collect(Collectors.toList()); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java index ddcc13d47d..de45726e3c 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java @@ -17,8 +17,6 @@ package org.apache.seatunnel.connectors.seatunnel.file.config; -import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference; - import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; import org.apache.seatunnel.common.utils.DateTimeUtils; @@ -27,7 +25,6 @@ import org.apache.seatunnel.common.utils.TimeUtils; import org.apache.seatunnel.format.text.constant.TextFormatConstant; import java.util.List; -import java.util.Map; public class BaseSourceConfigOptions { public static final Option<FileFormat> FILE_FORMAT_TYPE = @@ -169,11 +166,4 @@ public class BaseSourceConfigOptions { .enumType(ArchiveCompressFormat.class) .defaultValue(ArchiveCompressFormat.NONE) .withDescription("Archive compression codec"); - - public static final Option<List<Map<String, Object>>> TABLE_CONFIGS = - Options.key("tables_configs") - .type(new TypeReference<List<Map<String, Object>>>() {}) - .noDefaultValue() - .withDescription( - "Local file source configs, used to create multiple local file source."); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java index fb76d276d5..0d58e506da 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java @@ -50,7 +50,7 @@ public class LocalFileSourceFactory implements TableSourceFactory { @Override public OptionRule optionRule() { return OptionRule.builder() - .optional(BaseSourceConfigOptions.TABLE_CONFIGS) + .optional(TableSchemaOptions.TABLE_CONFIGS) .optional(BaseSourceConfigOptions.FILE_PATH) .optional(BaseSourceConfigOptions.FILE_FORMAT_TYPE) .optional(BaseSourceConfigOptions.ENCODING) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java index 0eddf05693..6f140330cc 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java @@ -51,9 +51,7 @@ public class OssFileSourceFactory implements TableSourceFactory { @Override public OptionRule optionRule() { return OptionRule.builder() - .optional( - org.apache.seatunnel.connectors.seatunnel.file.config - .BaseSourceConfigOptions.TABLE_CONFIGS) + .optional(TableSchemaOptions.TABLE_CONFIGS) .optional(OssConfigOptions.FILE_PATH) .optional(OssConfigOptions.BUCKET) .optional(OssConfigOptions.ACCESS_KEY) diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveOptions.java similarity index 96% rename from seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java rename to seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveOptions.java index efed4e91c5..6fe55e2e71 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveOptions.java @@ -21,7 +21,7 @@ import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions; -public class BaseHiveOptions extends BaseSourceConfigOptions { +public class HiveOptions extends BaseSourceConfigOptions { public static final Option<String> TABLE_NAME = Options.key("table_name") diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java index 6e91baf001..13f48823b2 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java @@ -41,9 +41,9 @@ import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy; import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory; import org.apache.seatunnel.connectors.seatunnel.hive.commit.HiveSinkAggregatedCommitter; import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants; +import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions; import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException; import org.apache.seatunnel.connectors.seatunnel.hive.sink.writter.HiveSinkWriter; -import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions; import org.apache.seatunnel.connectors.seatunnel.hive.storage.StorageFactory; import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTableUtils; @@ -216,16 +216,14 @@ public class HiveSink StorageFactory.getStorageType(hdfsLocation) .buildHadoopConfWithReadOnlyConfig(readonlyConfig); readonlyConfig - .getOptional(HiveSourceOptions.HDFS_SITE_PATH) + .getOptional(HiveOptions.HDFS_SITE_PATH) .ifPresent(hadoopConf::setHdfsSitePath); + readonlyConfig.getOptional(HiveOptions.REMOTE_USER).ifPresent(hadoopConf::setRemoteUser); readonlyConfig - .getOptional(HiveSourceOptions.REMOTE_USER) - .ifPresent(hadoopConf::setRemoteUser); - readonlyConfig - .getOptional(HiveSourceOptions.KERBEROS_PRINCIPAL) + .getOptional(HiveOptions.KERBEROS_PRINCIPAL) .ifPresent(hadoopConf::setKerberosPrincipal); readonlyConfig - .getOptional(HiveSourceOptions.KERBEROS_KEYTAB_PATH) + .getOptional(HiveOptions.KERBEROS_KEYTAB_PATH) .ifPresent(hadoopConf::setKerberosKeytabPath); return hadoopConf; } diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java index a241717a44..404244b411 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java @@ -19,9 +19,9 @@ package org.apache.seatunnel.connectors.seatunnel.hive.sink; import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; -import org.apache.seatunnel.connectors.seatunnel.hive.config.BaseHiveOptions; +import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions; -public class HiveSinkOptions extends BaseHiveOptions { +public class HiveSinkOptions extends HiveOptions { public static final Option<Boolean> ABORT_DROP_PARTITION_METADATA = Options.key("abort_drop_partition_metadata") diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java index 07adfef106..63e235d3dc 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java @@ -20,6 +20,8 @@ package org.apache.seatunnel.connectors.seatunnel.hive.source; import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.catalog.CatalogOptions; +import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSourceFactory; @@ -27,7 +29,6 @@ import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions; import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig; import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants; -import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions; import com.google.auto.service.AutoService; @@ -51,7 +52,7 @@ public class HiveSourceFactory implements TableSourceFactory { return OptionRule.builder() .optional(HiveConfig.TABLE_NAME) .optional(HiveConfig.METASTORE_URI) - .optional(HiveSourceOptions.TABLE_CONFIGS) + .optional(TableSchemaOptions.TABLE_CONFIGS, CatalogOptions.TABLE_LIST) .optional(BaseSourceConfigOptions.READ_PARTITIONS) .optional(BaseSourceConfigOptions.READ_COLUMNS) .optional(BaseSourceConfigOptions.KERBEROS_PRINCIPAL) diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceOptions.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceOptions.java deleted file mode 100644 index c30cb1783d..0000000000 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceOptions.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.hive.source.config; - -import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference; - -import org.apache.seatunnel.api.configuration.Option; -import org.apache.seatunnel.api.configuration.Options; -import org.apache.seatunnel.connectors.seatunnel.hive.config.BaseHiveOptions; - -import java.util.List; -import java.util.Map; - -public class HiveSourceOptions extends BaseHiveOptions { - public static final Option<List<Map<String, Object>>> TABLE_CONFIGS = - Options.key("tables_configs") - .type(new TypeReference<List<Map<String, Object>>>() {}) - .noDefaultValue() - .withDescription( - "Local file source configs, used to create multiple local file source."); -} diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java index 9db899ca8c..249ffed497 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java @@ -18,6 +18,8 @@ package org.apache.seatunnel.connectors.seatunnel.hive.source.config; import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogOptions; +import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; import com.google.common.collect.Lists; import lombok.Getter; @@ -33,16 +35,27 @@ public class MultipleTableHiveSourceConfig implements Serializable { @Getter private List<HiveSourceConfig> hiveSourceConfigs; public MultipleTableHiveSourceConfig(ReadonlyConfig readonlyConfig) { - if (readonlyConfig.getOptional(HiveSourceOptions.TABLE_CONFIGS).isPresent()) { - parseFromLocalFileSourceConfigs(readonlyConfig); + if (readonlyConfig.getOptional(CatalogOptions.TABLE_LIST).isPresent()) { + parseFromLocalFileSourceByTableList(readonlyConfig); + } else if (readonlyConfig.getOptional(TableSchemaOptions.TABLE_CONFIGS).isPresent()) { + parseFromLocalFileSourceByTableConfigs(readonlyConfig); } else { parseFromLocalFileSourceConfig(readonlyConfig); } } - private void parseFromLocalFileSourceConfigs(ReadonlyConfig readonlyConfig) { + private void parseFromLocalFileSourceByTableList(ReadonlyConfig readonlyConfig) { this.hiveSourceConfigs = - readonlyConfig.get(HiveSourceOptions.TABLE_CONFIGS).stream() + readonlyConfig.get(CatalogOptions.TABLE_LIST).stream() + .map(ReadonlyConfig::fromMap) + .map(HiveSourceConfig::new) + .collect(Collectors.toList()); + } + // hive is structured, should use table_list + @Deprecated + private void parseFromLocalFileSourceByTableConfigs(ReadonlyConfig readonlyConfig) { + this.hiveSourceConfigs = + readonlyConfig.get(TableSchemaOptions.TABLE_CONFIGS).stream() .map(ReadonlyConfig::fromMap) .map(HiveSourceConfig::new) .collect(Collectors.toList()); diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java index 62d917ca0d..18482aa2c7 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java @@ -23,9 +23,9 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopLoginFactory; import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions; import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig; +import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions; import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException; -import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -54,7 +54,7 @@ public class HiveMetaStoreProxy { private static final List<String> HADOOP_CONF_FILES = ImmutableList.of("hive-site.xml"); private HiveMetaStoreProxy(ReadonlyConfig readonlyConfig) { - String metastoreUri = readonlyConfig.get(HiveSourceOptions.METASTORE_URI); + String metastoreUri = readonlyConfig.get(HiveOptions.METASTORE_URI); String hiveHadoopConfigPath = readonlyConfig.get(HiveConfig.HADOOP_CONF_PATH); String hiveSitePath = readonlyConfig.get(HiveConfig.HIVE_SITE_PATH); HiveConf hiveConf = new HiveConf(); @@ -121,7 +121,7 @@ public class HiveMetaStoreProxy { String.format( "Using this hive uris [%s], hive conf [%s] to initialize " + "hive metastore client instance failed", - metastoreUri, readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH)); + metastoreUri, readonlyConfig.get(HiveOptions.HIVE_SITE_PATH)); throw new HiveConnectorException( HiveConnectorErrorCode.INITIALIZE_HIVE_METASTORE_CLIENT_FAILED, errorMsg, e); } catch (Exception e) { diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java index 7b9192ea64..0805fe04f3 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java @@ -23,16 +23,16 @@ import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants; +import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions; import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException; -import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions; import org.apache.hadoop.hive.metastore.api.Table; public class HiveTableUtils { public static Table getTableInfo(ReadonlyConfig readonlyConfig) { - String table = readonlyConfig.get(HiveSourceOptions.TABLE_NAME); + String table = readonlyConfig.get(HiveOptions.TABLE_NAME); TablePath tablePath = TablePath.of(table); if (tablePath.getDatabaseName() == null || tablePath.getTableName() == null) { throw new SeaTunnelRuntimeException( diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java index 293821e0ed..c01dc3e88d 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java @@ -184,13 +184,6 @@ public class Config { .withDescription( "Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON."); - public static final Option<List<Map<String, Object>>> TABLE_LIST = - Options.key("table_list") - .type(new TypeReference<List<Map<String, Object>>>() {}) - .noDefaultValue() - .withDescription( - "Topic list config. You can configure only one `table_list` or one `topic` at the same time"); - public static final Option<String> PROTOBUF_SCHEMA = Options.key("protobuf_schema") .stringType() diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java index 0f645d7218..1093d3f2f2 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java @@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.kafka.source; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.serialization.DeserializationSchema; +import org.apache.seatunnel.api.table.catalog.CatalogOptions; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.catalog.TableIdentifier; @@ -31,7 +32,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; -import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config; import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat; import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay; import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode; @@ -114,11 +114,17 @@ public class KafkaSourceConfig implements Serializable { private Map<TablePath, ConsumerMetadata> createMapConsumerMetadata( ReadonlyConfig readonlyConfig) { List<ConsumerMetadata> consumerMetadataList; - if (readonlyConfig.getOptional(Config.TABLE_LIST).isPresent()) { + if (readonlyConfig.getOptional(TableSchemaOptions.TABLE_CONFIGS).isPresent()) { consumerMetadataList = - readonlyConfig.get(Config.TABLE_LIST).stream() + readonlyConfig.get(TableSchemaOptions.TABLE_CONFIGS).stream() .map(ReadonlyConfig::fromMap) - .map(config -> createConsumerMetadata(config)) + .map(this::createConsumerMetadata) + .collect(Collectors.toList()); + } else if (readonlyConfig.getOptional(CatalogOptions.TABLE_LIST).isPresent()) { + consumerMetadataList = + readonlyConfig.get(CatalogOptions.TABLE_LIST).stream() + .map(ReadonlyConfig::fromMap) + .map(this::createConsumerMetadata) .collect(Collectors.toList()); } else { consumerMetadataList = diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java index 431e9a8c19..fe6f50a8ea 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java @@ -20,6 +20,8 @@ package org.apache.seatunnel.connectors.seatunnel.kafka.source; import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.catalog.CatalogOptions; +import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSourceFactory; @@ -43,7 +45,8 @@ public class KafkaSourceFactory implements TableSourceFactory { public OptionRule optionRule() { return OptionRule.builder() .required(Config.BOOTSTRAP_SERVERS) - .exclusive(Config.TOPIC, Config.TABLE_LIST) + .exclusive( + Config.TOPIC, TableSchemaOptions.TABLE_CONFIGS, CatalogOptions.TABLE_LIST) .optional( Config.START_MODE, Config.PATTERN, diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java index 5abc62ad72..3fd783bbf8 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java @@ -17,8 +17,6 @@ package org.apache.seatunnel.connectors.seatunnel.kudu.config; -import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference; - import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; import org.apache.seatunnel.api.configuration.ReadonlyConfig; @@ -29,7 +27,6 @@ import lombok.Getter; import lombok.ToString; import java.util.List; -import java.util.Map; @Getter @ToString @@ -55,12 +52,6 @@ public class KuduSourceConfig extends CommonConfig { .noDefaultValue() .withDescription("Kudu scan filter expressions"); - public static final Option<List<Map<String, Object>>> TABLE_LIST = - Options.key("table_list") - .type(new TypeReference<List<Map<String, Object>>>() {}) - .noDefaultValue() - .withDescription("table list config"); - private int batchSizeBytes; protected Long queryTimeout; diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java index b741b7474b..094807edc0 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java @@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.kudu.config; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.catalog.CatalogOptions; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.catalog.TablePath; @@ -59,8 +60,8 @@ public class KuduSourceTableConfig implements Serializable { try (KuduCatalog kuduCatalog = (KuduCatalog) optionalCatalog.get()) { kuduCatalog.open(); - if (config.getOptional(KuduSourceConfig.TABLE_LIST).isPresent()) { - return config.get(KuduSourceConfig.TABLE_LIST).stream() + if (config.getOptional(CatalogOptions.TABLE_LIST).isPresent()) { + return config.get(CatalogOptions.TABLE_LIST).stream() .map(ReadonlyConfig::fromMap) .map(readonlyConfig -> parseKuduSourceConfig(readonlyConfig, kuduCatalog)) .collect(Collectors.toList()); diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java index b1bdb7e4ab..78002a9390 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java @@ -33,8 +33,8 @@ import com.google.auto.service.AutoService; import java.io.Serializable; +import static org.apache.seatunnel.api.table.catalog.CatalogOptions.TABLE_LIST; import static org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.MASTER; -import static org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.TABLE_LIST; import static org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.TABLE_NAME; @AutoService(Factory.class)