This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 658643ae53 [Improve][Connector-V2] Improve the paimon source (#6887)
658643ae53 is described below

commit 658643ae532d4ef1db5b4e591f7bb0b8b342f401
Author: dailai <dai...@chinatelecom.cn>
AuthorDate: Tue Jun 11 18:07:51 2024 +0800

    [Improve][Connector-V2] Improve the paimon source (#6887)
---
 docs/en/connector-v2/source/Paimon.md              | 116 ++++++++-
 seatunnel-connectors-v2/connector-paimon/pom.xml   |   5 +
 .../paimon/config/PaimonSourceConfig.java          |  41 ++++
 .../seatunnel/paimon/source/PaimonSource.java      | 114 +++------
 .../paimon/source/PaimonSourceFactory.java         |  38 ++-
 .../paimon/source/PaimonSourceReader.java          |  16 +-
 .../paimon/source/PaimonSourceSplitEnumerator.java |  16 +-
 .../converter/SqlToPaimonPredicateConverter.java   | 265 +++++++++++++++++++++
 .../seatunnel/paimon/utils/RowConverter.java       |  18 +-
 .../SqlToPaimonPredicateConverterTest.java         | 183 ++++++++++++++
 .../seatunnel/paimon/utils/RowConverterTest.java   |   2 +-
 .../e2e/connector/paimon/PaimonSinkCDCIT.java      |  27 ++-
 .../e2e/connector/paimon/PaimonSinkHdfsIT.java     |  11 +-
 .../resources/fake_to_paimon_with_full_type.conf   |  91 +++++++
 .../resources/paimon_to_assert_with_filter1.conf   |  60 +++++
 .../resources/paimon_to_assert_with_filter2.conf   |  61 +++++
 .../resources/paimon_to_assert_with_filter3.conf   |  81 +++++++
 .../resources/paimon_to_assert_with_filter4.conf   |  71 ++++++
 .../paimon_to_assert_with_hivecatalog.conf         |  59 +++++
 .../paimon_to_assert_with_timestampN.conf          |  90 +++++++
 .../read_from_paimon_with_hdfs_ha_to_assert.conf   |  90 +++++++
 21 files changed, 1355 insertions(+), 100 deletions(-)

diff --git a/docs/en/connector-v2/source/Paimon.md 
b/docs/en/connector-v2/source/Paimon.md
index eb83e3bb42..e50ee0df9e 100644
--- a/docs/en/connector-v2/source/Paimon.md
+++ b/docs/en/connector-v2/source/Paimon.md
@@ -17,17 +17,30 @@ Read data from Apache Paimon.
 
 ## Options
 
-|      name      |  type  | required | default value |
-|----------------|--------|----------|---------------|
-| warehouse      | String | Yes      | -             |
-| database       | String | Yes      | -             |
-| table          | String | Yes      | -             |
-| hdfs_site_path | String | No       | -             |
+|          name           |  type  | required | default value |
+|-------------------------|--------|----------|---------------|
+| warehouse               | String | Yes      | -             |
+| catalog_type            | String | No       | filesystem    |
+| catalog_uri             | String | No       | -             |
+| database                | String | Yes      | -             |
+| table                   | String | Yes      | -             |
+| hdfs_site_path          | String | No       | -             |
+| query                   | String | No       | -             |
+| paimon.hadoop.conf      | Map    | No       | -             |
+| paimon.hadoop.conf-path | String | No       | -             |
 
 ### warehouse [string]
 
 Paimon warehouse path
 
+### catalog_type [string]
+
+Catalog type of Paimon, support filesystem and hive
+
+### catalog_uri [string]
+
+Catalog uri of Paimon, only needed when catalog_type is hive
+
 ### database [string]
 
 The database you want to access
@@ -40,8 +53,39 @@ The table you want to access
 
 The file path of `hdfs-site.xml`
 
+### query [string]
+
+The filter condition of the table read. For example: `select * from st_test 
where id > 100`. If not specified, all rows are read.
+Currently, where conditions only support <, <=, >, >=, =, !=, or, and,is null, 
is not null, and others are not supported.
+The Having, Group By, Order By clauses are currently unsupported, because 
these clauses are not supported by Paimon.
+The projection and limit will be supported in the future.
+
+Note: When the field after the where condition is a string or boolean value, 
its value must be enclosed in single quotes, otherwise an error will be 
reported. `For example: name='abc' or tag='true'`
+The field data types currently supported by where conditions are as follows:
+
+* string
+* boolean
+* tinyint
+* smallint
+* int
+* bigint
+* float
+* double
+* date
+* timestamp
+
+### paimon.hadoop.conf [string]
+
+Properties in hadoop conf
+
+### paimon.hadoop.conf-path [string]
+
+The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 
'hive-site.xml' files
+
 ## Examples
 
+### Simple example
+
 ```hocon
 source {
  Paimon {
@@ -52,6 +96,66 @@ source {
 }
 ```
 
+### Filter example
+
+```hocon
+source {
+  Paimon {
+    warehouse = "/tmp/paimon"
+    database = "full_type"
+    table = "st_test"
+    query = "select * from st_test where c_boolean= 'true' and c_tinyint > 116 
and c_smallint = 15987 or c_decimal='2924137191386439303744.39292213'"
+  }
+}
+```
+
+### Hadoop conf example
+
+```hocon
+source {
+  Paimon {
+    catalog_name="seatunnel_test"
+    warehouse="hdfs:///tmp/paimon"
+    database="seatunnel_namespace1"
+    table="st_test"
+    query = "select * from st_test where pk_id is not null and pk_id < 3"
+    paimon.hadoop.conf = {
+      fs.defaultFS = "hdfs://nameservice1"
+      dfs.nameservices = "nameservice1"
+      dfs.ha.namenodes.nameservice1 = "nn1,nn2"
+      dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
+      dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
+      dfs.client.failover.proxy.provider.nameservice1 = 
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+      dfs.client.use.datanode.hostname = "true"
+    }
+  }
+}
+```
+
+### Hive catalog example
+
+```hocon
+source {
+  Paimon {
+    catalog_name="seatunnel_test"
+    catalog_type="hive"
+    catalog_uri="thrift://hadoop04:9083"
+    warehouse="hdfs:///tmp/seatunnel"
+    database="seatunnel_test"
+    table="st_test3"
+    paimon.hadoop.conf = {
+      fs.defaultFS = "hdfs://nameservice1"
+      dfs.nameservices = "nameservice1"
+      dfs.ha.namenodes.nameservice1 = "nn1,nn2"
+      dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
+      dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
+      dfs.client.failover.proxy.provider.nameservice1 = 
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+      dfs.client.use.datanode.hostname = "true"
+    }
+  }
+}
+```
+
 ## Changelog
 
 ### next version
diff --git a/seatunnel-connectors-v2/connector-paimon/pom.xml 
b/seatunnel-connectors-v2/connector-paimon/pom.xml
index 267e66dc0a..80934e68a2 100644
--- a/seatunnel-connectors-v2/connector-paimon/pom.xml
+++ b/seatunnel-connectors-v2/connector-paimon/pom.xml
@@ -90,6 +90,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>com.github.jsqlparser</groupId>
+            <artifactId>jsqlparser</artifactId>
+            <version>${jsqlparser.version}</version>
+        </dependency>
 
     </dependencies>
 
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceConfig.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceConfig.java
new file mode 100644
index 0000000000..b29542ba45
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSourceConfig.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.Getter;
+
+@Getter
+public class PaimonSourceConfig extends PaimonConfig {
+
+    public static final Option<String> QUERY_SQL =
+            Options.key("query")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The query of paimon source");
+
+    private String query;
+
+    public PaimonSourceConfig(ReadonlyConfig readonlyConfig) {
+        super(readonlyConfig);
+        this.query = readonlyConfig.get(QUERY_SQL);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
index 658e1e45e5..358026b5d3 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java
@@ -17,45 +17,26 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
-import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.Table;
-
-import com.google.auto.service.AutoService;
+import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter;
 
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.Table;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.DATABASE;
-import static 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.HDFS_SITE_PATH;
-import static 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.TABLE;
-import static 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.WAREHOUSE;
+import java.util.Collections;
+import java.util.List;
 
 /** Paimon connector source class. */
-@AutoService(SeaTunnelSource.class)
 public class PaimonSource
         implements SeaTunnelSource<SeaTunnelRow, PaimonSourceSplit, 
PaimonSourceState> {
 
@@ -63,77 +44,57 @@ public class PaimonSource
 
     public static final String PLUGIN_NAME = "Paimon";
 
-    private Config pluginConfig;
+    private ReadonlyConfig readonlyConfig;
 
     private SeaTunnelRowType seaTunnelRowType;
 
-    private Table table;
+    private Table paimonTable;
+
+    private Predicate predicate;
+
+    private CatalogTable catalogTable;
+
+    public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog 
paimonCatalog) {
+        this.readonlyConfig = readonlyConfig;
+        PaimonSourceConfig paimonSourceConfig = new 
PaimonSourceConfig(readonlyConfig);
+        TablePath tablePath =
+                TablePath.of(paimonSourceConfig.getNamespace(), 
paimonSourceConfig.getTable());
+        this.catalogTable = paimonCatalog.getTable(tablePath);
+        this.paimonTable = paimonCatalog.getPaimonTable(tablePath);
+        this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+        // TODO: We can use this to realize the column projection feature later
+        String filterSql = readonlyConfig.get(PaimonSourceConfig.QUERY_SQL);
+        this.predicate =
+                SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
+                        this.paimonTable.rowType(), filterSql);
+    }
 
     @Override
     public String getPluginName() {
         return PLUGIN_NAME;
     }
 
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        this.pluginConfig = pluginConfig;
-        final CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig, WAREHOUSE.key(), DATABASE.key(), 
TABLE.key());
-        if (!result.isSuccess()) {
-            throw new PaimonConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));
-        }
-        // initialize paimon table
-        final String warehouse = pluginConfig.getString(WAREHOUSE.key());
-        final String database = pluginConfig.getString(DATABASE.key());
-        final String table = pluginConfig.getString(TABLE.key());
-        final Map<String, String> optionsMap = new HashMap<>();
-        optionsMap.put(WAREHOUSE.key(), warehouse);
-        final Options options = Options.fromMap(optionsMap);
-        final Configuration hadoopConf = new Configuration();
-        if (pluginConfig.hasPath(HDFS_SITE_PATH.key())) {
-            hadoopConf.addResource(new 
Path(pluginConfig.getString(HDFS_SITE_PATH.key())));
-        }
-        final CatalogContext catalogContext = CatalogContext.create(options, 
hadoopConf);
-        try (Catalog catalog = CatalogFactory.createCatalog(catalogContext)) {
-            Identifier identifier = Identifier.create(database, table);
-            this.table = catalog.getTable(identifier);
-        } catch (Exception e) {
-            String errorMsg =
-                    String.format(
-                            "Failed to get table [%s] from database [%s] on 
warehouse [%s]",
-                            database, table, warehouse);
-            throw new PaimonConnectorException(
-                    PaimonConnectorErrorCode.GET_TABLE_FAILED, errorMsg, e);
-        }
-        // TODO: Support column projection
-        seaTunnelRowType = RowTypeConverter.convert(this.table.rowType());
-    }
-
     @Override
     public Boundedness getBoundedness() {
         return Boundedness.BOUNDED;
     }
 
     @Override
