This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3b141cf621 [Feature][Connector-V2] Support between predicate pushdown in paimon (#8962)
3b141cf621 is described below

commit 3b141cf621b3e49172db0f9b3f7db8d212cf0296
Author: xiaochen <598457...@qq.com>
AuthorDate: Thu Mar 13 15:21:16 2025 +0800

    [Feature][Connector-V2] Support between predicate pushdown in paimon (#8962)
---
 docs/en/connector-v2/source/Paimon.md              |  2 +-
 docs/zh/connector-v2/source/Paimon.md              |  2 +-
 .../converter/SqlToPaimonPredicateConverter.java   | 12 +++
 .../source/converter/SqlToPaimonConverterTest.java | 17 +++++
 .../e2e/connector/paimon/PaimonSinkCDCIT.java      |  3 +
 .../resources/paimon_to_assert_with_filter5.conf   | 86 ++++++++++++++++++++++
 6 files changed, 120 insertions(+), 2 deletions(-)

diff --git a/docs/en/connector-v2/source/Paimon.md b/docs/en/connector-v2/source/Paimon.md
index 94c1dc0195..ef833db24c 100644
--- a/docs/en/connector-v2/source/Paimon.md
+++ b/docs/en/connector-v2/source/Paimon.md
@@ -56,7 +56,7 @@ The file path of `hdfs-site.xml`
 ### query [string]
 
 The filter condition of the table read. For example: `select * from st_test where id > 100`. If not specified, all rows are read.
-Currently, where conditions only support <, <=, >, >=, =, !=, or, and,is null, is not null, and others are not supported.
+Currently, where conditions only support <, <=, >, >=, =, !=, or, and,is null, is not null, between...and, and others are not supported.
 The Having, Group By, Order By clauses are currently unsupported, because these clauses are not supported by Paimon.
 The projection and limit will be supported in the future.
 
diff --git a/docs/zh/connector-v2/source/Paimon.md b/docs/zh/connector-v2/source/Paimon.md
index 1ed64857ca..d9f67441a8 100644
--- a/docs/zh/connector-v2/source/Paimon.md
+++ b/docs/zh/connector-v2/source/Paimon.md
@@ -57,7 +57,7 @@ Paimon 的 catalog uri,仅当 catalog_type 为 hive 时需要
 
 读取表格的筛选条件,例如:`select * from st_test where id > 100`。如果未指定,则将读取所有记录。 
 
-目前,`where` 支持`<, <=, >, >=, =, !=, or, and,is null, is not null`,其他暂不支持。 
+目前,`where` 支持`<, <=, >, >=, =, !=, or, and,is null, is not null, between...and`,其他暂不支持。 
 
 由于 Paimon 限制,目前不支持 `Having`, `Group By` 和 `Order By`,未来版本将会支持 `projection` 和 `limit`。
 
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
index e938011f68..03c9b4ab7b 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
@@ -44,6 +44,7 @@ import net.sf.jsqlparser.expression.TimeValue;
 import net.sf.jsqlparser.expression.TimestampValue;
 import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
 import net.sf.jsqlparser.expression.operators.conditional.OrExpression;
+import net.sf.jsqlparser.expression.operators.relational.Between;
 import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
 import net.sf.jsqlparser.expression.operators.relational.GreaterThan;
 import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
@@ -227,6 +228,17 @@ public class SqlToPaimonPredicateConverter {
             Predicate rightPredicate =
                     parseExpressionToPredicate(builder, rowType, orExpression.getRightExpression());
             return PredicateBuilder.or(leftPredicate, rightPredicate);
+        } else if (expression instanceof Between) {
+            Between between = (Between) expression;
+            Column column = (Column) between.getLeftExpression();
+            int columnIndex = getColumnIndex(builder, column);
+            Object jsqlStartVal = getJSQLParserDataTypeValue(between.getBetweenExpressionStart());
+            Object paimonStartVal =
+                    convertValueByPaimonDataType(rowType, column.getColumnName(), jsqlStartVal);
+            Object jsqlEndVal = getJSQLParserDataTypeValue(between.getBetweenExpressionEnd());
+            Object paimonEndVal =
+                    convertValueByPaimonDataType(rowType, column.getColumnName(), jsqlEndVal);
+            return builder.between(columnIndex, paimonStartVal, paimonEndVal);
         } else if (expression instanceof Parenthesis) {
             Parenthesis parenthesis = (Parenthesis) expression;
             return parseExpressionToPredicate(builder, rowType, parenthesis.getExpression());
diff --git a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java
index 8b717b9e87..26eee05477 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonConverterTest.java
@@ -206,6 +206,23 @@ public class SqlToPaimonConverterTest {
         assertEquals(expectedPredicate.toString(), predicate.toString());
     }
 
+    @Test
+    public void testConvertSqlWhereToPaimonPredicateWithBetween() {
+        String query = "SELECT * FROM table WHERE int_col between 3 and 6";
+
+        PlainSelect plainSelect = convertToPlainSelect(query);
+        Predicate predicate =
+                SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate(
+                        rowType, plainSelect);
+
+        assertNotNull(predicate);
+
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate expectedPredicate = PredicateBuilder.or(builder.between(7, 3, 6));
+
+        assertEquals(expectedPredicate.toString(), predicate.toString());
+    }
+
     @Test
     public void testConvertSqlSelectToPaimonProjectionArrayWithALL() {
         String query = "SELECT * FROM table WHERE int_col > 3 OR double_col < 6.6";
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
index 0453facae5..cb947fba9e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
@@ -485,6 +485,9 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT implements TestResource {
         Container.ExecResult readResult4 =
                 container.executeJob("/paimon_to_assert_with_filter4.conf");
         Assertions.assertEquals(0, readResult4.getExitCode());
+        Container.ExecResult readResult5 =
+                container.executeJob("/paimon_to_assert_with_filter5.conf");
+        Assertions.assertEquals(0, readResult5.getExitCode());
     }
 
     @TestTemplate
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter5.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter5.conf
new file mode 100644
index 0000000000..d1d43ecfd2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter5.conf
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Paimon {
+    warehouse = "/tmp/paimon"
+    database = "full_type"
+    table = "st_test"
+    query = "select * from st_test where c_boolean= 'true' and c_smallint = 15987 and c_tinyint between 116 and 120"
+    plugin_output = paimon_source
+  }
+}
+
+sink {
+  Assert {
+    plugin_input = paimon_source
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 2
+        }
+        {
+          rule_type = MIN_ROW
+          rule_value = 2
+        }
+      ]
+      field_rules = [
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+              equals_to = "true"
+            }
+          ]
+        }
+        {
+            field_name = c_tinyint
+            field_type = tinyint
+            field_value = [
+                {
+                    rule_type = MIN
+                    rule_value = 116
+                },
+                {
+                    rule_type = MAX
+                    rule_value = 120
+                }
+
+            ]
+        }
+        {
+            field_name = c_smallint
+            field_type = smallint
+            field_value = [
+                {
+                    rule_type = NOT_NULL
+                    equals_to = 15987
+                }
+            ]
+        }
+      ]
+    }
+  }
+}

Reply via email to