This is an automated email from the ASF dual-hosted git repository.
zhangdonghao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0d52102241 [Feature][Connector-v2] Support multi paimon source (#9759)
0d52102241 is described below
commit 0d52102241afcc98294b21061c2db02b35c9d2e1
Author: xiaochen <[email protected]>
AuthorDate: Wed Aug 27 14:18:41 2025 +0800
[Feature][Connector-v2] Support multi paimon source (#9759)
---
docs/en/connector-v2/source/Paimon.md | 52 +++++++++---
docs/zh/connector-v2/source/Paimon.md | 52 +++++++++---
.../seatunnel/paimon/config/PaimonConfig.java | 9 +-
.../paimon/config/PaimonSourceConfig.java | 5 ++
.../paimon/config/PaimonSourceOptions.java | 11 +++
.../paimon/config/PaimonSourceTableConfig.java | 66 +++++++++++++++
.../seatunnel/paimon/source/PaimonSource.java | 98 +++++++++++++---------
.../paimon/source/PaimonSourceFactory.java | 7 +-
.../paimon/source/PaimonSourceReader.java | 29 +++++--
.../seatunnel/paimon/source/PaimonSourceSplit.java | 10 ++-
.../paimon/source/PaimonSourceSplitGenerator.java | 4 +-
.../source/enumerator/AbstractSplitEnumerator.java | 75 ++++++++++-------
.../PaimonBatchSourceSplitEnumerator.java | 8 +-
.../PaimonStreamSourceSplitEnumerator.java | 14 +++-
.../paimon/config/PaimonSourceTableConfigTest.java | 83 ++++++++++++++++++
.../seatunnel/e2e/connector/paimon/PaimonIT.java | 12 +++
.../src/test/resources/fake_to_paimon_2.conf | 62 ++++++++++++++
.../paimon-to-assert-with-multipletable.conf | 77 +++++++++++++++++
18 files changed, 550 insertions(+), 124 deletions(-)
diff --git a/docs/en/connector-v2/source/Paimon.md
b/docs/en/connector-v2/source/Paimon.md
index 4bf2a2e180..9111598ab8 100644
--- a/docs/en/connector-v2/source/Paimon.md
+++ b/docs/en/connector-v2/source/Paimon.md
@@ -46,19 +46,20 @@ Read data from Apache Paimon.
## Options
-| name | type | required | default value |
-|-------------------------|--------|----------|---------------|
-| warehouse | String | Yes | - |
-| catalog_type | String | No | filesystem |
-| catalog_uri | String | No | - |
-| database | String | Yes | - |
-| table | String | Yes | - |
-| user | String | No | - |
-| password | String | No | - |
-| hdfs_site_path | String | No | - |
-| query | String | No | - |
-| paimon.hadoop.conf | Map | No | - |
-| paimon.hadoop.conf-path | String | No | - |
+| name | type | required | default value |
+|-------------------------|----------|----------------|---------------|
+| warehouse | String | Yes | - |
+| catalog_type | String | No | filesystem |
+| catalog_uri | String | No | - |
+| database | String | Yes | - |
+| table | String | No | - |
+| table_list | Array | No | - |
+| user | String | No | - |
+| password | String | No | - |
+| hdfs_site_path | String | No | - |
+| query | String | No | - |
+| paimon.hadoop.conf | Map | No | - |
+| paimon.hadoop.conf-path | String | No | - |
### warehouse [string]
@@ -80,6 +81,10 @@ The database you want to access
The table you want to access
+### table_list [array]
+
+The list of tables to read. Use this option instead of `table` when reading multiple tables.
+
### hdfs_site_path [string]
The file path of `hdfs-site.xml`
@@ -134,6 +139,27 @@ source {
}
```
+### Multiple tables
+
+```hocon
+source {
+ Paimon {
+ warehouse = "/tmp/paimon"
+ database = "default"
+ table_list = [
+ {
+ table = "table1"
+ query = "select * from table1 where id > 100"
+ },
+ {
+ table = "table2"
+ query = "select * from table2 where id > 100"
+ }
+ ]
+ }
+}
+```
+
### Filter example
```hocon
diff --git a/docs/zh/connector-v2/source/Paimon.md
b/docs/zh/connector-v2/source/Paimon.md
index 1cb7517c76..ccd61d98bf 100644
--- a/docs/zh/connector-v2/source/Paimon.md
+++ b/docs/zh/connector-v2/source/Paimon.md
@@ -46,19 +46,20 @@ import ChangeLog from '../changelog/connector-paimon.md';
## 配置选项
-| 名称 | 类型 | 是否必须 | 默认值 |
-|-------------------------|--------|----------|---------------|
-| warehouse | String | 是 | - |
-| catalog_type | String | 否 | filesystem |
-| catalog_uri | String | 否 | - |
-| database | String | 是 | - |
-| table | String | 是 | - |
-| user | String | 否 | - |
-| password | String | 否 | - |
-| hdfs_site_path | String | 否 | - |
-| query | String | 否 | - |
-| paimon.hadoop.conf | Map | 否 | - |
-| paimon.hadoop.conf-path | String | 否 | - |
+| 名称 | 类型 | 是否必须 | 默认值 |
+|-------------------------|----------|--------|---------------|
+| warehouse | String | 是 | - |
+| catalog_type | String | 否 | filesystem |
+| catalog_uri | String | 否 | - |
+| database | String | 是 | - |
+| table | String | 否 | - |
+| table_list | Array | 否 | - |
+| user | String | 否 | - |
+| password | String | 否 | - |
+| hdfs_site_path | String | 否 | - |
+| query | String | 否 | - |
+| paimon.hadoop.conf | Map | 否 | - |
+| paimon.hadoop.conf-path | String | 否 | - |
### warehouse [string]
@@ -80,6 +81,10 @@ Paimon 的 catalog uri,仅当 catalog_type 为 hive 时需要
需要访问的表
+### table_list [array]
+
+`Paimon` 表名列表,当需要同时读取多表时使用此配置代替 table
+
### hdfs_site_path [string]
`hdfs-site.xml` 文件地址
@@ -137,6 +142,27 @@ source {
}
```
+### 读取多表
+
+```hocon
+source {
+ Paimon {
+ warehouse = "/tmp/paimon"
+ database = "default"
+ table_list = [
+ {
+ table = "table1"
+ query = "select * from table1 where id > 100"
+ },
+ {
+ table = "table2"
+ query = "select * from table2 where id > 100"
+ }
+ ]
+ }
+}
+```
+
### Filter 示例
```hocon
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
index d47029207a..39c713e45b 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
@@ -65,13 +65,8 @@ public class PaimonConfig implements Serializable {
checkArgumentNotBlank(
readonlyConfig.get(PaimonBaseOptions.WAREHOUSE),
PaimonBaseOptions.WAREHOUSE.key());
- this.namespace =
- checkArgumentNotBlank(
- readonlyConfig.get(PaimonBaseOptions.DATABASE),
- PaimonBaseOptions.DATABASE.key());
- this.table =
- checkArgumentNotBlank(
- readonlyConfig.get(PaimonBaseOptions.TABLE),
PaimonBaseOptions.TABLE.key());
+ this.namespace = readonlyConfig.get(PaimonBaseOptions.DATABASE);
+ this.table = readonlyConfig.get(PaimonBaseOptions.TABLE);
this.hdfsSitePath =
readonlyConfig.get(PaimonBaseOptions.HDFS_SITE_PATH);
this.hadoopConfProps =
readonlyConfig.get(PaimonBaseOptions.HADOOP_CONF);
this.hadoopConfPath =
readonlyConfig.get(PaimonBaseOptions.HADOOP_CONF_PATH);
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceConfig.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceConfig.java
index 2d1bc77851..1ca24d6aca 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceConfig.java
@@ -21,13 +21,18 @@ import
org.apache.seatunnel.api.configuration.ReadonlyConfig;
import lombok.Getter;
+import java.util.ArrayList;
+import java.util.List;
+
@Getter
public class PaimonSourceConfig extends PaimonConfig {
private String query;
+ private List<PaimonSourceTableConfig> tableConfigList = new ArrayList<>();
public PaimonSourceConfig(ReadonlyConfig readonlyConfig) {
super(readonlyConfig);
this.query = readonlyConfig.get(PaimonSourceOptions.QUERY_SQL);
+ this.tableConfigList = PaimonSourceTableConfig.of(readonlyConfig);
}
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceOptions.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceOptions.java
index 466302f623..baad34fa85 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceOptions.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceOptions.java
@@ -17,9 +17,14 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.config;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import java.util.List;
+import java.util.Map;
+
public class PaimonSourceOptions extends PaimonBaseOptions {
public static final Option<String> QUERY_SQL =
@@ -27,4 +32,10 @@ public class PaimonSourceOptions extends PaimonBaseOptions {
.stringType()
.noDefaultValue()
.withDescription("The query of paimon source");
+
+ public static final Option<List<Map<String, Object>>> TABLE_LIST =
+ Options.key("table_list")
+ .type(new TypeReference<List<Map<String, Object>>>() {})
+ .noDefaultValue()
+ .withDescription("table list config");
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceTableConfig.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceTableConfig.java
new file mode 100644
index 0000000000..ecebcb5a91
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceTableConfig.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.config;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+
+import lombok.Getter;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Getter
+public class PaimonSourceTableConfig implements Serializable {
+
+ private final String database;
+ private final String table;
+ private final String query;
+
+ private PaimonSourceTableConfig(String database, String table, String
query) {
+ this.database = database;
+ this.table = table;
+ this.query = query;
+ }
+
+ public static PaimonSourceTableConfig
parsePaimonSourceConfig(ReadonlyConfig config) {
+ String database = config.get(PaimonBaseOptions.DATABASE);
+ String table = config.get(PaimonBaseOptions.TABLE);
+ String query =
config.getOptional(PaimonSourceOptions.QUERY_SQL).orElse(null);
+ return new PaimonSourceTableConfig(database, table, query);
+ }
+
+ public static List<PaimonSourceTableConfig> of(ReadonlyConfig config) {
+ if (config.getOptional(PaimonSourceOptions.TABLE_LIST).isPresent()) {
+ List<Map<String, Object>> maps =
config.get(PaimonSourceOptions.TABLE_LIST);
+ return maps.stream()
+ .map(ReadonlyConfig::fromMap)
+ .map(PaimonSourceTableConfig::parsePaimonSourceConfig)
+ .collect(Collectors.toList());
+ }
+ return Lists.newArrayList(parsePaimonSourceConfig(config));
+ }
+
+ public TablePath getTablePath() {
+ return TablePath.of(database, table);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
index ef817227e1..2d970403ce 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.source;
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+import org.apache.seatunnel.shade.com.google.common.collect.Maps;
+
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
@@ -42,9 +45,9 @@ import org.apache.paimon.types.RowType;
import net.sf.jsqlparser.statement.select.PlainSelect;
-import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import static
org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter.convertSqlSelectToPaimonProjectionIndex;
@@ -58,43 +61,57 @@ public class PaimonSource
public static final String PLUGIN_NAME = "Paimon";
- private SeaTunnelRowType seaTunnelRowType;
-
- private Table paimonTable;
-
private JobContext jobContext;
- private CatalogTable catalogTable;
-
- protected final ReadBuilder readBuilder;
+ private List<CatalogTable> catalogTables = Lists.newArrayList();
+ private Map<String, Table> paimonTables = Maps.newHashMap();
+ private Map<String, SeaTunnelRowType> seaTunnelRowTypes =
Maps.newHashMap();
+ private Map<String, ReadBuilder> readBuilders = Maps.newHashMap();
public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog
paimonCatalog) {
- PaimonSourceConfig paimonSourceConfig = new
PaimonSourceConfig(readonlyConfig);
- TablePath tablePath =
- TablePath.of(paimonSourceConfig.getNamespace(),
paimonSourceConfig.getTable());
- this.catalogTable = paimonCatalog.getTable(tablePath);
- this.paimonTable = paimonCatalog.getPaimonTable(tablePath);
-
- String filterSql = paimonSourceConfig.getQuery();
- PlainSelect plainSelect = convertToPlainSelect(filterSql);
- RowType paimonRowType = this.paimonTable.rowType();
- String[] filedNames = paimonRowType.getFieldNames().toArray(new
String[0]);
-
- Predicate predicate = null;
- int[] projectionIndex = null;
- if (!Objects.isNull(plainSelect)) {
- projectionIndex =
convertSqlSelectToPaimonProjectionIndex(filedNames, plainSelect);
- if (!Objects.isNull(projectionIndex)) {
- this.catalogTable =
- paimonCatalog.getTableWithProjection(tablePath,
projectionIndex);
- }
- predicate =
-
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
- paimonRowType, plainSelect);
- }
- this.seaTunnelRowType = RowTypeConverter.convert(paimonRowType,
projectionIndex);
- this.readBuilder =
-
paimonTable.newReadBuilder().withProjection(projectionIndex).withFilter(predicate);
+ new PaimonSourceConfig(readonlyConfig)
+ .getTableConfigList()
+ .forEach(
+ tableConfig -> {
+ TablePath tablePath = tableConfig.getTablePath();
+ CatalogTable catalogTable =
paimonCatalog.getTable(tablePath);
+ Table paimonTable =
paimonCatalog.getPaimonTable(tablePath);
+ RowType paimonRowType = paimonTable.rowType();
+ String[] filedNames =
+ paimonRowType.getFieldNames().toArray(new
String[0]);
+
+ PlainSelect plainSelect =
convertToPlainSelect(tableConfig.getQuery());
+ Predicate predicate = null;
+ int[] projectionIndex = null;
+ if (!Objects.isNull(plainSelect)) {
+ projectionIndex =
+
convertSqlSelectToPaimonProjectionIndex(
+ filedNames, plainSelect);
+ if (!Objects.isNull(projectionIndex)) {
+ catalogTable =
+
paimonCatalog.getTableWithProjection(
+ tablePath,
projectionIndex);
+ }
+ predicate =
+ SqlToPaimonPredicateConverter
+
.convertSqlWhereToPaimonPredicate(
+ paimonRowType,
plainSelect);
+ }
+ this.catalogTables.add(catalogTable);
+ String tableKey = tablePath.toString();
+ this.seaTunnelRowTypes.put(
+ tableKey,
+ RowTypeConverter.convert(paimonRowType,
projectionIndex));
+
+ ReadBuilder readBuilder =
+ paimonTable
+ .newReadBuilder()
+ .withProjection(projectionIndex)
+ .withFilter(predicate);
+
+ this.paimonTables.put(tableKey, paimonTable);
+ this.readBuilders.put(tableKey, readBuilder);
+ });
}
@Override
@@ -104,7 +121,7 @@ public class PaimonSource
@Override
public List<CatalogTable> getProducedCatalogTables() {
- return Collections.singletonList(catalogTable);
+ return catalogTables;
}
@Override
@@ -122,8 +139,7 @@ public class PaimonSource
@Override
public SourceReader<SeaTunnelRow, PaimonSourceSplit> createReader(
SourceReader.Context readerContext) throws Exception {
- return new PaimonSourceReader(
- readerContext, paimonTable, seaTunnelRowType,
readBuilder.newRead());
+ return new PaimonSourceReader(readerContext, paimonTables,
seaTunnelRowTypes, readBuilders);
}
@Override
@@ -131,10 +147,10 @@ public class PaimonSource
SourceSplitEnumerator.Context<PaimonSourceSplit>
enumeratorContext) throws Exception {
if (getBoundedness() == Boundedness.BOUNDED) {
return new PaimonBatchSourceSplitEnumerator(
- enumeratorContext, new LinkedList<>(), null,
readBuilder.newScan(), 1);
+ enumeratorContext, new LinkedList<>(), null, readBuilders,
1);
}
return new PaimonStreamSourceSplitEnumerator(
- enumeratorContext, new LinkedList<>(), null,
readBuilder.newStreamScan(), 1);
+ enumeratorContext, new LinkedList<>(), null, readBuilders, 1);
}
@Override
@@ -147,14 +163,14 @@ public class PaimonSource
enumeratorContext,
checkpointState.getAssignedSplits(),
checkpointState.getCurrentSnapshotId(),
- readBuilder.newScan(),
+ readBuilders,
1);
}
return new PaimonStreamSourceSplitEnumerator(
enumeratorContext,
checkpointState.getAssignedSplits(),
checkpointState.getCurrentSnapshotId(),
- readBuilder.newStreamScan(),
+ readBuilders,
1);
}
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceFactory.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceFactory.java
index 97063518e9..3114854a8e 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceFactory.java
@@ -46,16 +46,15 @@ public class PaimonSourceFactory implements
TableSourceFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(
- PaimonSourceOptions.WAREHOUSE,
- PaimonSourceOptions.DATABASE,
- PaimonSourceOptions.TABLE)
+ .required(PaimonSourceOptions.WAREHOUSE)
.optional(
+ PaimonSourceOptions.DATABASE,
PaimonSourceOptions.CATALOG_TYPE,
PaimonSourceOptions.HDFS_SITE_PATH,
PaimonSourceOptions.QUERY_SQL,
PaimonSourceOptions.HADOOP_CONF,
PaimonSourceOptions.HADOOP_CONF_PATH)
+ .exclusive(PaimonSourceOptions.TABLE,
PaimonSourceOptions.TABLE_LIST)
.conditional(
PaimonSourceOptions.CATALOG_TYPE,
PaimonCatalogEnum.HIVE,
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
index 36ad1f68f3..ac24e64d04 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
@@ -31,6 +31,7 @@ import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableRead;
import lombok.extern.slf4j.Slf4j;
@@ -38,7 +39,9 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Deque;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
@@ -48,17 +51,23 @@ public class PaimonSourceReader implements
SourceReader<SeaTunnelRow, PaimonSour
private final Deque<PaimonSourceSplit> sourceSplits = new
ConcurrentLinkedDeque<>();
private final SourceReader.Context context;
- private final Table table;
- private final SeaTunnelRowType seaTunnelRowType;
+ private final Map<String, Table> tables;
+ private final Map<String, SeaTunnelRowType> seaTunnelRowTypes;
+ private final Map<String, TableRead> tableReads;
private volatile boolean noMoreSplit;
- private final TableRead tableRead;
public PaimonSourceReader(
- Context context, Table table, SeaTunnelRowType seaTunnelRowType,
TableRead tableRead) {
+ Context context,
+ Map<String, Table> tables,
+ Map<String, SeaTunnelRowType> seaTunnelRowTypes,
+ Map<String, ReadBuilder> readBuilders) {
this.context = context;
- this.table = table;
- this.seaTunnelRowType = seaTunnelRowType;
- this.tableRead = tableRead;
+ this.tables = tables;
+ this.seaTunnelRowTypes = seaTunnelRowTypes;
+ this.tableReads = new HashMap<>();
+ for (Map.Entry<String, ReadBuilder> entry : readBuilders.entrySet()) {
+ this.tableReads.put(entry.getKey(), entry.getValue().newRead());
+ }
}
@Override
@@ -76,7 +85,10 @@ public class PaimonSourceReader implements
SourceReader<SeaTunnelRow, PaimonSour
synchronized (output.getCheckpointLock()) {
final PaimonSourceSplit split = sourceSplits.poll();
if (Objects.nonNull(split)) {
- // read logic
+ String tableId = split.getTableId();
+ Table table = tables.get(tableId);
+ SeaTunnelRowType seaTunnelRowType =
seaTunnelRowTypes.get(tableId);
+ TableRead tableRead = tableReads.get(tableId);
try (final RecordReader<InternalRow> reader =
tableRead.executeFilter().createReader(split.getSplit());
final RecordReaderIterator<InternalRow> rowIterator =
@@ -94,6 +106,7 @@ public class PaimonSourceReader implements
SourceReader<SeaTunnelRow, PaimonSour
seaTunnelRow.setRowKind(rowKind);
}
}
+ seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
}
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplit.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplit.java
index eba167eadd..c9e4341cbe 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplit.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplit.java
@@ -21,20 +21,26 @@ import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.paimon.table.source.Split;
-import lombok.AllArgsConstructor;
import lombok.Getter;
/** Paimon source split, wrapped the {@link Split} of paimon table. */
@Getter
-@AllArgsConstructor
public class PaimonSourceSplit implements SourceSplit {
private static final long serialVersionUID = 1L;
/** The unique ID of the split. Unique within the scope of this source. */
private final String id;
+ private final String tableId;
+
private final Split split;
+ public PaimonSourceSplit(String id, String tableId, Split split) {
+ this.id = id;
+ this.tableId = tableId;
+ this.split = split;
+ }
+
@Override
public String splitId() {
return split.toString();
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitGenerator.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitGenerator.java
index 93b64c13a2..82d514cc3f 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitGenerator.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitGenerator.java
@@ -29,9 +29,9 @@ public class PaimonSourceSplitGenerator {
*/
private final char[] currentId = "0000000000".toCharArray();
- public List<PaimonSourceSplit> createSplits(TableScan.Plan plan) {
+ public List<PaimonSourceSplit> createSplits(String tableId, TableScan.Plan
plan) {
return plan.splits().stream()
- .map(s -> new PaimonSourceSplit(getNextId(), s))
+ .map(s -> new PaimonSourceSplit(getNextId(), tableId, s))
.collect(Collectors.toList());
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
index 2d748ad257..e18bda0e22 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
@@ -17,15 +17,18 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator;
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import
org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceSplit;
import
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceSplitGenerator;
import
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceState;
import org.apache.paimon.table.source.EndOfScanException;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
@@ -37,12 +40,13 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collection;
import java.util.Deque;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -62,8 +66,8 @@ public abstract class AbstractSplitEnumerator
/** The splits that have not assigned */
protected Deque<PaimonSourceSplit> pendingSplits;
- protected final TableScan tableScan;
protected final Object stateLock = new Object();
+ private final Map<String, TableScan> tableScans = new HashMap<>();
private final int splitMaxNum;
@@ -75,23 +79,32 @@ public abstract class AbstractSplitEnumerator
Context<PaimonSourceSplit> context,
Deque<PaimonSourceSplit> pendingSplits,
@Nullable Long nextSnapshotId,
- TableScan tableScan,
- int splitMaxPerTask) {
+ Map<String, ReadBuilder> readBuilders,
+ int splitMaxPerTask,
+ JobMode jobMode) {
this.context = context;
this.pendingSplits = new LinkedList<>(pendingSplits);
this.nextSnapshotId = nextSnapshotId;
this.readersAwaitingSplit = new LinkedHashSet<>();
this.splitGenerator = new PaimonSourceSplitGenerator();
- this.tableScan = tableScan;
this.splitMaxNum = context.currentParallelism() * splitMaxPerTask;
this.executorService =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("Seatunnel-PaimonSourceSplitEnumerator-%d")
.build());
- if (tableScan instanceof StreamTableScan && nextSnapshotId != null) {
- ((StreamTableScan) tableScan).restore(nextSnapshotId);
- }
+
+ readBuilders.forEach(
+ (tableId, readBuilder) -> {
+ TableScan scan =
+ JobMode.BATCH.equals(jobMode)
+ ? readBuilder.newScan()
+ : readBuilder.newStreamScan();
+ tableScans.put(tableId, scan);
+ if (scan instanceof StreamTableScan && nextSnapshotId !=
null) {
+ ((StreamTableScan) scan).restore(nextSnapshotId);
+ }
+ });
}
@Override
@@ -197,22 +210,28 @@ public abstract class AbstractSplitEnumerator
// This need to be synchronized because scan object is not thread safe.
handleSplitRequest and
// CompletableFuture.supplyAsync will invoke this.
- protected synchronized Optional<PlanWithNextSnapshotId> scanNextSnapshot()
{
+ protected synchronized List<PlanWithNextSnapshotId> scanNextSnapshot() {
+
+ List<PlanWithNextSnapshotId> snapshotIds = Lists.newArrayList();
if (pendingSplits.size() >= splitMaxNum) {
- return Optional.empty();
+ return snapshotIds;
}
- TableScan.Plan plan = tableScan.plan();
- Long nextSnapshotId = null;
- if (tableScan instanceof StreamTableScan) {
- nextSnapshotId = ((StreamTableScan) tableScan).checkpoint();
- }
- return Optional.of(new PlanWithNextSnapshotId(plan, nextSnapshotId));
+ tableScans.forEach(
+ (tableId, tableScan) -> {
+ TableScan.Plan plan = tableScan.plan();
+ Long nextSnapshotId = null;
+ if (tableScan instanceof StreamTableScan) {
+ nextSnapshotId = ((StreamTableScan)
tableScan).checkpoint();
+ }
+ snapshotIds.add(new PlanWithNextSnapshotId(tableId, plan,
nextSnapshotId));
+ });
+ return snapshotIds;
}
// This method could not be synchronized, because it runs in
coordinatorThread, which will make
// it serializable execution.
protected void processDiscoveredSplits(
- Optional<PlanWithNextSnapshotId> planWithNextSnapshotIdOptional,
Throwable error) {
+ List<PlanWithNextSnapshotId> planWithNextSnapshotIds, Throwable
error) {
if (error != null) {
if (error instanceof EndOfScanException) {
log.debug("Catching EndOfStreamException, the stream is
finished.");
@@ -223,28 +242,28 @@ public abstract class AbstractSplitEnumerator
}
return;
}
- if (!planWithNextSnapshotIdOptional.isPresent()) {
- return;
- }
- PlanWithNextSnapshotId planWithNextSnapshotId =
planWithNextSnapshotIdOptional.get();
- nextSnapshotId = planWithNextSnapshotId.nextSnapshotId;
- TableScan.Plan plan = planWithNextSnapshotId.plan;
- if (plan.splits().isEmpty()) {
- return;
+ for (PlanWithNextSnapshotId planWithNextSnapshotId :
planWithNextSnapshotIds) {
+ nextSnapshotId = planWithNextSnapshotId.nextSnapshotId;
+ TableScan.Plan plan = planWithNextSnapshotId.plan;
+ if (plan.splits().isEmpty()) {
+ continue;
+ }
+
addSplits(splitGenerator.createSplits(planWithNextSnapshotId.tableId, plan));
}
-
- addSplits(splitGenerator.createSplits(plan));
assignSplits();
}
/** The result of scan. */
@Getter
protected static class PlanWithNextSnapshotId {
+
private final TableScan.Plan plan;
private final Long nextSnapshotId;
+ private final String tableId;
- public PlanWithNextSnapshotId(TableScan.Plan plan, Long
nextSnapshotId) {
+ public PlanWithNextSnapshotId(String tableId, TableScan.Plan plan,
Long nextSnapshotId) {
+ this.tableId = tableId;
this.plan = plan;
this.nextSnapshotId = nextSnapshotId;
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java
index 46ecf42756..43cf5c4e3e 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java
@@ -17,16 +17,18 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator;
+import org.apache.seatunnel.common.constants.JobMode;
import
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceSplit;
import
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceState;
-import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.table.source.ReadBuilder;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
import java.util.Deque;
+import java.util.Map;
import java.util.Set;
/** Paimon source split enumerator, used to calculate the splits for every
reader. */
@@ -37,9 +39,9 @@ public class PaimonBatchSourceSplitEnumerator extends
AbstractSplitEnumerator {
Context<PaimonSourceSplit> context,
Deque<PaimonSourceSplit> pendingSplits,
@Nullable Long nextSnapshotId,
- TableScan tableScan,
+ Map<String, ReadBuilder> readBuilders,
int splitMaxPerTask) {
- super(context, pendingSplits, nextSnapshotId, tableScan,
splitMaxPerTask);
+ super(context, pendingSplits, nextSnapshotId, readBuilders,
splitMaxPerTask, JobMode.BATCH);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonStreamSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonStreamSourceSplitEnumerator.java
index 2cce57be93..6852472f59 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonStreamSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonStreamSourceSplitEnumerator.java
@@ -17,15 +17,17 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator;
+import org.apache.seatunnel.common.constants.JobMode;
import
org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceSplit;
-import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.table.source.ReadBuilder;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
import java.util.Deque;
+import java.util.Map;
/** Paimon source split enumerator, used to calculate the splits for every
reader. */
@Slf4j
@@ -35,9 +37,15 @@ public class PaimonStreamSourceSplitEnumerator extends
AbstractSplitEnumerator {
Context<PaimonSourceSplit> context,
Deque<PaimonSourceSplit> pendingSplits,
@Nullable Long nextSnapshotId,
- TableScan tableScan,
+ Map<String, ReadBuilder> readBuilders,
int splitMaxPerTask) {
- super(context, pendingSplits, nextSnapshotId, tableScan,
splitMaxPerTask);
+ super(
+ context,
+ pendingSplits,
+ nextSnapshotId,
+ readBuilders,
+ splitMaxPerTask,
+ JobMode.STREAMING);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceTableConfigTest.java
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceTableConfigTest.java
new file mode 100644
index 0000000000..47969d7194
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceTableConfigTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import org.apache.curator.shaded.com.google.common.collect.Lists;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class PaimonSourceTableConfigTest {
+
+ @Test
+ public void testSingleTableConfig() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put("warehouse", "file:///tmp/paimon");
+ configMap.put("database", "test_db");
+ configMap.put("table", "test_table");
+ configMap.put("query", "SELECT * FROM test_table");
+
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+ List<PaimonSourceTableConfig> tableConfigs =
PaimonSourceTableConfig.of(config);
+
+ assertEquals(1, tableConfigs.size());
+ PaimonSourceTableConfig tableConfig = tableConfigs.get(0);
+ assertEquals("test_db", tableConfig.getDatabase());
+ assertEquals("test_table", tableConfig.getTable());
+ assertEquals("SELECT * FROM test_table", tableConfig.getQuery());
+ }
+
+ @Test
+ public void testMultiTableConfig() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put("warehouse", "file:///tmp/paimon");
+
+ Map<String, Object> table1 = new HashMap<>();
+ table1.put("database", "test_db");
+ table1.put("table", "table1");
+ table1.put("query", "SELECT * FROM table1");
+
+ Map<String, Object> table2 = new HashMap<>();
+ table2.put("database", "test_db");
+ table2.put("table", "table2");
+
+ configMap.put("table_list", Lists.newArrayList(table1, table2));
+
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+ List<PaimonSourceTableConfig> tableConfigs =
PaimonSourceTableConfig.of(config);
+
+ assertEquals(2, tableConfigs.size());
+
+ PaimonSourceTableConfig config1 = tableConfigs.get(0);
+ assertEquals("test_db", config1.getDatabase());
+ assertEquals("table1", config1.getTable());
+ assertEquals("SELECT * FROM table1", config1.getQuery());
+
+ PaimonSourceTableConfig config2 = tableConfigs.get(1);
+ assertEquals("test_db", config2.getDatabase());
+ assertEquals("table2", config2.getTable());
+ assertEquals(null, config2.getQuery());
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
index 1c84df04f1..b197d53273 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
@@ -90,6 +90,18 @@ public class PaimonIT extends TestSuiteBase implements
TestResource {
Assertions.assertEquals(0, readProjectionResult.getExitCode());
}
+ @TestTemplate
+ public void testMultiTableRead(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult textWriteResult =
container.executeJob("/fake_to_paimon.conf");
+ Assertions.assertEquals(0, textWriteResult.getExitCode());
+ Container.ExecResult textWriteResult2 =
container.executeJob("/fake_to_paimon_2.conf");
+ Assertions.assertEquals(0, textWriteResult2.getExitCode());
+ Container.ExecResult multiReadResult =
+
container.executeJob("/paimon-to-assert-with-multipletable.conf");
+ Assertions.assertEquals(0, multiReadResult.getExitCode());
+ }
+
@Override
public void startUp() throws Exception {}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_2.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_2.conf
new file mode 100644
index 0000000000..746a9a8c4a
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_2.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ auto.increment.enabled = true
+ auto.increment.start = 1
+ row.num = 100
+ schema = {
+ fields {
+ pk_id = bigint
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ c_time = time
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ plugin_output = "fake"
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "/tmp/seatunnel_mnt/paimon"
+ database = "default"
+ table = "st_test_p"
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon-to-assert-with-multipletable.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon-to-assert-with-multipletable.conf
new file mode 100644
index 0000000000..a6e8c6f743
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon-to-assert-with-multipletable.conf
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Paimon {
+ warehouse = "/tmp/seatunnel_mnt/paimon"
+ table_list = [
+ {
+ database = "default"
+ table = "st_test"
+ },
+ {
+ database = "default"
+ table = "st_test_p"
+ }
+ ]
+ plugin_output = paimon_source
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = paimon_source
+ rules {
+ table-names = ["default.st_test", "default.st_test_p"],
+ tables_configs = [
+ {
+ table_path = "default.st_test"
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 100000
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 100000
+ }
+ ]
+ },
+ {
+ table_path = "default.st_test_p"
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 100
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
+
+