This is an automated email from the ASF dual-hosted git repository.

zhangdonghao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0d52102241 [Feature][Connector-v2] Support multi paimon source (#9759)
0d52102241 is described below

commit 0d52102241afcc98294b21061c2db02b35c9d2e1
Author: xiaochen <[email protected]>
AuthorDate: Wed Aug 27 14:18:41 2025 +0800

    [Feature][Connector-v2] Support multi paimon source (#9759)
---
 docs/en/connector-v2/source/Paimon.md              | 52 +++++++++---
 docs/zh/connector-v2/source/Paimon.md              | 52 +++++++++---
 .../seatunnel/paimon/config/PaimonConfig.java      |  9 +-
 .../paimon/config/PaimonSourceConfig.java          |  5 ++
 .../paimon/config/PaimonSourceOptions.java         | 11 +++
 .../paimon/config/PaimonSourceTableConfig.java     | 66 +++++++++++++++
 .../seatunnel/paimon/source/PaimonSource.java      | 98 +++++++++++++---------
 .../paimon/source/PaimonSourceFactory.java         |  7 +-
 .../paimon/source/PaimonSourceReader.java          | 29 +++++--
 .../seatunnel/paimon/source/PaimonSourceSplit.java | 10 ++-
 .../paimon/source/PaimonSourceSplitGenerator.java  |  4 +-
 .../source/enumerator/AbstractSplitEnumerator.java | 75 ++++++++++-------
 .../PaimonBatchSourceSplitEnumerator.java          |  8 +-
 .../PaimonStreamSourceSplitEnumerator.java         | 14 +++-
 .../paimon/config/PaimonSourceTableConfigTest.java | 83 ++++++++++++++++++
 .../seatunnel/e2e/connector/paimon/PaimonIT.java   | 12 +++
 .../src/test/resources/fake_to_paimon_2.conf       | 62 ++++++++++++++
 .../paimon-to-assert-with-multipletable.conf       | 77 +++++++++++++++++
 18 files changed, 550 insertions(+), 124 deletions(-)

diff --git a/docs/en/connector-v2/source/Paimon.md 
b/docs/en/connector-v2/source/Paimon.md
index 4bf2a2e180..9111598ab8 100644
--- a/docs/en/connector-v2/source/Paimon.md
+++ b/docs/en/connector-v2/source/Paimon.md
@@ -46,19 +46,20 @@ Read data from Apache Paimon.
 
 ## Options
 
-| name                    |  type  | required | default value |
-|-------------------------|--------|----------|---------------|
-| warehouse               | String | Yes      | -             |
-| catalog_type            | String | No       | filesystem    |
-| catalog_uri             | String | No       | -             |
-| database                | String | Yes      | -             |
-| table                   | String | Yes      | -             |
-| user                    | String | No       | -             |
-| password                | String | No      | -             |
-| hdfs_site_path          | String | No       | -             |
-| query                   | String | No       | -             |
-| paimon.hadoop.conf      | Map    | No       | -             |
-| paimon.hadoop.conf-path | String | No       | -             |
+| name                    | type     | required       | default value |
+|-------------------------|----------|----------------|---------------|
+| warehouse               | String   | Yes            | -             |
+| catalog_type            | String   | No             | filesystem    |
+| catalog_uri             | String   | No             | -             |
+| database                | String   | Yes            | -             |
+| table                   | String   | no             | -             |
+| table_list              | array    | no             | -             |
+| user                    | String   | No             | -             |
+| password                | String   | No             | -             |
+| hdfs_site_path          | String   | No             | -             |
+| query                   | String   | No             | -             |
+| paimon.hadoop.conf      | Map      | No             | -             |
+| paimon.hadoop.conf-path | String   | No             | -             |
 
 ### warehouse [string]
 
@@ -80,6 +81,10 @@ The database you want to access
 
 The table you want to access
 
+### table_list [array]
+
+The list of tables to be read, you can use this configuration instead of 
`table`
+
 ### hdfs_site_path [string]
 
 The file path of `hdfs-site.xml`
@@ -134,6 +139,27 @@ source {
 }
 ```
 
+### Multiple tables
+
+```hocon
+source {
+  Paimon {
+    warehouse = "/tmp/paimon"
+    database = "default"
+    table_list = [
+      {
+        table = "table1"
+        query = "select * from table1 where id > 100"
+      },
+      {
+        table = "table2"
+        query = "select * from table2 where id > 100"
+      }
+    ]
+  }
+}
+```
+
 ### Filter example
 
 ```hocon
diff --git a/docs/zh/connector-v2/source/Paimon.md 
b/docs/zh/connector-v2/source/Paimon.md
index 1cb7517c76..ccd61d98bf 100644
--- a/docs/zh/connector-v2/source/Paimon.md
+++ b/docs/zh/connector-v2/source/Paimon.md
@@ -46,19 +46,20 @@ import ChangeLog from '../changelog/connector-paimon.md';
 
 ## 配置选项
 
-| 名称                      |  类型  | 是否必须 | 默认值 |
-|-------------------------|--------|----------|---------------|
-| warehouse               | String | 是      | -             |
-| catalog_type            | String | 否       | filesystem    |
-| catalog_uri             | String | 否       | -             |
-| database                | String | 是      | -             |
-| table                   | String | 是      | -             |
-| user                    | String | 否      | -             |
-| password                | String | 否      | -             |
-| hdfs_site_path          | String | 否       | -             |
-| query                   | String | 否       | -             |
-| paimon.hadoop.conf      | Map    | 否       | -             |
-| paimon.hadoop.conf-path | String | 否       | -             |
+| 名称                      | 类型       | 是否必须   | 默认值 |
+|-------------------------|----------|--------|---------------|
+| warehouse               | String   | 是      | -             |
+| catalog_type            | String   | 否      | filesystem    |
+| catalog_uri             | String   | 否      | -             |
+| database                | String   | 是      | -             |
+| table                   | String   | 否      | -             |
+| table_list              | array    | 否      | -             |
+| user                    | String   | 否      | -             |
+| password                | String   | 否      | -             |
+| hdfs_site_path          | String   | 否      | -             |
+| query                   | String   | 否      | -             |
+| paimon.hadoop.conf      | Map      | 否      | -             |
+| paimon.hadoop.conf-path | String   | 否      | -             |
 
 ### warehouse [string]
 
@@ -80,6 +81,10 @@ Paimon 的 catalog uri,仅当 catalog_type 为 hive 时需要
 
 需要访问的表
 
+### table_list [array]
+
+`Paimon` 表名列表,当需要同时读取多表时使用此配置代替 table
+
 ### hdfs_site_path [string]
 
 `hdfs-site.xml` 文件地址
@@ -137,6 +142,27 @@ source {
 }
 ```
 
+### 读取多表
+
+```hocon
+source {
+  Paimon {
+    warehouse = "/tmp/paimon"
+    database = "default"
+    table_list = [
+      {
+        table = "table1"
+        query = "select * from table1 where id > 100"
+      },
+      {
+        table = "table2"
+        query = "select * from table2 where id > 100"
+      }
+    ]
+  }
+}
+```
+
 ### Filter 示例
 
 ```hocon
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
index d47029207a..39c713e45b 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
@@ -65,13 +65,8 @@ public class PaimonConfig implements Serializable {
                 checkArgumentNotBlank(
                         readonlyConfig.get(PaimonBaseOptions.WAREHOUSE),
                         PaimonBaseOptions.WAREHOUSE.key());
-        this.namespace =
-                checkArgumentNotBlank(
-                        readonlyConfig.get(PaimonBaseOptions.DATABASE),
-                        PaimonBaseOptions.DATABASE.key());
-        this.table =
-                checkArgumentNotBlank(
-                        readonlyConfig.get(PaimonBaseOptions.TABLE), 
PaimonBaseOptions.TABLE.key());
+        this.namespace = readonlyConfig.get(PaimonBaseOptions.DATABASE);
+        this.table = readonlyConfig.get(PaimonBaseOptions.TABLE);
         this.hdfsSitePath = 
readonlyConfig.get(PaimonBaseOptions.HDFS_SITE_PATH);
         this.hadoopConfProps = 
readonlyConfig.get(PaimonBaseOptions.HADOOP_CONF);
         this.hadoopConfPath = 
readonlyConfig.get(PaimonBaseOptions.HADOOP_CONF_PATH);
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceConfig.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceConfig.java
index 2d1bc77851..1ca24d6aca 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceConfig.java
@@ -21,13 +21,18 @@ import 
org.apache.seatunnel.api.configuration.ReadonlyConfig;
 
 import lombok.Getter;
 
+import java.util.ArrayList;
+import java.util.List;
+
 @Getter
 public class PaimonSourceConfig extends PaimonConfig {
 
     private String query;
+    private List<PaimonSourceTableConfig> tableConfigList = new ArrayList<>();
 
     public PaimonSourceConfig(ReadonlyConfig readonlyConfig) {
         super(readonlyConfig);
         this.query = readonlyConfig.get(PaimonSourceOptions.QUERY_SQL);
+        this.tableConfigList = PaimonSourceTableConfig.of(readonlyConfig);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceOptions.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceOptions.java
index 466302f623..baad34fa85 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceOptions.java
@@ -17,9 +17,14 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.config;
 
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
+import java.util.List;
+import java.util.Map;
+
 public class PaimonSourceOptions extends PaimonBaseOptions {
 
     public static final Option<String> QUERY_SQL =
@@ -27,4 +32,10 @@ public class PaimonSourceOptions extends PaimonBaseOptions {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("The query of paimon source");
+
+    public static final Option<List<Map<String, Object>>> TABLE_LIST =
+            Options.key("table_list")
+                    .type(new TypeReference<List<Map<String, Object>>>() {})
+                    .noDefaultValue()
+                    .withDescription("table list config");
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceTableConfig.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceTableConfig.java
new file mode 100644
index 0000000000..ecebcb5a91
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceTableConfig.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.config;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
+import lombok.Getter;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Getter
+public class PaimonSourceTableConfig implements Serializable {
+
+    private final String database;
+    private final String table;
+    private final String query;
+
+    private PaimonSourceTableConfig(String database, String table, String 
query) {
+        this.database = database;
+        this.table = table;
+        this.query = query;
+    }
+
+    public static PaimonSourceTableConfig 
parsePaimonSourceConfig(ReadonlyConfig config) {
+        String database = config.get(PaimonBaseOptions.DATABASE);
+        String table = config.get(PaimonBaseOptions.TABLE);
+        String query = 
config.getOptional(PaimonSourceOptions.QUERY_SQL).orElse(null);
+        return new PaimonSourceTableConfig(database, table, query);
+    }
+
+    public static List<PaimonSourceTableConfig> of(ReadonlyConfig config) {
+        if (config.getOptional(PaimonSourceOptions.TABLE_LIST).isPresent()) {
+            List<Map<String, Object>> maps = 
config.get(PaimonSourceOptions.TABLE_LIST);
+            return maps.stream()
+                    .map(ReadonlyConfig::fromMap)
+                    .map(PaimonSourceTableConfig::parsePaimonSourceConfig)
+                    .collect(Collectors.toList());
+        }
+        return Lists.newArrayList(parsePaimonSourceConfig(config));
+    }
+
+    public TablePath getTablePath() {
+        return TablePath.of(database, table);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
index ef817227e1..2d970403ce 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.source;
 
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+import org.apache.seatunnel.shade.com.google.common.collect.Maps;
+
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.Boundedness;
@@ -42,9 +45,9 @@ import org.apache.paimon.types.RowType;
 
 import net.sf.jsqlparser.statement.select.PlainSelect;
 
-import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter.convertSqlSelectToPaimonProjectionIndex;
@@ -58,43 +61,57 @@ public class PaimonSource
 
     public static final String PLUGIN_NAME = "Paimon";
 
-    private SeaTunnelRowType seaTunnelRowType;
-
-    private Table paimonTable;
-
     private JobContext jobContext;
 
-    private CatalogTable catalogTable;
-
-    protected final ReadBuilder readBuilder;
+    private List<CatalogTable> catalogTables = Lists.newArrayList();
+    private Map<String, Table> paimonTables = Maps.newHashMap();
+    private Map<String, SeaTunnelRowType> seaTunnelRowTypes = 
Maps.newHashMap();
+    private Map<String, ReadBuilder> readBuilders = Maps.newHashMap();
 
     public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog 
paimonCatalog) {
-        PaimonSourceConfig paimonSourceConfig = new 
PaimonSourceConfig(readonlyConfig);
-        TablePath tablePath =
-                TablePath.of(paimonSourceConfig.getNamespace(), 
paimonSourceConfig.getTable());
-        this.catalogTable = paimonCatalog.getTable(tablePath);
-        this.paimonTable = paimonCatalog.getPaimonTable(tablePath);
-
-        String filterSql = paimonSourceConfig.getQuery();
-        PlainSelect plainSelect = convertToPlainSelect(filterSql);
-        RowType paimonRowType = this.paimonTable.rowType();
-        String[] filedNames = paimonRowType.getFieldNames().toArray(new 
String[0]);
-
-        Predicate predicate = null;
-        int[] projectionIndex = null;
-        if (!Objects.isNull(plainSelect)) {
-            projectionIndex = 
convertSqlSelectToPaimonProjectionIndex(filedNames, plainSelect);
-            if (!Objects.isNull(projectionIndex)) {
-                this.catalogTable =
-                        paimonCatalog.getTableWithProjection(tablePath, 
projectionIndex);
-            }
-            predicate =
-                    
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
-                            paimonRowType, plainSelect);
-        }
-        this.seaTunnelRowType = RowTypeConverter.convert(paimonRowType, 
projectionIndex);
-        this.readBuilder =
-                
paimonTable.newReadBuilder().withProjection(projectionIndex).withFilter(predicate);
+        new PaimonSourceConfig(readonlyConfig)
+                .getTableConfigList()
+                .forEach(
+                        tableConfig -> {
+                            TablePath tablePath = tableConfig.getTablePath();
+                            CatalogTable catalogTable = 
paimonCatalog.getTable(tablePath);
+                            Table paimonTable = 
paimonCatalog.getPaimonTable(tablePath);
+                            RowType paimonRowType = paimonTable.rowType();
+                            String[] filedNames =
+                                    paimonRowType.getFieldNames().toArray(new 
String[0]);
+
+                            PlainSelect plainSelect = 
convertToPlainSelect(tableConfig.getQuery());
+                            Predicate predicate = null;
+                            int[] projectionIndex = null;
+                            if (!Objects.isNull(plainSelect)) {
+                                projectionIndex =
+                                        
convertSqlSelectToPaimonProjectionIndex(
+                                                filedNames, plainSelect);
+                                if (!Objects.isNull(projectionIndex)) {
+                                    catalogTable =
+                                            
paimonCatalog.getTableWithProjection(
+                                                    tablePath, 
projectionIndex);
+                                }
+                                predicate =
+                                        SqlToPaimonPredicateConverter
+                                                
.convertSqlWhereToPaimonPredicate(
+                                                        paimonRowType, 
plainSelect);
+                            }
+                            this.catalogTables.add(catalogTable);
+                            String tableKey = tablePath.toString();
+                            this.seaTunnelRowTypes.put(
+                                    tableKey,
+                                    RowTypeConverter.convert(paimonRowType, 
projectionIndex));
+
+                            ReadBuilder readBuilder =
+                                    paimonTable
+                                            .newReadBuilder()
+                                            .withProjection(projectionIndex)
+                                            .withFilter(predicate);
+
+                            this.paimonTables.put(tableKey, paimonTable);
+                            this.readBuilders.put(tableKey, readBuilder);
+                        });
     }
 
     @Override
@@ -104,7 +121,7 @@ public class PaimonSource
 
     @Override
     public List<CatalogTable> getProducedCatalogTables() {
-        return Collections.singletonList(catalogTable);
+        return catalogTables;
     }
 
     @Override
@@ -122,8 +139,7 @@ public class PaimonSource
     @Override
     public SourceReader<SeaTunnelRow, PaimonSourceSplit> createReader(
             SourceReader.Context readerContext) throws Exception {
-        return new PaimonSourceReader(
-                readerContext, paimonTable, seaTunnelRowType, 
readBuilder.newRead());
+        return new PaimonSourceReader(readerContext, paimonTables, 
seaTunnelRowTypes, readBuilders);
     }
 
     @Override
@@ -131,10 +147,10 @@ public class PaimonSource
             SourceSplitEnumerator.Context<PaimonSourceSplit> 
enumeratorContext) throws Exception {
         if (getBoundedness() == Boundedness.BOUNDED) {
             return new PaimonBatchSourceSplitEnumerator(
-                    enumeratorContext, new LinkedList<>(), null, 
readBuilder.newScan(), 1);
+                    enumeratorContext, new LinkedList<>(), null, readBuilders, 
1);
         }
         return new PaimonStreamSourceSplitEnumerator(
-                enumeratorContext, new LinkedList<>(), null, 
readBuilder.newStreamScan(), 1);
+                enumeratorContext, new LinkedList<>(), null, readBuilders, 1);
     }
 
     @Override
@@ -147,14 +163,14 @@ public class PaimonSource
                     enumeratorContext,
                     checkpointState.getAssignedSplits(),
                     checkpointState.getCurrentSnapshotId(),
-                    readBuilder.newScan(),
+                    readBuilders,
                     1);
         }
         return new PaimonStreamSourceSplitEnumerator(
                 enumeratorContext,
                 checkpointState.getAssignedSplits(),
                 checkpointState.getCurrentSnapshotId(),
-                readBuilder.newStreamScan(),
+                readBuilders,
                 1);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceFactory.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceFactory.java
index 97063518e9..3114854a8e 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceFactory.java
@@ -46,16 +46,15 @@ public class PaimonSourceFactory implements 
TableSourceFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(
-                        PaimonSourceOptions.WAREHOUSE,
-                        PaimonSourceOptions.DATABASE,
-                        PaimonSourceOptions.TABLE)
+                .required(PaimonSourceOptions.WAREHOUSE)
                 .optional(
+                        PaimonSourceOptions.DATABASE,
                         PaimonSourceOptions.CATALOG_TYPE,
                         PaimonSourceOptions.HDFS_SITE_PATH,
                         PaimonSourceOptions.QUERY_SQL,
                         PaimonSourceOptions.HADOOP_CONF,
                         PaimonSourceOptions.HADOOP_CONF_PATH)
+                .exclusive(PaimonSourceOptions.TABLE, 
PaimonSourceOptions.TABLE_LIST)
                 .conditional(
                         PaimonSourceOptions.CATALOG_TYPE,
                         PaimonCatalogEnum.HIVE,
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
index 36ad1f68f3..ac24e64d04 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
@@ -31,6 +31,7 @@ import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.TableRead;
 
 import lombok.extern.slf4j.Slf4j;
@@ -38,7 +39,9 @@ import lombok.extern.slf4j.Slf4j;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Deque;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentLinkedDeque;
 
@@ -48,17 +51,23 @@ public class PaimonSourceReader implements 
SourceReader<SeaTunnelRow, PaimonSour
 
     private final Deque<PaimonSourceSplit> sourceSplits = new 
ConcurrentLinkedDeque<>();
     private final SourceReader.Context context;
-    private final Table table;
-    private final SeaTunnelRowType seaTunnelRowType;
+    private final Map<String, Table> tables;
+    private final Map<String, SeaTunnelRowType> seaTunnelRowTypes;
+    private final Map<String, TableRead> tableReads;
     private volatile boolean noMoreSplit;
-    private final TableRead tableRead;
 
     public PaimonSourceReader(
-            Context context, Table table, SeaTunnelRowType seaTunnelRowType, 
TableRead tableRead) {
+            Context context,
+            Map<String, Table> tables,
+            Map<String, SeaTunnelRowType> seaTunnelRowTypes,
+            Map<String, ReadBuilder> readBuilders) {
         this.context = context;
-        this.table = table;
-        this.seaTunnelRowType = seaTunnelRowType;
-        this.tableRead = tableRead;
+        this.tables = tables;
+        this.seaTunnelRowTypes = seaTunnelRowTypes;
+        this.tableReads = new HashMap<>();
+        for (Map.Entry<String, ReadBuilder> entry : readBuilders.entrySet()) {
+            this.tableReads.put(entry.getKey(), entry.getValue().newRead());
+        }
     }
 
     @Override
@@ -76,7 +85,10 @@ public class PaimonSourceReader implements 
SourceReader<SeaTunnelRow, PaimonSour
         synchronized (output.getCheckpointLock()) {
             final PaimonSourceSplit split = sourceSplits.poll();
             if (Objects.nonNull(split)) {
-                // read logic
+                String tableId = split.getTableId();
+                Table table = tables.get(tableId);
+                SeaTunnelRowType seaTunnelRowType = 
seaTunnelRowTypes.get(tableId);
+                TableRead tableRead = tableReads.get(tableId);
                 try (final RecordReader<InternalRow> reader =
                                 
tableRead.executeFilter().createReader(split.getSplit());
                         final RecordReaderIterator<InternalRow> rowIterator =
@@ -94,6 +106,7 @@ public class PaimonSourceReader implements 
SourceReader<SeaTunnelRow, PaimonSour
                                 seaTunnelRow.setRowKind(rowKind);
                             }
                         }
+                        seaTunnelRow.setTableId(tableId);
                         output.collect(seaTunnelRow);
                     }
                 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplit.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplit.java
index eba167eadd..c9e4341cbe 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplit.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplit.java
@@ -21,20 +21,26 @@ import org.apache.seatunnel.api.source.SourceSplit;
 
 import org.apache.paimon.table.source.Split;
 
-import lombok.AllArgsConstructor;
 import lombok.Getter;
 
 /** Paimon source split, wrapped the {@link Split} of paimon table. */
 @Getter
-@AllArgsConstructor
 public class PaimonSourceSplit implements SourceSplit {
     private static final long serialVersionUID = 1L;
 
     /** The unique ID of the split. Unique within the scope of this source. */
     private final String id;
 
+    private final String tableId;
+
     private final Split split;
 
+    public PaimonSourceSplit(String id, String tableId, Split split) {
+        this.id = id;
+        this.tableId = tableId;
+        this.split = split;
+    }
+
     @Override
     public String splitId() {
         return split.toString();
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitGenerator.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitGenerator.java
index 93b64c13a2..82d514cc3f 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitGenerator.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitGenerator.java
@@ -29,9 +29,9 @@ public class PaimonSourceSplitGenerator {
      */
     private final char[] currentId = "0000000000".toCharArray();
 
-    public List<PaimonSourceSplit> createSplits(TableScan.Plan plan) {
+    public List<PaimonSourceSplit> createSplits(String tableId, TableScan.Plan 
plan) {
         return plan.splits().stream()
-                .map(s -> new PaimonSourceSplit(getNextId(), s))
+                .map(s -> new PaimonSourceSplit(getNextId(), tableId, s))
                 .collect(Collectors.toList());
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
index 2d748ad257..e18bda0e22 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
@@ -17,15 +17,18 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator;
 
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
 import 
org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceSplit;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceSplitGenerator;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceState;
 
 import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableScan;
 
@@ -37,12 +40,13 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Deque;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -62,8 +66,8 @@ public abstract class AbstractSplitEnumerator
     /** The splits that have not assigned */
     protected Deque<PaimonSourceSplit> pendingSplits;
 
-    protected final TableScan tableScan;
     protected final Object stateLock = new Object();
+    private final Map<String, TableScan> tableScans = new HashMap<>();
 
     private final int splitMaxNum;
 
@@ -75,23 +79,32 @@ public abstract class AbstractSplitEnumerator
             Context<PaimonSourceSplit> context,
             Deque<PaimonSourceSplit> pendingSplits,
             @Nullable Long nextSnapshotId,
-            TableScan tableScan,
-            int splitMaxPerTask) {
+            Map<String, ReadBuilder> readBuilders,
+            int splitMaxPerTask,
+            JobMode jobMode) {
         this.context = context;
         this.pendingSplits = new LinkedList<>(pendingSplits);
         this.nextSnapshotId = nextSnapshotId;
         this.readersAwaitingSplit = new LinkedHashSet<>();
         this.splitGenerator = new PaimonSourceSplitGenerator();
-        this.tableScan = tableScan;
         this.splitMaxNum = context.currentParallelism() * splitMaxPerTask;
         this.executorService =
                 Executors.newCachedThreadPool(
                         new ThreadFactoryBuilder()
                                 
.setNameFormat("Seatunnel-PaimonSourceSplitEnumerator-%d")
                                 .build());
-        if (tableScan instanceof StreamTableScan && nextSnapshotId != null) {
-            ((StreamTableScan) tableScan).restore(nextSnapshotId);
-        }
+
+        readBuilders.forEach(
+                (tableId, readBuilder) -> {
+                    TableScan scan =
+                            JobMode.BATCH.equals(jobMode)
+                                    ? readBuilder.newScan()
+                                    : readBuilder.newStreamScan();
+                    tableScans.put(tableId, scan);
+                    if (scan instanceof StreamTableScan && nextSnapshotId != 
null) {
+                        ((StreamTableScan) scan).restore(nextSnapshotId);
+                    }
+                });
     }
 
     @Override
@@ -197,22 +210,28 @@ public abstract class AbstractSplitEnumerator
 
     // This need to be synchronized because scan object is not thread safe. 
handleSplitRequest and
     // CompletableFuture.supplyAsync will invoke this.
-    protected synchronized Optional<PlanWithNextSnapshotId> scanNextSnapshot() 
{
+    protected synchronized List<PlanWithNextSnapshotId> scanNextSnapshot() {
+
+        List<PlanWithNextSnapshotId> snapshotIds = Lists.newArrayList();
         if (pendingSplits.size() >= splitMaxNum) {
-            return Optional.empty();
+            return snapshotIds;
         }
-        TableScan.Plan plan = tableScan.plan();
-        Long nextSnapshotId = null;
-        if (tableScan instanceof StreamTableScan) {
-            nextSnapshotId = ((StreamTableScan) tableScan).checkpoint();
-        }
-        return Optional.of(new PlanWithNextSnapshotId(plan, nextSnapshotId));
+        tableScans.forEach(
+                (tableId, tableScan) -> {
+                    TableScan.Plan plan = tableScan.plan();
+                    Long nextSnapshotId = null;
+                    if (tableScan instanceof StreamTableScan) {
+                        nextSnapshotId = ((StreamTableScan) 
tableScan).checkpoint();
+                    }
+                    snapshotIds.add(new PlanWithNextSnapshotId(tableId, plan, 
nextSnapshotId));
+                });
+        return snapshotIds;
     }
 
     // This method could not be synchronized, because it runs in 
coordinatorThread, which will make
     // it serializable execution.
     protected void processDiscoveredSplits(
-            Optional<PlanWithNextSnapshotId> planWithNextSnapshotIdOptional, 
Throwable error) {
+            List<PlanWithNextSnapshotId> planWithNextSnapshotIds, Throwable 
error) {
         if (error != null) {
             if (error instanceof EndOfScanException) {
                 log.debug("Catching EndOfStreamException, the stream is 
finished.");
@@ -223,28 +242,28 @@ public abstract class AbstractSplitEnumerator
             }
             return;
         }
-        if (!planWithNextSnapshotIdOptional.isPresent()) {
-            return;
-        }
-        PlanWithNextSnapshotId planWithNextSnapshotId = 
planWithNextSnapshotIdOptional.get();
-        nextSnapshotId = planWithNextSnapshotId.nextSnapshotId;
-        TableScan.Plan plan = planWithNextSnapshotId.plan;
 
-        if (plan.splits().isEmpty()) {
-            return;
+        for (PlanWithNextSnapshotId planWithNextSnapshotId : 
planWithNextSnapshotIds) {
+            nextSnapshotId = planWithNextSnapshotId.nextSnapshotId;
+            TableScan.Plan plan = planWithNextSnapshotId.plan;
+            if (plan.splits().isEmpty()) {
+                continue;
+            }
+            
addSplits(splitGenerator.createSplits(planWithNextSnapshotId.tableId, plan));
         }
-
-        addSplits(splitGenerator.createSplits(plan));
         assignSplits();
     }
 
     /** The result of scan. */
     @Getter
     protected static class PlanWithNextSnapshotId {
+
         private final TableScan.Plan plan;
         private final Long nextSnapshotId;
+        private final String tableId;
 
-        public PlanWithNextSnapshotId(TableScan.Plan plan, Long 
nextSnapshotId) {
+        public PlanWithNextSnapshotId(String tableId, TableScan.Plan plan, 
Long nextSnapshotId) {
+            this.tableId = tableId;
             this.plan = plan;
             this.nextSnapshotId = nextSnapshotId;
         }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java
index 46ecf42756..43cf5c4e3e 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java
@@ -17,16 +17,18 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator;
 
+import org.apache.seatunnel.common.constants.JobMode;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceSplit;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceState;
 
-import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.table.source.ReadBuilder;
 
 import lombok.extern.slf4j.Slf4j;
 
 import javax.annotation.Nullable;
 
 import java.util.Deque;
+import java.util.Map;
 import java.util.Set;
 
 /** Paimon source split enumerator, used to calculate the splits for every 
reader. */
@@ -37,9 +39,9 @@ public class PaimonBatchSourceSplitEnumerator extends 
AbstractSplitEnumerator {
             Context<PaimonSourceSplit> context,
             Deque<PaimonSourceSplit> pendingSplits,
             @Nullable Long nextSnapshotId,
-            TableScan tableScan,
+            Map<String, ReadBuilder> readBuilders,
             int splitMaxPerTask) {
-        super(context, pendingSplits, nextSnapshotId, tableScan, 
splitMaxPerTask);
+        super(context, pendingSplits, nextSnapshotId, readBuilders, 
splitMaxPerTask, JobMode.BATCH);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonStreamSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonStreamSourceSplitEnumerator.java
index 2cce57be93..6852472f59 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonStreamSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonStreamSourceSplitEnumerator.java
@@ -17,15 +17,17 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator;
 
+import org.apache.seatunnel.common.constants.JobMode;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceSplit;
 
-import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.table.source.ReadBuilder;
 
 import lombok.extern.slf4j.Slf4j;
 
 import javax.annotation.Nullable;
 
 import java.util.Deque;
+import java.util.Map;
 
 /** Paimon source split enumerator, used to calculate the splits for every 
reader. */
 @Slf4j
@@ -35,9 +37,15 @@ public class PaimonStreamSourceSplitEnumerator extends 
AbstractSplitEnumerator {
             Context<PaimonSourceSplit> context,
             Deque<PaimonSourceSplit> pendingSplits,
             @Nullable Long nextSnapshotId,
-            TableScan tableScan,
+            Map<String, ReadBuilder> readBuilders,
             int splitMaxPerTask) {
-        super(context, pendingSplits, nextSnapshotId, tableScan, 
splitMaxPerTask);
+        super(
+                context,
+                pendingSplits,
+                nextSnapshotId,
+                readBuilders,
+                splitMaxPerTask,
+                JobMode.STREAMING);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceTableConfigTest.java
 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceTableConfigTest.java
new file mode 100644
index 0000000000..47969d7194
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceTableConfigTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class PaimonSourceTableConfigTest {
+
+    @Test
+    public void testSingleTableConfig() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put("warehouse", "file:///tmp/paimon");
+        configMap.put("database", "test_db");
+        configMap.put("table", "test_table");
+        configMap.put("query", "SELECT * FROM test_table");
+
+        ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+        List<PaimonSourceTableConfig> tableConfigs = 
PaimonSourceTableConfig.of(config);
+
+        assertEquals(1, tableConfigs.size());
+        PaimonSourceTableConfig tableConfig = tableConfigs.get(0);
+        assertEquals("test_db", tableConfig.getDatabase());
+        assertEquals("test_table", tableConfig.getTable());
+        assertEquals("SELECT * FROM test_table", tableConfig.getQuery());
+    }
+
+    @Test
+    public void testMultiTableConfig() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put("warehouse", "file:///tmp/paimon");
+
+        Map<String, Object> table1 = new HashMap<>();
+        table1.put("database", "test_db");
+        table1.put("table", "table1");
+        table1.put("query", "SELECT * FROM table1");
+
+        Map<String, Object> table2 = new HashMap<>();
+        table2.put("database", "test_db");
+        table2.put("table", "table2");
+
+        configMap.put("table_list", Lists.newArrayList(table1, table2));
+
+        ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+        List<PaimonSourceTableConfig> tableConfigs = 
PaimonSourceTableConfig.of(config);
+
+        assertEquals(2, tableConfigs.size());
+
+        PaimonSourceTableConfig config1 = tableConfigs.get(0);
+        assertEquals("test_db", config1.getDatabase());
+        assertEquals("table1", config1.getTable());
+        assertEquals("SELECT * FROM table1", config1.getQuery());
+
+        PaimonSourceTableConfig config2 = tableConfigs.get(1);
+        assertEquals("test_db", config2.getDatabase());
+        assertEquals("table2", config2.getTable());
+        assertEquals(null, config2.getQuery());
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
index 1c84df04f1..b197d53273 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
@@ -90,6 +90,18 @@ public class PaimonIT extends TestSuiteBase implements 
TestResource {
         Assertions.assertEquals(0, readProjectionResult.getExitCode());
     }
 
+    @TestTemplate
+    public void testMultiTableRead(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult textWriteResult = 
container.executeJob("/fake_to_paimon.conf");
+        Assertions.assertEquals(0, textWriteResult.getExitCode());
+        Container.ExecResult textWriteResult2 = 
container.executeJob("/fake_to_paimon_2.conf");
+        Assertions.assertEquals(0, textWriteResult.getExitCode());
+        Container.ExecResult multiReadResult =
+                
container.executeJob("/paimon-to-assert-with-multipletable.conf");
+        Assertions.assertEquals(0, multiReadResult.getExitCode());
+    }
+
     @Override
     public void startUp() throws Exception {}
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_2.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_2.conf
new file mode 100644
index 0000000000..746a9a8c4a
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_2.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    auto.increment.enabled = true
+    auto.increment.start = 1
+    row.num = 100
+    schema = {
+      fields {
+        pk_id = bigint
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+        c_time = time
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    plugin_output = "fake"
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "/tmp/seatunnel_mnt/paimon"
+    database = "default"
+    table = "st_test_p"
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon-to-assert-with-multipletable.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon-to-assert-with-multipletable.conf
new file mode 100644
index 0000000000..a6e8c6f743
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon-to-assert-with-multipletable.conf
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Paimon {
+    warehouse = "/tmp/seatunnel_mnt/paimon"
+    table_list = [
+      {
+        database = "default"
+        table = "st_test"
+      },
+      {
+        database = "default"
+        table = "st_test_p"
+      }
+    ]
+    plugin_output = paimon_source
+  }
+}
+
+sink {
+  Assert {
+    plugin_input = paimon_source
+    rules {
+      table-names = ["default.st_test", "default.st_test_p"],
+      tables_configs = [
+          {
+            table_path = "default.st_test"
+            row_rules = [
+              {
+                rule_type = MAX_ROW
+                rule_value = 100000
+              },
+              {
+                rule_type = MIN_ROW
+                rule_value = 100000
+              }
+            ]
+          },
+          {
+            table_path = "default.st_test_p"
+            row_rules = [
+              {
+                rule_type = MAX_ROW
+                rule_value = 100
+              },
+              {
+                rule_type = MIN_ROW
+                rule_value = 100
+              }
+            ]
+          }
+      ]
+    }
+  }
+}
+
+

Reply via email to