-    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
-        return seaTunnelRowType;
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Collections.singletonList(catalogTable);
     }
 
     @Override
     public SourceReader<SeaTunnelRow, PaimonSourceSplit> createReader(
             SourceReader.Context readerContext) throws Exception {
-        return new PaimonSourceReader(readerContext, table, seaTunnelRowType);
+
+        return new PaimonSourceReader(readerContext, paimonTable, 
seaTunnelRowType, predicate);
     }
 
     @Override
     public SourceSplitEnumerator<PaimonSourceSplit, PaimonSourceState> 
createEnumerator(
             SourceSplitEnumerator.Context<PaimonSourceSplit> 
enumeratorContext) throws Exception {
-        return new PaimonSourceSplitEnumerator(enumeratorContext, table);
+        return new PaimonSourceSplitEnumerator(enumeratorContext, paimonTable, 
predicate);
     }
 
     @Override
@@ -141,6 +102,7 @@ public class PaimonSource
             SourceSplitEnumerator.Context<PaimonSourceSplit> enumeratorContext,
             PaimonSourceState checkpointState)
             throws Exception {
-        return new PaimonSourceSplitEnumerator(enumeratorContext, table, 
checkpointState);
+        return new PaimonSourceSplitEnumerator(
+                enumeratorContext, paimonTable, checkpointState, predicate);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceFactory.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceFactory.java
index 91042be7dd..c11906dec4 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceFactory.java
@@ -17,14 +17,24 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.source;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalogEnum;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalogFactory;
 import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSourceConfig;
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
+
 @AutoService(Factory.class)
 public class PaimonSourceFactory implements TableSourceFactory {
 
@@ -36,10 +46,15 @@ public class PaimonSourceFactory implements 
TableSourceFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(PaimonConfig.WAREHOUSE)
-                .required(PaimonConfig.DATABASE)
-                .required(PaimonConfig.TABLE)
-                .optional(PaimonConfig.HDFS_SITE_PATH)
+                .required(PaimonConfig.WAREHOUSE, PaimonConfig.DATABASE, 
PaimonConfig.TABLE)
+                .optional(
+                        PaimonConfig.CATALOG_TYPE,
+                        PaimonConfig.HDFS_SITE_PATH,
+                        PaimonSourceConfig.QUERY_SQL,
+                        PaimonConfig.HADOOP_CONF,
+                        PaimonConfig.HADOOP_CONF_PATH)
+                .conditional(
+                        PaimonConfig.CATALOG_TYPE, PaimonCatalogEnum.HIVE, 
PaimonConfig.CATALOG_URI)
                 .build();
     }
 
@@ -47,4 +62,19 @@ public class PaimonSourceFactory implements 
TableSourceFactory {
     public Class<? extends SeaTunnelSource> getSourceClass() {
         return PaimonSource.class;
     }
+
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        ReadonlyConfig readonlyConfig = context.getOptions();
+        PaimonCatalogFactory paimonCatalogFactory = new PaimonCatalogFactory();
+        try (PaimonCatalog paimonCatalog =
+                (PaimonCatalog)
+                        
paimonCatalogFactory.createCatalog(factoryIdentifier(), readonlyConfig)) {
+            paimonCatalog.open();
+            return () ->
+                    (SeaTunnelSource<T, SplitT, StateT>)
+                            new PaimonSource(readonlyConfig, paimonCatalog);
+        }
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
index 139365c04d..6cd1d87c63 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java
@@ -24,8 +24,10 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 
 import lombok.extern.slf4j.Slf4j;
@@ -46,11 +48,14 @@ public class PaimonSourceReader implements 
SourceReader<SeaTunnelRow, PaimonSour
     private final Table table;
     private final SeaTunnelRowType seaTunnelRowType;
     private volatile boolean noMoreSplit;
+    private final Predicate predicate;
 
-    public PaimonSourceReader(Context context, Table table, SeaTunnelRowType 
seaTunnelRowType) {
+    public PaimonSourceReader(
+            Context context, Table table, SeaTunnelRowType seaTunnelRowType, 
Predicate predicate) {
         this.context = context;
         this.table = table;
         this.seaTunnelRowType = seaTunnelRowType;
+        this.predicate = predicate;
     }
 
     @Override
@@ -70,13 +75,18 @@ public class PaimonSourceReader implements 
SourceReader<SeaTunnelRow, PaimonSour
             if (Objects.nonNull(split)) {
                 // read logic
                 try (final RecordReader<InternalRow> reader =
-                        
table.newReadBuilder().newRead().createReader(split.getSplit())) {
+                        table.newReadBuilder()
+                                .withFilter(predicate)
+                                .newRead()
+                                .executeFilter()
+                                .createReader(split.getSplit())) {
                     final RecordReaderIterator<InternalRow> rowIterator =
                             new RecordReaderIterator<>(reader);
                     while (rowIterator.hasNext()) {
                         final InternalRow row = rowIterator.next();
                         final SeaTunnelRow seaTunnelRow =
-                                RowConverter.convert(row, seaTunnelRowType);
+                                RowConverter.convert(
+                                        row, seaTunnelRowType, 
((FileStoreTable) table).schema());
                         output.collect(seaTunnelRow);
                     }
                 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java
index aab44b2260..f6c76dc895 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.paimon.source;
 
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 
+import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.Split;
 
@@ -48,17 +49,25 @@ public class PaimonSourceSplitEnumerator
     /** The table that wants to read */
     private final Table table;
 
-    public PaimonSourceSplitEnumerator(Context<PaimonSourceSplit> context, 
Table table) {
+    private final Predicate predicate;
+
+    public PaimonSourceSplitEnumerator(
+            Context<PaimonSourceSplit> context, Table table, Predicate 
predicate) {
         this.context = context;
         this.table = table;
         this.assignedSplit = new HashSet<>();
+        this.predicate = predicate;
     }
 
     public PaimonSourceSplitEnumerator(
-            Context<PaimonSourceSplit> context, Table table, PaimonSourceState 
sourceState) {
+            Context<PaimonSourceSplit> context,
+            Table table,
+            PaimonSourceState sourceState,
+            Predicate predicate) {
         this.context = context;
         this.table = table;
         this.assignedSplit = sourceState.getAssignedSplits();
+        this.predicate = predicate;
     }
 
     @Override
@@ -146,7 +155,8 @@ public class PaimonSourceSplitEnumerator
     private Set<PaimonSourceSplit> getTableSplits() {
         final Set<PaimonSourceSplit> tableSplits = new HashSet<>();
         // TODO Support columns projection
-        final List<Split> splits = 
table.newReadBuilder().newScan().plan().splits();
+        final List<Split> splits =
+                
table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
         splits.forEach(split -> tableSplits.add(new PaimonSourceSplit(split)));
         return tableSplits;
     }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
new file mode 100644
index 0000000000..7b076ba558
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.source.converter;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.DateTimeUtils;
+
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.expression.DateValue;
+import net.sf.jsqlparser.expression.DoubleValue;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.HexValue;
+import net.sf.jsqlparser.expression.LongValue;
+import net.sf.jsqlparser.expression.Parenthesis;
+import net.sf.jsqlparser.expression.StringValue;
+import net.sf.jsqlparser.expression.TimeValue;
+import net.sf.jsqlparser.expression.TimestampValue;
+import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
+import net.sf.jsqlparser.expression.operators.conditional.OrExpression;
+import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
+import net.sf.jsqlparser.expression.operators.relational.GreaterThan;
+import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
+import net.sf.jsqlparser.expression.operators.relational.IsNullExpression;
+import net.sf.jsqlparser.expression.operators.relational.MinorThan;
+import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
+import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.schema.Column;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.select.PlainSelect;
+import net.sf.jsqlparser.statement.select.Select;
+import net.sf.jsqlparser.statement.select.SelectBody;
+
+import java.math.BigDecimal;
+import java.util.Objects;
+import java.util.Optional;
+
+public class SqlToPaimonPredicateConverter {
+
+    public static Predicate convertSqlWhereToPaimonPredicate(RowType rowType, 
String query) {
+        try {
+            if (StringUtils.isBlank(query)) {
+                return null;
+            }
+            Statement statement = CCJSqlParserUtil.parse(query);
+            // Confirm that the SQL statement is a Select statement
+            if (!(statement instanceof Select)) {
+                throw new IllegalArgumentException("Only SELECT statements are 
supported.");
+            }
+            Select select = (Select) statement;
+            SelectBody selectBody = select.getSelectBody();
+            if (!(selectBody instanceof PlainSelect)) {
+                throw new IllegalArgumentException("Only simple SELECT 
statements are supported.");
+            }
+            PlainSelect plainSelect = (PlainSelect) selectBody;
+            if (plainSelect.getHaving() != null
+                    || plainSelect.getGroupBy() != null
+                    || plainSelect.getOrderByElements() != null
+                    || plainSelect.getLimit() != null) {
+                throw new IllegalArgumentException(
+                        "Only SELECT statements with WHERE clause are 
supported. The Having, Group By, Order By, Limit clauses are currently 
unsupported.");
+            }
+            Expression whereExpression = plainSelect.getWhere();
+            if (Objects.isNull(whereExpression)) {
+                return null;
+            }
+            PredicateBuilder builder = new PredicateBuilder(rowType);
+            return parseExpressionToPredicate(builder, rowType, 
whereExpression);
+        } catch (JSQLParserException e) {
+            throw new IllegalArgumentException("Error parsing SQL WHERE 
clause", e);
+        }
+    }
+
+    private static Predicate parseExpressionToPredicate(
+            PredicateBuilder builder, RowType rowType, Expression expression) {
+        if (expression instanceof IsNullExpression) {
+            IsNullExpression isNullExpression = (IsNullExpression) expression;
+            Column column = (Column) isNullExpression.getLeftExpression();
+            int columnIndex = getColumnIndex(builder, column);
+            if (isNullExpression.isNot()) {
+                return builder.isNotNull(columnIndex);
+            }
+            return builder.isNull(columnIndex);
+        } else if (expression instanceof EqualsTo) {
+            EqualsTo equalsTo = (EqualsTo) expression;
+            Column column = (Column) equalsTo.getLeftExpression();
+            int columnIndex = getColumnIndex(builder, column);
+            Object jsqlParserDataTypeValue =
+                    getJSQLParserDataTypeValue(equalsTo.getRightExpression());
+            Object paimonDataValue =
+                    convertValueByPaimonDataType(
+                            rowType, column.getColumnName(), 
jsqlParserDataTypeValue);
+            return builder.equal(columnIndex, paimonDataValue);
+        } else if (expression instanceof GreaterThan) {
+            GreaterThan greaterThan = (GreaterThan) expression;
+            Column column = (Column) greaterThan.getLeftExpression();
+            int columnIndex = getColumnIndex(builder, column);
+            Object jsqlParserDataTypeValue =
+                    
getJSQLParserDataTypeValue(greaterThan.getRightExpression());
+            Object paimonDataValue =
+                    convertValueByPaimonDataType(
+                            rowType, column.getColumnName(), 
jsqlParserDataTypeValue);
+            return builder.greaterThan(columnIndex, paimonDataValue);
+        } else if (expression instanceof GreaterThanEquals) {
+            GreaterThanEquals greaterThanEquals = (GreaterThanEquals) 
expression;
+            Column column = (Column) greaterThanEquals.getLeftExpression();
+            int columnIndex = getColumnIndex(builder, column);
+            Object jsqlParserDataTypeValue =
+                    
getJSQLParserDataTypeValue(greaterThanEquals.getRightExpression());
+            Object paimonDataValue =
+                    convertValueByPaimonDataType(
+                            rowType, column.getColumnName(), 
jsqlParserDataTypeValue);
+            return builder.greaterOrEqual(columnIndex, paimonDataValue);
+        } else if (expression instanceof MinorThan) {
+            MinorThan minorThan = (MinorThan) expression;
+            Column column = (Column) minorThan.getLeftExpression();
+            int columnIndex = getColumnIndex(builder, column);
+            Object jsqlParserDataTypeValue =
+                    getJSQLParserDataTypeValue(minorThan.getRightExpression());
+            Object paimonDataValue =
+                    convertValueByPaimonDataType(
+                            rowType, column.getColumnName(), 
jsqlParserDataTypeValue);
+            return builder.lessThan(columnIndex, paimonDataValue);
+        } else if (expression instanceof MinorThanEquals) {
+            MinorThanEquals minorThanEquals = (MinorThanEquals) expression;
+            Column column = (Column) minorThanEquals.getLeftExpression();
+            int columnIndex = getColumnIndex(builder, column);
+            Object jsqlParserDataTypeValue =
+                    
getJSQLParserDataTypeValue(minorThanEquals.getRightExpression());
+            Object paimonDataValue =
+                    convertValueByPaimonDataType(
+                            rowType, column.getColumnName(), 
jsqlParserDataTypeValue);
+            return builder.lessOrEqual(columnIndex, paimonDataValue);
+        } else if (expression instanceof NotEqualsTo) {
+            NotEqualsTo notEqualsTo = (NotEqualsTo) expression;
+            Column column = (Column) notEqualsTo.getLeftExpression();
+            int columnIndex = getColumnIndex(builder, column);
+            Object jsqlParserDataTypeValue =
+                    
getJSQLParserDataTypeValue(notEqualsTo.getRightExpression());
+            Object paimonDataValue =
+                    convertValueByPaimonDataType(
+                            rowType, column.getColumnName(), 
jsqlParserDataTypeValue);
+            return builder.notEqual(columnIndex, paimonDataValue);
+        } else if (expression instanceof AndExpression) {
+            AndExpression andExpression = (AndExpression) expression;
+            Predicate leftPredicate =
+                    parseExpressionToPredicate(builder, rowType, 
andExpression.getLeftExpression());
+            Predicate rightPredicate =
+                    parseExpressionToPredicate(
+                            builder, rowType, 
andExpression.getRightExpression());
+            return PredicateBuilder.and(leftPredicate, rightPredicate);
+        } else if (expression instanceof OrExpression) {
+            OrExpression orExpression = (OrExpression) expression;
+            Predicate leftPredicate =
+                    parseExpressionToPredicate(builder, rowType, 
orExpression.getLeftExpression());
+            Predicate rightPredicate =
+                    parseExpressionToPredicate(builder, rowType, 
orExpression.getRightExpression());
+            return PredicateBuilder.or(leftPredicate, rightPredicate);
+        } else if (expression instanceof Parenthesis) {
+            Parenthesis parenthesis = (Parenthesis) expression;
+            return parseExpressionToPredicate(builder, rowType, 
parenthesis.getExpression());
+        }
+        throw new IllegalArgumentException(
+                "Unsupported expression type: " + 
expression.getClass().getSimpleName());
+    }
+
+    private static Object convertValueByPaimonDataType(
+            RowType rowType, String columnName, Object 
jsqlParserDataTypeValue) {
+        Optional<DataField> theFiled =
+                rowType.getFields().stream()
+                        .filter(field -> 
field.name().equalsIgnoreCase(columnName))
+                        .findFirst();
+        String strValue = jsqlParserDataTypeValue.toString();
+        if (theFiled.isPresent()) {
+            DataType dataType = theFiled.get().type();
+            switch (dataType.getTypeRoot()) {
+                case CHAR:
+                case VARCHAR:
+                    return jsqlParserDataTypeValue;
+                case BOOLEAN:
+                    return Boolean.parseBoolean(strValue);
+                case DECIMAL:
+                    DecimalType decimalType = (DecimalType) dataType;
+                    return Decimal.fromBigDecimal(
+                            new BigDecimal(strValue),
+                            decimalType.getPrecision(),
+                            decimalType.getScale());
+                case TINYINT:
+                    return Byte.parseByte(strValue);
+                case SMALLINT:
+                    return Short.parseShort(strValue);
+                case INTEGER:
+                    return Integer.parseInt(strValue);
+                case BIGINT:
+                    return Long.parseLong(strValue);
+                case FLOAT:
+                    return Float.parseFloat(strValue);
+                case DOUBLE:
+                    return Double.parseDouble(strValue);
+                case DATE:
+                    return DateTimeUtils.toInternal(
+                            
org.apache.seatunnel.common.utils.DateUtils.parse(strValue));
+                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                    return Timestamp.fromLocalDateTime(
+                            
org.apache.seatunnel.common.utils.DateTimeUtils.parse(strValue));
+                default:
+                    throw new IllegalArgumentException(
+                            "Unsupported Paimon data type :" + 
dataType.getTypeRoot());
+            }
+        }
+        throw new IllegalArgumentException(
+                String.format("The column named [%s] is not exists", 
columnName));
+    }
+
+    private static Object getJSQLParserDataTypeValue(Expression expression) {
+        if (expression instanceof LongValue) {
+            return ((LongValue) expression).getValue();
+        } else if (expression instanceof StringValue || expression instanceof 
HexValue) {
+            return BinaryString.fromString(((StringValue) 
expression).getValue());
+        } else if (expression instanceof DoubleValue) {
+            return ((DoubleValue) expression).getValue();
+        } else if (expression instanceof DateValue) {
+            return ((DateValue) expression).getValue();
+        } else if (expression instanceof TimeValue) {
+            return ((TimeValue) expression).getValue();
+        } else if (expression instanceof TimestampValue) {
+            return ((TimestampValue) expression).getValue();
+        }
+        throw new IllegalArgumentException("Unsupported expression value type: 
" + expression);
+    }
+
+    private static int getColumnIndex(PredicateBuilder builder, Column column) 
{
+        int index = builder.indexOf(column.getColumnName());
+        if (index == -1) {
+            throw new IllegalArgumentException(
+                    String.format("The column named [%s] is not exists", 
column.getColumnName()));
+        }
+        return index;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
index 11d9b3b61a..f92d175c2a 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java
@@ -55,6 +55,7 @@ import java.time.LocalDateTime;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /** The converter for converting {@link InternalRow} and {@link SeaTunnelRow} 
*/
 public class RowConverter {
@@ -238,7 +239,8 @@ public class RowConverter {
      * @param seaTunnelRowType SeaTunnel row type
      * @return SeaTunnel row
      */
-    public static SeaTunnelRow convert(InternalRow rowData, SeaTunnelRowType 
seaTunnelRowType) {
+    public static SeaTunnelRow convert(
+            InternalRow rowData, SeaTunnelRowType seaTunnelRowType, 
TableSchema tableSchema) {
         Object[] objects = new Object[seaTunnelRowType.getTotalFields()];
         for (int i = 0; i < objects.length; i++) {
             // judge the field is or not equals null
@@ -289,9 +291,15 @@ public class RowConverter {
                     objects[i] = DateTimeUtils.toLocalDate(dateInt);
                     break;
                 case TIMESTAMP:
-                    // Now SeaTunnel not supported assigned the timezone for 
timestamp,
-                    // so we use the default precision 6
-                    Timestamp timestamp = rowData.getTimestamp(i, 6);
+                    int precision = TimestampType.DEFAULT_PRECISION;
+                    Optional<DataField> precisionOptional =
+                            tableSchema.fields().stream()
+                                    .filter(dataField -> 
dataField.name().equals(fieldName))
+                                    .findFirst();
+                    if (precisionOptional.isPresent()) {
+                        precision = ((TimestampType) 
precisionOptional.get().type()).getPrecision();
+                    }
+                    Timestamp timestamp = rowData.getTimestamp(i, precision);
                     objects[i] = timestamp.toLocalDateTime();
                     break;
                 case ARRAY:
@@ -320,7 +328,7 @@ public class RowConverter {
                     SeaTunnelDataType<?> rowType = 
seaTunnelRowType.getFieldType(i);
                     InternalRow row =
                             rowData.getRow(i, ((SeaTunnelRowType) 
rowType).getTotalFields());
-                    objects[i] = convert(row, (SeaTunnelRowType) rowType);
+                    objects[i] = convert(row, (SeaTunnelRowType) rowType, 
tableSchema);
                     break;
                 default:
                     throw CommonError.unsupportedDataType(
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverterTest.java
 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverterTest.java
new file mode 100644
index 0000000000..d04b76c09b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverterTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.source.converter;
+
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.DateTimeUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class SqlToPaimonPredicateConverterTest {
+
+    private RowType rowType;
+
+    @BeforeEach
+    public void setUp() {
+        rowType =
+                new RowType(
+                        Arrays.asList(
+                                new DataField(0, "char_col", new CharType()),
+                                new DataField(1, "varchar_col", new 
VarCharType()),
+                                new DataField(2, "boolean_col", new 
BooleanType()),
+                                new DataField(3, "binary_col", new 
VarBinaryType()),
+                                new DataField(4, "decimal_col", new 
DecimalType(10, 2)),
+                                new DataField(5, "tinyint_col", new 
TinyIntType()),
+                                new DataField(6, "smallint_col", new 
SmallIntType()),
+                                new DataField(7, "int_col", new IntType()),
+                                new DataField(8, "bigint_col", new 
BigIntType()),
+                                new DataField(9, "float_col", new FloatType()),
+                                new DataField(10, "double_col", new 
DoubleType()),
+                                new DataField(11, "date_col", new DateType()),
+                                new DataField(12, "timestamp_col", new 
TimestampType())));
+    }
+
+    @Test
+    public void testConvertSqlWhereToPaimonPredicate() {
+        String query =
+                "SELECT * FROM table WHERE "
+                        + "char_col = 'a' AND "
+                        + "varchar_col = 'test' AND "
+                        + "boolean_col = 'true' AND "
+                        + "decimal_col = 123.45 AND "
+                        + "tinyint_col = 1 AND "
+                        + "smallint_col = 2 AND "
+                        + "int_col = 3 AND "
+                        + "bigint_col = 4 AND "
+                        + "float_col = 5.5 AND "
+                        + "double_col = 6.6 AND "
+                        + "date_col = '2022-01-01' AND "
+                        + "timestamp_col = '2022-01-01T12:00:00.123'";
+
+        Predicate predicate =
+                
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query);
+
+        assertNotNull(predicate);
+
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+
+        // Validate each part of the predicate
+        Predicate expectedPredicate =
+                PredicateBuilder.and(
+                        builder.equal(0, "a"),
+                        builder.equal(1, "test"),
+                        builder.equal(2, true),
+                        builder.equal(4, Decimal.fromBigDecimal(new 
BigDecimal("123.45"), 10, 2)),
+                        builder.equal(5, (byte) 1),
+                        builder.equal(6, (short) 2),
+                        builder.equal(7, 3),
+                        builder.equal(8, 4L),
+                        builder.equal(9, 5.5f),
+                        builder.equal(10, 6.6d),
+                        builder.equal(11, 
DateTimeUtils.toInternal(LocalDate.parse("2022-01-01"))),
+                        builder.equal(
+                                12,
+                                Timestamp.fromLocalDateTime(
+                                        
LocalDateTime.parse("2022-01-01T12:00:00.123"))));
+
+        assertEquals(expectedPredicate.toString(), predicate.toString());
+    }
+
+    @Test
+    public void testConvertSqlWhereToPaimonPredicateWithIsNull() {
+        String query = "SELECT * FROM table WHERE char_col IS NULL";
+
+        Predicate predicate =
+                
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query);
+
+        assertNotNull(predicate);
+
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate expectedPredicate = builder.isNull(0);
+
+        assertEquals(expectedPredicate.toString(), predicate.toString());
+    }
+
+    @Test
+    public void testConvertSqlWhereToPaimonPredicateWithIsNotNull() {
+        String query = "SELECT * FROM table WHERE char_col IS NOT NULL";
+
+        Predicate predicate =
+                
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query);
+
+        assertNotNull(predicate);
+
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate expectedPredicate = builder.isNotNull(0);
+
+        assertEquals(expectedPredicate.toString(), predicate.toString());
+    }
+
+    @Test
+    public void testConvertSqlWhereToPaimonPredicateWithAnd() {
+        String query = "SELECT * FROM table WHERE int_col > 3 AND double_col < 
6.6";
+
+        Predicate predicate =
+                
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query);
+
+        assertNotNull(predicate);
+
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate expectedPredicate =
+                PredicateBuilder.and(builder.greaterThan(7, 3), 
builder.lessThan(10, 6.6d));
+
+        assertEquals(expectedPredicate.toString(), predicate.toString());
+    }
+
+    @Test
+    public void testConvertSqlWhereToPaimonPredicateWithOr() {
+        String query = "SELECT * FROM table WHERE int_col > 3 OR double_col < 
6.6";
+
+        Predicate predicate =
+                
SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(rowType, query);
+
+        assertNotNull(predicate);
+
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate expectedPredicate =
+                PredicateBuilder.or(builder.greaterThan(7, 3), 
builder.lessThan(10, 6.6d));
+
+        assertEquals(expectedPredicate.toString(), predicate.toString());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
index fd0bf33381..ebde744d03 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverterTest.java
@@ -235,7 +235,7 @@ public class RowConverterTest {
 
     @Test
     public void paimonToSeaTunnel() {
-        SeaTunnelRow convert = RowConverter.convert(internalRow, 
seaTunnelRowType);
+        SeaTunnelRow convert = RowConverter.convert(internalRow, 
seaTunnelRowType, tableSchema);
         Assertions.assertEquals(convert, seaTunnelRow);
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
index 05fa3db4b9..90b9a63cdd 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
@@ -341,7 +341,8 @@ public class PaimonSinkCDCIT extends TestSuiteBase 
implements TestResource {
     }
 
     @TestTemplate
-    public void testFakeCDCSinkPaimonWithTimestampN(TestContainer container) 
throws Exception {
+    public void testFakeCDCSinkPaimonWithTimestampNAndRead(TestContainer 
container)
+            throws Exception {
         Container.ExecResult execResult = 
container.executeJob("/fake_cdc_sink_paimon_case7.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
 
@@ -404,6 +405,10 @@ public class PaimonSinkCDCIT extends TestSuiteBase 
implements TestResource {
                                         "2024-03-10T10:00:00.123456789");
                             }
                         });
+
+        Container.ExecResult readResult =
+                container.executeJob("/paimon_to_assert_with_timestampN.conf");
+        Assertions.assertEquals(0, readResult.getExitCode());
     }
 
     @TestTemplate
@@ -457,6 +462,26 @@ public class PaimonSinkCDCIT extends TestSuiteBase 
implements TestResource {
                         });
     }
 
+    @TestTemplate
+    public void testFakeSinkPaimonWithFullTypeAndReadWithFilter(TestContainer 
container)
+            throws Exception {
+        Container.ExecResult writeResult =
+                container.executeJob("/fake_to_paimon_with_full_type.conf");
+        Assertions.assertEquals(0, writeResult.getExitCode());
+        Container.ExecResult readResult =
+                container.executeJob("/paimon_to_assert_with_filter1.conf");
+        Assertions.assertEquals(0, readResult.getExitCode());
+        Container.ExecResult readResult2 =
+                container.executeJob("/paimon_to_assert_with_filter2.conf");
+        Assertions.assertEquals(0, readResult2.getExitCode());
+        Container.ExecResult readResult3 =
+                container.executeJob("/paimon_to_assert_with_filter3.conf");
+        Assertions.assertEquals(0, readResult3.getExitCode());
+        Container.ExecResult readResult4 =
+                container.executeJob("/paimon_to_assert_with_filter4.conf");
+        Assertions.assertEquals(0, readResult4.getExitCode());
+    }
+
     protected final ContainerExtendedFactory containerExtendedFactory =
             container -> {
                 FileUtils.deleteFile(CATALOG_ROOT_DIR + NAMESPACE_TAR);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java
index 5d2ce86c2b..259bc0128a 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java
@@ -148,10 +148,15 @@ public class PaimonSinkHdfsIT extends TestSuiteBase {
                                         }
                                     });
                         });
+
+        Container.ExecResult readResult =
+                
container.executeJob("/read_from_paimon_with_hdfs_ha_to_assert.conf");
+        Assertions.assertEquals(0, readResult.getExitCode());
     }
 
     @TestTemplate
-    public void testFakeCDCSinkPaimonWithHiveCatalog(TestContainer container) 
throws Exception {
+    public void testFakeCDCSinkPaimonWithHiveCatalogAndRead(TestContainer 
container)
+            throws Exception {
         Container.ExecResult execResult =
                 
container.executeJob("/fake_cdc_sink_paimon_with_hdfs_with_hive_catalog.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
@@ -194,5 +199,9 @@ public class PaimonSinkHdfsIT extends TestSuiteBase {
                                         }
                                     });
                         });
+
+        Container.ExecResult readResult =
+                
container.executeJob("/paimon_to_assert_with_hivecatalog.conf");
+        Assertions.assertEquals(0, readResult.getExitCode());
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type.conf
new file mode 100644
index 0000000000..df53bde66e
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type.conf
@@ -0,0 +1,91 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+      primaryKey {
+        name = "c_tinyint"
+        columnNames = [c_tinyint]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [{"a": "b"}, [101], "c_string", true, 117, 15987, 563873951, 
7084913402530365001, 1.21, 1.231, "2924137191386439303744.39292211", 
"bWlJWmo=", "2023-04-21", "2023-04-21T23:20:58"]
+      }
+      {
+        kind = INSERT
+        fields = [{"a": "c"}, [102], "c_string1", false, 118, 15988, 
563873952, 7084913402530365002, 1.22, 1.232, "2924137191386439303744.39292212", 
"bWlJWmo=", "2023-04-22", "2023-04-22T23:20:58"]
+      }
+      {
+        kind = INSERT
+        fields = [{"a": "e"}, [103], "c_string2", true, 119, 15987, 563873953, 
7084913402530365003, 1.23, 1.233, "2924137191386439303744.39292213", 
"bWlJWmo=", "2023-04-23", "2023-04-23T23:20:58"]
+      }
+      {
+        kind = INSERT
+        fields = [{"a": "f"}, [104], null, false, 118, 15988, 563873951, 
7084913402530365004, 1.24, 1.234, "2924137191386439303744.39292214", 
"bWlJWmo=", "2023-04-24", "2023-04-24T23:20:58"]
+      }
+      {
+        kind = INSERT
+        fields = [{"a": "b"}, [101], "c_string1", true, 120, 15987, 563873952, 
7084913402530365001, 1.21, 1.231, "2924137191386439303744.39292211", 
"bWlJWmo=", "2023-04-25", "2023-04-25T23:20:58"]
+      }
+      {
+        kind = UPDATE_BEFORE
+        fields = [{"a": "c"}, [102], "c_string2", false, 116, 15987, 
563873953, 7084913402530365002, 1.22, 1.232, "2924137191386439303744.39292212", 
"bWlJWmo=", "2023-04-26", "2023-04-26T23:20:58"]
+      }
+      {
+        kind = UPDATE_AFTER
+        fields = [{"a": "e"}, [103], "c_string3", true, 116, 15989, 563873951, 
7084913402530365003, 1.23, 1.233, "2924137191386439303744.39292213", 
"bWlJWmo=", "2023-04-27", "2023-04-27T23:20:58"]
+      }
+      {
+        kind = DELETE
+        fields = [{"a": "f"}, [104], "c_string4", true, 120, 15987, 563873952, 
7084913402530365004, 1.24, 1.234, "2924137191386439303744.39292214", 
"bWlJWmo=", "2023-04-28", "2023-04-28T23:20:58"]
+      }
+    ]
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "/tmp/paimon"
+    database = "full_type"
+    table = "st_test"
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter1.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter1.conf
new file mode 100644
index 0000000000..0a1b88d1d3
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter1.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Paimon {
+    warehouse = "/tmp/paimon"
+    database = "full_type"
+    table = "st_test"
+    query = "select * from st_test where c_string is not null"
+    result_table_name = paimon_source
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = paimon_source
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 3
+        }
+        {
+          rule_type = MIN_ROW
+          rule_value = 3
+        }
+      ]
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter2.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter2.conf
new file mode 100644
index 0000000000..c516e1382b
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter2.conf
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Paimon {
+    warehouse = "/tmp/paimon"
+    database = "full_type"
+    table = "st_test"
+    query = "select * from st_test where c_string='c_string2'"
+    result_table_name = paimon_source
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = paimon_source
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 1
+        }
+        {
+          rule_type = MIN_ROW
+          rule_value = 1
+        }
+      ]
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = "c_string2"
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter3.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter3.conf
new file mode 100644
index 0000000000..b30dbee56f
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter3.conf
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Paimon {
+    warehouse = "/tmp/paimon"
+    database = "full_type"
+    table = "st_test"
+    query = "select * from st_test where c_boolean= 'true' and c_tinyint > 116 
and c_smallint = 15987"
+    result_table_name = paimon_source
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = paimon_source
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 2
+        }
+        {
+          rule_type = MIN_ROW
+          rule_value = 2
+        }
+      ]
+      field_rules = [
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = "true"
+            }
+          ]
+        }
+        {
+            field_name = c_tinyint
+            field_type = tinyint
+            field_value = [
+                {
+                    rule_type = MIN
+                    rule_value = 116
+                }
+            ]
+        }
+        {
+            field_name = c_smallint
+            field_type = smallint
+            field_value = [
+                {
+                    rule_type = NOT_NULL
+                    equals_to = 15987
+                }
+            ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter4.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter4.conf
new file mode 100644
index 0000000000..7f0872114c
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter4.conf
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Paimon {
+    warehouse = "/tmp/paimon"
+    database = "full_type"
+    table = "st_test"
+    query = "select * from st_test where c_date > '2023-04-21' and 
c_timestamp='2023-04-27 23:20:58'"
+    result_table_name = paimon_source
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = paimon_source
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 1
+        }
+        {
+          rule_type = MIN_ROW
+          rule_value = 1
+        }
+      ]
+      field_rules = [
+        {
+            field_name = c_date
+            field_type = date
+            field_value = [
+                {
+                    rule_type = NOT_NULL
+                    equals_to = "2023-04-27"
+                }
+            ]
+        }
+        {
+            field_name = c_timestamp
+            field_type = timestamp
+            field_value = [
+                {
+                    rule_type = NOT_NULL
+                    equals_to = "2023-04-27T23:20:58"
+                }
+            ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_hivecatalog.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_hivecatalog.conf
new file mode 100644
index 0000000000..8b7077916e
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_hivecatalog.conf
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Paimon {
+    catalog_name="seatunnel_test"
+    catalog_type="hive"
+    catalog_uri="thrift://hadoop04:9083"
+    warehouse="hdfs:///tmp/seatunnel"
+    database="seatunnel_test"
+    table="st_test3"
+    paimon.hadoop.conf = {
+      fs.defaultFS = "hdfs://nameservice1"
+      dfs.nameservices = "nameservice1"
+      dfs.ha.namenodes.nameservice1 = "nn1,nn2"
+      dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
+      dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
+      dfs.client.failover.proxy.provider.nameservice1 = 
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+      dfs.client.use.datanode.hostname = "true"
+    }
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = paimon_source
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 2
+        }
+        {
+          rule_type = MIN_ROW
+          rule_value = 2
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_timestampN.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_timestampN.conf
new file mode 100644
index 0000000000..63c489364b
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_timestampN.conf
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Paimon {
+    warehouse = "/tmp/paimon"
+    database = "seatunnel_namespace7"
+    table = "st_test"
+    result_table_name = paimon_source
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = paimon_source
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 2
+        }
+        {
+          rule_type = MIN_ROW
+          rule_value = 2
+        }
+      ]
+      field_rules = [
+        {
+          field_name = one_time
+          field_type = timestamp
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = "2024-03-10T10:00:12"
+            }
+          ]
+        }
+        {
+          field_name = two_time
+          field_type = timestamp
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = "2024-03-10T10:00:00.123"
+            }
+          ]
+        }
+        {
+          field_name = three_time
+          field_type = timestamp
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = "2024-03-10T10:00:00.123456"
+            }
+          ]
+        }
+        {
+          field_name = four_time
+          field_type = timestamp
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = "2024-03-10T10:00:00.123456789"
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/read_from_paimon_with_hdfs_ha_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/read_from_paimon_with_hdfs_ha_to_assert.conf
new file mode 100644
index 0000000000..29dc2be324
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/read_from_paimon_with_hdfs_ha_to_assert.conf
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Paimon {
+    catalog_name="seatunnel_test"
+    warehouse="hdfs:///tmp/paimon"
+    database="seatunnel_namespace1"
+    table="st_test"
+    query = "select * from st_test where pk_id is not null and pk_id < 3"
+    paimon.hadoop.conf = {
+      fs.defaultFS = "hdfs://nameservice1"
+      dfs.nameservices = "nameservice1"
+      dfs.ha.namenodes.nameservice1 = "nn1,nn2"
+      dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
+      dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
+      dfs.client.failover.proxy.provider.nameservice1 = 
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+      dfs.client.use.datanode.hostname = "true"
+    }
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = paimon_source
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 1
+        }
+        {
+          rule_type = MIN_ROW
+          rule_value = 1
+        }
+      ]
+      field_rules = [
+        {
+          field_name = pk_id
+          field_type = bigint
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = 1
+            }
+          ]
+        }
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = "A_1"
+            }
+          ]
+        }
+        {
+          field_name = score
+          field_type = int
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = 100
+            }
+          ]
+        }
+      ]
+    }
+  }
+}

Reply via email to