This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 84c0b8d660 [Improve][API] Unified tables_configs and table_list (#8100)
84c0b8d660 is described below

commit 84c0b8d66075e6b921b550a01380c665937b60ba
Author: zhangdonghao <39961809+hawk9...@users.noreply.github.com>
AuthorDate: Mon Nov 25 14:39:09 2024 +0800

    [Improve][API] Unified tables_configs and table_list (#8100)
---
 docs/en/concept/schema-feature.md                  | 40 +++++++++++++++
 docs/en/connector-v2/source/Hive.md                | 18 +++++++
 docs/en/connector-v2/source/kafka.md               | 59 ++++++++++++++++++++++
 docs/zh/concept/schema-feature.md                  | 40 +++++++++++++++
 docs/zh/connector-v2/source/Kafka.md               | 59 ++++++++++++++++++++++
 .../api/table/catalog/CatalogOptions.java          | 10 ++++
 .../table/catalog/schema/TableSchemaOptions.java   |  8 +++
 .../seatunnel/assertion/sink/AssertConfig.java     |  8 ---
 .../seatunnel/assertion/sink/AssertSink.java       |  2 +-
 .../seatunnel/fake/config/FakeOption.java          |  6 ---
 .../fake/config/MultipleTableFakeSourceConfig.java |  5 +-
 .../seatunnel/fake/source/FakeSourceFactory.java   |  3 +-
 .../config/BaseMultipleTableFileSourceConfig.java  |  5 +-
 .../file/config/BaseSourceConfigOptions.java       | 10 ----
 .../file/local/source/LocalFileSourceFactory.java  |  2 +-
 .../file/oss/source/OssFileSourceFactory.java      |  4 +-
 .../{BaseHiveOptions.java => HiveOptions.java}     |  2 +-
 .../connectors/seatunnel/hive/sink/HiveSink.java   | 12 ++---
 .../seatunnel/hive/sink/HiveSinkOptions.java       |  4 +-
 .../seatunnel/hive/source/HiveSourceFactory.java   |  5 +-
 .../hive/source/config/HiveSourceOptions.java      | 36 -------------
 .../config/MultipleTableHiveSourceConfig.java      | 21 ++++++--
 .../seatunnel/hive/utils/HiveMetaStoreProxy.java   |  6 +--
 .../seatunnel/hive/utils/HiveTableUtils.java       |  4 +-
 .../connectors/seatunnel/kafka/config/Config.java  |  7 ---
 .../seatunnel/kafka/source/KafkaSourceConfig.java  | 14 +++--
 .../seatunnel/kafka/source/KafkaSourceFactory.java |  5 +-
 .../seatunnel/kudu/config/KuduSourceConfig.java    |  9 ----
 .../kudu/config/KuduSourceTableConfig.java         |  5 +-
 .../seatunnel/kudu/source/KuduSourceFactory.java   |  2 +-
 30 files changed, 295 insertions(+), 116 deletions(-)

diff --git a/docs/en/concept/schema-feature.md 
b/docs/en/concept/schema-feature.md
index 7f88b87d06..3a4e83e06e 100644
--- a/docs/en/concept/schema-feature.md
+++ b/docs/en/concept/schema-feature.md
@@ -172,6 +172,46 @@ constraintKeys = [
 | INDEX_KEY      | key         |
 | UNIQUE_KEY     | unique key  |
 
+## Multi table schemas
+
+```
+tables_configs = [
+  {
+    schema {
+      table = "database.schema.table1"
+      schema_first = false
+      comment = "comment"
+      columns = [
+        ...
+      ]
+      primaryKey {
+        ...
+      }
+      constraintKeys {
+        ...
+      }
+    }
+  },
+  {
+    schema = {
+      table = "database.schema.table2"
+      schema_first = false
+      comment = "comment"
+      columns = [
+        ...
+      ]
+      primaryKey {
+        ...
+      }
+      constraintKeys {
+        ...
+      }
+    }
+  }
+]
+
+```
+
 ## How to use schema
 
 ### Recommended
diff --git a/docs/en/connector-v2/source/Hive.md 
b/docs/en/connector-v2/source/Hive.md
index 6667ccc8ee..af4edc4730 100644
--- a/docs/en/connector-v2/source/Hive.md
+++ b/docs/en/connector-v2/source/Hive.md
@@ -120,6 +120,24 @@ Source plugin common parameters, please refer to [Source 
Common Options](../sour
 ```
 
 ### Example 2: Multiple tables
+> Note: Hive is a structured data source and should use 'table_list'; 
'tables_configs' will be removed in the future.
+
+```bash
+
+  Hive {
+    table_list = [
+        {
+          table_name = "default.seatunnel_orc_1"
+          metastore_uri = "thrift://namenode001:9083"
+        },
+        {
+          table_name = "default.seatunnel_orc_2"
+          metastore_uri = "thrift://namenode001:9083"
+        }
+    ]
+  }
+
+```
 
 ```bash
 
diff --git a/docs/en/connector-v2/source/kafka.md 
b/docs/en/connector-v2/source/kafka.md
index bcc659747b..dfc23a7572 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -189,6 +189,65 @@ source {
 
 > This is written to the same pg table according to different formats and 
 > topics of parsing kafka Perform upsert operations based on the id
 
+> Note: Kafka is an unstructured data source and should use 
'tables_configs'; 'table_list' will be removed in the future.
+
+```hocon
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafka_e2e:9092"
+    tables_configs = [
+      {
+        topic = "^test-ogg-sou.*"
+        pattern = "true"
+        consumer.group = "ogg_multi_group"
+        start_mode = earliest
+        schema = {
+          fields {
+            id = "int"
+            name = "string"
+            description = "string"
+            weight = "string"
+          }
+        },
+        format = ogg_json
+      },
+      {
+        topic = "test-cdc_mds"
+        start_mode = earliest
+        schema = {
+          fields {
+            id = "int"
+            name = "string"
+            description = "string"
+            weight = "string"
+          }
+        },
+        format = canal_json
+      }
+    ]
+  }
+}
+
+sink {
+  Jdbc {
+    driver = org.postgresql.Driver
+    url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+    user = test
+    password = test
+    generate_sink_sql = true
+    database = test
+    table = public.sink
+    primary_keys = ["id"]
+  }
+}
+```
+
 ```hocon
 
 env {
diff --git a/docs/zh/concept/schema-feature.md 
b/docs/zh/concept/schema-feature.md
index e9aacb1703..b504d264f8 100644
--- a/docs/zh/concept/schema-feature.md
+++ b/docs/zh/concept/schema-feature.md
@@ -172,6 +172,46 @@ constraintKeys = [
 | INDEX_KEY  | 键   |
 | UNIQUE_KEY | 唯一键 |
 
+## 多表Schema
+
+```
+tables_configs = [
+  {
+    schema {
+      table = "database.schema.table1"
+      schema_first = false
+      comment = "comment"
+      columns = [
+        ...
+      ]
+      primaryKey {
+        ...
+      }
+      constraintKeys {
+        ...
+      }
+    }
+  },
+  {
+    schema = {
+      table = "database.schema.table2"
+      schema_first = false
+      comment = "comment"
+      columns = [
+        ...
+      ]
+      primaryKey {
+        ...
+      }
+      constraintKeys {
+        ...
+      }
+    }
+  }
+]
+
+```
+
 ## 如何使用schema
 
 ### 推荐
diff --git a/docs/zh/connector-v2/source/Kafka.md 
b/docs/zh/connector-v2/source/Kafka.md
index c2ff4ee125..04820cc7c1 100644
--- a/docs/zh/connector-v2/source/Kafka.md
+++ b/docs/zh/connector-v2/source/Kafka.md
@@ -181,6 +181,65 @@ source {
 
 > 根据不同的 Kafka 主题和格式解析数据,并基于 ID 执行 upsert 操作。
 
+> 注意: Kafka是一个非结构化数据源,应该使用`tables_configs`,将来会删除`table_list`
+
+```hocon
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafka_e2e:9092"
+    tables_configs = [
+      {
+        topic = "^test-ogg-sou.*"
+        pattern = "true"
+        consumer.group = "ogg_multi_group"
+        start_mode = earliest
+        schema = {
+          fields {
+            id = "int"
+            name = "string"
+            description = "string"
+            weight = "string"
+          }
+        },
+        format = ogg_json
+      },
+      {
+        topic = "test-cdc_mds"
+        start_mode = earliest
+        schema = {
+          fields {
+            id = "int"
+            name = "string"
+            description = "string"
+            weight = "string"
+          }
+        },
+        format = canal_json
+      }
+    ]
+  }
+}
+
+sink {
+  Jdbc {
+    driver = org.postgresql.Driver
+    url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+    user = test
+    password = test
+    generate_sink_sql = true
+    database = test
+    table = public.sink
+    primary_keys = ["id"]
+  }
+}
+```
+
 ```hocon
 env {
   execution.parallelism = 1
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
index 2d1a3bc41b..046ac1dbed 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogOptions.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.api.table.catalog;
 
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
@@ -56,4 +58,12 @@ public interface CatalogOptions {
                     .withDescription(
                             "The table names RegEx of the database to capture."
                                     + "The table name needs to include the 
database name, for example: database_.*\\.table_.*");
+
+    Option<List<Map<String, Object>>> TABLE_LIST =
+            Options.key("table_list")
+                    .type(new TypeReference<List<Map<String, Object>>>() {})
+                    .noDefaultValue()
+                    .withDescription(
+                            "SeaTunnel Multi Table Schema, acts on structed 
data sources. "
+                                    + "such as jdbc, paimon, doris, etc");
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
index 794dbe833c..34ca23ced4 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java
@@ -55,6 +55,14 @@ public class TableSchemaOptions {
                     .noDefaultValue()
                     .withDescription("SeaTunnel Schema");
 
+    public static final Option<List<Map<String, Object>>> TABLE_CONFIGS =
+            Options.key("tables_configs")
+                    .type(new TypeReference<List<Map<String, Object>>>() {})
+                    .noDefaultValue()
+                    .withDescription(
+                            "SeaTunnel Multi Table Schema, acts on unstructed 
data sources. "
+                                    + "such as file, assert, mongodb, etc");
+
     // We should use ColumnOptions instead of FieldOptions
     @Deprecated
     public static class FieldOptions {
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
index d9fcea69ae..a35e91837f 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertConfig.java
@@ -22,7 +22,6 @@ import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
-import java.util.List;
 import java.util.Map;
 
 public class AssertConfig {
@@ -85,13 +84,6 @@ public class AssertConfig {
                     .withDescription(
                             "Rule definition of user's available data. Each 
rule represents one field validation or row num validation.");
 
-    public static final Option<List<Map<String, Object>>> TABLE_CONFIGS =
-            Options.key("tables_configs")
-                    .type(new TypeReference<List<Map<String, Object>>>() {})
-                    .noDefaultValue()
-                    .withDescription(
-                            "Table configuration for the sink. Each table 
configuration contains the table name and the rules for the table.");
-
     public static final Option<String> TABLE_PATH =
             Options.key("table_path")
                     .stringType()
diff --git 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
index e84b6fbcb2..8da98df73e 100644
--- 
a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
+++ 
b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSink.java
@@ -42,11 +42,11 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static 
org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions.TABLE_CONFIGS;
 import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.CATALOG_TABLE_RULES;
 import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.FIELD_RULES;
 import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.ROW_RULES;
 import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.RULES;
-import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TABLE_CONFIGS;
 import static 
org.apache.seatunnel.connectors.seatunnel.assertion.sink.AssertConfig.TABLE_PATH;
 
 public class AssertSink extends AbstractSimpleSink<SeaTunnelRow, Void>
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java
index fe956152a8..9c05c86bb6 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java
@@ -27,12 +27,6 @@ import java.util.Map;
 
 public class FakeOption {
 
-    public static final Option<List<Map<String, Object>>> TABLES_CONFIGS =
-            Options.key("tables_configs")
-                    .type(new TypeReference<List<Map<String, Object>>>() {})
-                    .noDefaultValue()
-                    .withDescription("The multiple table config list of fake 
source");
-
     public static final Option<List<Map<String, Object>>> ROWS =
             Options.key("rows")
                     .type(new TypeReference<List<Map<String, Object>>>() {})
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfig.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfig.java
index 051d88a88f..6459e46566 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/MultipleTableFakeSourceConfig.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.fake.config;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
 
 import org.apache.commons.collections4.CollectionUtils;
 
@@ -36,7 +37,7 @@ public class MultipleTableFakeSourceConfig implements 
Serializable {
     @Getter private List<FakeConfig> fakeConfigs;
 
     public MultipleTableFakeSourceConfig(ReadonlyConfig fakeSourceRootConfig) {
-        if 
(fakeSourceRootConfig.getOptional(FakeOption.TABLES_CONFIGS).isPresent()) {
+        if 
(fakeSourceRootConfig.getOptional(TableSchemaOptions.TABLE_CONFIGS).isPresent())
 {
             parseFromConfigs(fakeSourceRootConfig);
         } else {
             parseFromConfig(fakeSourceRootConfig);
@@ -56,7 +57,7 @@ public class MultipleTableFakeSourceConfig implements 
Serializable {
 
     private void parseFromConfigs(ReadonlyConfig readonlyConfig) {
         List<ReadonlyConfig> readonlyConfigs =
-                
readonlyConfig.getOptional(FakeOption.TABLES_CONFIGS).get().stream()
+                
readonlyConfig.getOptional(TableSchemaOptions.TABLE_CONFIGS).get().stream()
                         .map(ReadonlyConfig::fromMap)
                         .collect(Collectors.toList());
         // Use the config outside if it's not set in sub config
diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java
 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java
index 73af0b0cd5..4ea71dda5b 100644
--- 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java
@@ -54,7 +54,6 @@ import static 
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.S
 import static 
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SPLIT_READ_INTERVAL;
 import static 
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_FAKE_MODE;
 import static 
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_TEMPLATE;
-import static 
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TABLES_CONFIGS;
 import static 
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_HOUR_TEMPLATE;
 import static 
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_MINUTE_TEMPLATE;
 import static 
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.TIME_SECOND_TEMPLATE;
@@ -72,7 +71,7 @@ public class FakeSourceFactory implements TableSourceFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .optional(TABLES_CONFIGS)
+                .optional(TableSchemaOptions.TABLE_CONFIGS)
                 .optional(TableSchemaOptions.SCHEMA)
                 .optional(STRING_FAKE_MODE)
                 .conditional(STRING_FAKE_MODE, FakeOption.FakeMode.TEMPLATE, 
STRING_TEMPLATE)
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseMultipleTableFileSourceConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseMultipleTableFileSourceConfig.java
index 0cda71d091..f44e0d1f6f 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseMultipleTableFileSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseMultipleTableFileSourceConfig.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.file.config;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
 
 import com.google.common.collect.Lists;
 import lombok.Getter;
@@ -33,7 +34,7 @@ public abstract class BaseMultipleTableFileSourceConfig 
implements Serializable
     @Getter private List<BaseFileSourceConfig> fileSourceConfigs;
 
     public BaseMultipleTableFileSourceConfig(ReadonlyConfig 
fileSourceRootConfig) {
-        if 
(fileSourceRootConfig.getOptional(BaseSourceConfigOptions.TABLE_CONFIGS).isPresent())
 {
+        if 
(fileSourceRootConfig.getOptional(TableSchemaOptions.TABLE_CONFIGS).isPresent())
 {
             parseFromFileSourceConfigs(fileSourceRootConfig);
         } else {
             parseFromFileSourceConfig(fileSourceRootConfig);
@@ -42,7 +43,7 @@ public abstract class BaseMultipleTableFileSourceConfig 
implements Serializable
 
     private void parseFromFileSourceConfigs(ReadonlyConfig 
fileSourceRootConfig) {
         this.fileSourceConfigs =
-                
fileSourceRootConfig.get(BaseSourceConfigOptions.TABLE_CONFIGS).stream()
+                
fileSourceRootConfig.get(TableSchemaOptions.TABLE_CONFIGS).stream()
                         .map(ReadonlyConfig::fromMap)
                         .map(this::getBaseSourceConfig)
                         .collect(Collectors.toList());
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
index ddcc13d47d..de45726e3c 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
@@ -17,8 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.config;
 
-import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
-
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
@@ -27,7 +25,6 @@ import org.apache.seatunnel.common.utils.TimeUtils;
 import org.apache.seatunnel.format.text.constant.TextFormatConstant;
 
 import java.util.List;
-import java.util.Map;
 
 public class BaseSourceConfigOptions {
     public static final Option<FileFormat> FILE_FORMAT_TYPE =
@@ -169,11 +166,4 @@ public class BaseSourceConfigOptions {
                     .enumType(ArchiveCompressFormat.class)
                     .defaultValue(ArchiveCompressFormat.NONE)
                     .withDescription("Archive compression codec");
-
-    public static final Option<List<Map<String, Object>>> TABLE_CONFIGS =
-            Options.key("tables_configs")
-                    .type(new TypeReference<List<Map<String, Object>>>() {})
-                    .noDefaultValue()
-                    .withDescription(
-                            "Local file source configs, used to create 
multiple local file source.");
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
index fb76d276d5..0d58e506da 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
@@ -50,7 +50,7 @@ public class LocalFileSourceFactory implements 
TableSourceFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .optional(BaseSourceConfigOptions.TABLE_CONFIGS)
+                .optional(TableSchemaOptions.TABLE_CONFIGS)
                 .optional(BaseSourceConfigOptions.FILE_PATH)
                 .optional(BaseSourceConfigOptions.FILE_FORMAT_TYPE)
                 .optional(BaseSourceConfigOptions.ENCODING)
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
index 0eddf05693..6f140330cc 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
@@ -51,9 +51,7 @@ public class OssFileSourceFactory implements 
TableSourceFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .optional(
-                        org.apache.seatunnel.connectors.seatunnel.file.config
-                                .BaseSourceConfigOptions.TABLE_CONFIGS)
+                .optional(TableSchemaOptions.TABLE_CONFIGS)
                 .optional(OssConfigOptions.FILE_PATH)
                 .optional(OssConfigOptions.BUCKET)
                 .optional(OssConfigOptions.ACCESS_KEY)
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveOptions.java
similarity index 96%
rename from 
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java
rename to 
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveOptions.java
index efed4e91c5..6fe55e2e71 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveOptions.java
@@ -21,7 +21,7 @@ import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
 
-public class BaseHiveOptions extends BaseSourceConfigOptions {
+public class HiveOptions extends BaseSourceConfigOptions {
 
     public static final Option<String> TABLE_NAME =
             Options.key("table_name")
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 6e91baf001..13f48823b2 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -41,9 +41,9 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.hive.commit.HiveSinkAggregatedCommitter;
 import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.hive.sink.writter.HiveSinkWriter;
-import 
org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;
 import org.apache.seatunnel.connectors.seatunnel.hive.storage.StorageFactory;
 import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTableUtils;
 
@@ -216,16 +216,14 @@ public class HiveSink
                 StorageFactory.getStorageType(hdfsLocation)
                         .buildHadoopConfWithReadOnlyConfig(readonlyConfig);
         readonlyConfig
-                .getOptional(HiveSourceOptions.HDFS_SITE_PATH)
+                .getOptional(HiveOptions.HDFS_SITE_PATH)
                 .ifPresent(hadoopConf::setHdfsSitePath);
+        
readonlyConfig.getOptional(HiveOptions.REMOTE_USER).ifPresent(hadoopConf::setRemoteUser);
         readonlyConfig
-                .getOptional(HiveSourceOptions.REMOTE_USER)
-                .ifPresent(hadoopConf::setRemoteUser);
-        readonlyConfig
-                .getOptional(HiveSourceOptions.KERBEROS_PRINCIPAL)
+                .getOptional(HiveOptions.KERBEROS_PRINCIPAL)
                 .ifPresent(hadoopConf::setKerberosPrincipal);
         readonlyConfig
-                .getOptional(HiveSourceOptions.KERBEROS_KEYTAB_PATH)
+                .getOptional(HiveOptions.KERBEROS_KEYTAB_PATH)
                 .ifPresent(hadoopConf::setKerberosKeytabPath);
         return hadoopConf;
     }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java
index a241717a44..404244b411 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java
@@ -19,9 +19,9 @@ package org.apache.seatunnel.connectors.seatunnel.hive.sink;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.connectors.seatunnel.hive.config.BaseHiveOptions;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
 
-public class HiveSinkOptions extends BaseHiveOptions {
+public class HiveSinkOptions extends HiveOptions {
 
     public static final Option<Boolean> ABORT_DROP_PARTITION_METADATA =
             Options.key("abort_drop_partition_metadata")
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
index 07adfef106..63e235d3dc 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
@@ -20,6 +20,8 @@ package org.apache.seatunnel.connectors.seatunnel.hive.source;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
 import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
@@ -27,7 +29,6 @@ import 
org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
 import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
 import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
-import 
org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;
 
 import com.google.auto.service.AutoService;
 
@@ -51,7 +52,7 @@ public class HiveSourceFactory implements TableSourceFactory {
         return OptionRule.builder()
                 .optional(HiveConfig.TABLE_NAME)
                 .optional(HiveConfig.METASTORE_URI)
-                .optional(HiveSourceOptions.TABLE_CONFIGS)
+                .optional(TableSchemaOptions.TABLE_CONFIGS, CatalogOptions.TABLE_LIST)
                 .optional(BaseSourceConfigOptions.READ_PARTITIONS)
                 .optional(BaseSourceConfigOptions.READ_COLUMNS)
                 .optional(BaseSourceConfigOptions.KERBEROS_PRINCIPAL)
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceOptions.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceOptions.java
deleted file mode 100644
index c30cb1783d..0000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceOptions.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.source.config;
-
-import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.connectors.seatunnel.hive.config.BaseHiveOptions;
-
-import java.util.List;
-import java.util.Map;
-
-public class HiveSourceOptions extends BaseHiveOptions {
-    public static final Option<List<Map<String, Object>>> TABLE_CONFIGS =
-            Options.key("tables_configs")
-                    .type(new TypeReference<List<Map<String, Object>>>() {})
-                    .noDefaultValue()
-                    .withDescription(
-                            "Local file source configs, used to create 
multiple local file source.");
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java
index 9db899ca8c..249ffed497 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java
@@ -18,6 +18,8 @@
 package org.apache.seatunnel.connectors.seatunnel.hive.source.config;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
 
 import com.google.common.collect.Lists;
 import lombok.Getter;
@@ -33,16 +35,27 @@ public class MultipleTableHiveSourceConfig implements Serializable {
     @Getter private List<HiveSourceConfig> hiveSourceConfigs;
 
     public MultipleTableHiveSourceConfig(ReadonlyConfig readonlyConfig) {
-        if (readonlyConfig.getOptional(HiveSourceOptions.TABLE_CONFIGS).isPresent()) {
-            parseFromLocalFileSourceConfigs(readonlyConfig);
+        if (readonlyConfig.getOptional(CatalogOptions.TABLE_LIST).isPresent()) {
+            parseFromLocalFileSourceByTableList(readonlyConfig);
+        } else if (readonlyConfig.getOptional(TableSchemaOptions.TABLE_CONFIGS).isPresent()) {
+            parseFromLocalFileSourceByTableConfigs(readonlyConfig);
         } else {
             parseFromLocalFileSourceConfig(readonlyConfig);
         }
     }
 
-    private void parseFromLocalFileSourceConfigs(ReadonlyConfig readonlyConfig) {
+    private void parseFromLocalFileSourceByTableList(ReadonlyConfig readonlyConfig) {
         this.hiveSourceConfigs =
-                readonlyConfig.get(HiveSourceOptions.TABLE_CONFIGS).stream()
+                readonlyConfig.get(CatalogOptions.TABLE_LIST).stream()
+                        .map(ReadonlyConfig::fromMap)
+                        .map(HiveSourceConfig::new)
+                        .collect(Collectors.toList());
+    }
+    // hive is structured, should use table_list
+    @Deprecated
+    private void parseFromLocalFileSourceByTableConfigs(ReadonlyConfig readonlyConfig) {
+        this.hiveSourceConfigs =
+                readonlyConfig.get(TableSchemaOptions.TABLE_CONFIGS).stream()
                         .map(ReadonlyConfig::fromMap)
                         .map(HiveSourceConfig::new)
                         .collect(Collectors.toList());
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
index 62d917ca0d..18482aa2c7 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
@@ -23,9 +23,9 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopLoginFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
 import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
-import 
org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -54,7 +54,7 @@ public class HiveMetaStoreProxy {
     private static final List<String> HADOOP_CONF_FILES = 
ImmutableList.of("hive-site.xml");
 
     private HiveMetaStoreProxy(ReadonlyConfig readonlyConfig) {
-        String metastoreUri = readonlyConfig.get(HiveSourceOptions.METASTORE_URI);
+        String metastoreUri = readonlyConfig.get(HiveOptions.METASTORE_URI);
         String hiveHadoopConfigPath = 
readonlyConfig.get(HiveConfig.HADOOP_CONF_PATH);
         String hiveSitePath = readonlyConfig.get(HiveConfig.HIVE_SITE_PATH);
         HiveConf hiveConf = new HiveConf();
@@ -121,7 +121,7 @@ public class HiveMetaStoreProxy {
                     String.format(
                             "Using this hive uris [%s], hive conf [%s] to 
initialize "
                                     + "hive metastore client instance failed",
-                            metastoreUri, readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH));
+                            metastoreUri, readonlyConfig.get(HiveOptions.HIVE_SITE_PATH));
             throw new HiveConnectorException(
                     
HiveConnectorErrorCode.INITIALIZE_HIVE_METASTORE_CLIENT_FAILED, errorMsg, e);
         } catch (Exception e) {
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java
index 7b9192ea64..0805fe04f3 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java
@@ -23,16 +23,16 @@ import 
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
-import 
org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;
 
 import org.apache.hadoop.hive.metastore.api.Table;
 
 public class HiveTableUtils {
 
     public static Table getTableInfo(ReadonlyConfig readonlyConfig) {
-        String table = readonlyConfig.get(HiveSourceOptions.TABLE_NAME);
+        String table = readonlyConfig.get(HiveOptions.TABLE_NAME);
         TablePath tablePath = TablePath.of(table);
         if (tablePath.getDatabaseName() == null || tablePath.getTableName() == 
null) {
             throw new SeaTunnelRuntimeException(
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 293821e0ed..c01dc3e88d 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -184,13 +184,6 @@ public class Config {
                     .withDescription(
                             "Semantics that can be chosen 
EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.");
 
-    public static final Option<List<Map<String, Object>>> TABLE_LIST =
-            Options.key("table_list")
-                    .type(new TypeReference<List<Map<String, Object>>>() {})
-                    .noDefaultValue()
-                    .withDescription(
-                            "Topic list config. You can configure only one 
`table_list` or one `topic` at the same time");
-
     public static final Option<String> PROTOBUF_SCHEMA =
             Options.key("protobuf_schema")
                     .stringType()
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index 0f645d7218..1093d3f2f2 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.kafka.source;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
@@ -31,7 +32,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
@@ -114,11 +114,17 @@ public class KafkaSourceConfig implements Serializable {
     private Map<TablePath, ConsumerMetadata> createMapConsumerMetadata(
             ReadonlyConfig readonlyConfig) {
         List<ConsumerMetadata> consumerMetadataList;
-        if (readonlyConfig.getOptional(Config.TABLE_LIST).isPresent()) {
+        if (readonlyConfig.getOptional(TableSchemaOptions.TABLE_CONFIGS).isPresent()) {
             consumerMetadataList =
-                    readonlyConfig.get(Config.TABLE_LIST).stream()
+                    readonlyConfig.get(TableSchemaOptions.TABLE_CONFIGS).stream()
                             .map(ReadonlyConfig::fromMap)
-                            .map(config -> createConsumerMetadata(config))
+                            .map(this::createConsumerMetadata)
+                            .collect(Collectors.toList());
+        } else if (readonlyConfig.getOptional(CatalogOptions.TABLE_LIST).isPresent()) {
+            consumerMetadataList =
+                    readonlyConfig.get(CatalogOptions.TABLE_LIST).stream()
+                            .map(ReadonlyConfig::fromMap)
+                            .map(this::createConsumerMetadata)
                             .collect(Collectors.toList());
         } else {
             consumerMetadataList =
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
index 431e9a8c19..fe6f50a8ea 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
@@ -20,6 +20,8 @@ package 
org.apache.seatunnel.connectors.seatunnel.kafka.source;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
 import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
@@ -43,7 +45,8 @@ public class KafkaSourceFactory implements TableSourceFactory 
{
     public OptionRule optionRule() {
         return OptionRule.builder()
                 .required(Config.BOOTSTRAP_SERVERS)
-                .exclusive(Config.TOPIC, Config.TABLE_LIST)
+                .exclusive(
+                        Config.TOPIC, TableSchemaOptions.TABLE_CONFIGS, CatalogOptions.TABLE_LIST)
                 .optional(
                         Config.START_MODE,
                         Config.PATTERN,
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java
index 5abc62ad72..3fd783bbf8 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java
@@ -17,8 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kudu.config;
 
-import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
-
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
@@ -29,7 +27,6 @@ import lombok.Getter;
 import lombok.ToString;
 
 import java.util.List;
-import java.util.Map;
 
 @Getter
 @ToString
@@ -55,12 +52,6 @@ public class KuduSourceConfig extends CommonConfig {
                     .noDefaultValue()
                     .withDescription("Kudu scan filter expressions");
 
-    public static final Option<List<Map<String, Object>>> TABLE_LIST =
-            Options.key("table_list")
-                    .type(new TypeReference<List<Map<String, Object>>>() {})
-                    .noDefaultValue()
-                    .withDescription("table list config");
-
     private int batchSizeBytes;
 
     protected Long queryTimeout;
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java
index b741b7474b..094807edc0 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.kudu.config;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogOptions;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.TablePath;
@@ -59,8 +60,8 @@ public class KuduSourceTableConfig implements Serializable {
 
         try (KuduCatalog kuduCatalog = (KuduCatalog) optionalCatalog.get()) {
             kuduCatalog.open();
-            if (config.getOptional(KuduSourceConfig.TABLE_LIST).isPresent()) {
-                return config.get(KuduSourceConfig.TABLE_LIST).stream()
+            if (config.getOptional(CatalogOptions.TABLE_LIST).isPresent()) {
+                return config.get(CatalogOptions.TABLE_LIST).stream()
                         .map(ReadonlyConfig::fromMap)
                         .map(readonlyConfig -> 
parseKuduSourceConfig(readonlyConfig, kuduCatalog))
                         .collect(Collectors.toList());
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java
index b1bdb7e4ab..78002a9390 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java
@@ -33,8 +33,8 @@ import com.google.auto.service.AutoService;
 
 import java.io.Serializable;
 
+import static org.apache.seatunnel.api.table.catalog.CatalogOptions.TABLE_LIST;
 import static 
org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.MASTER;
-import static 
org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.TABLE_LIST;
 import static 
org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.TABLE_NAME;
 
 @AutoService(Factory.class)

Reply via email to