This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1ec43755d5 [feature][connectors-v2] Support in predicate pushdown in
paimon (#9379)
1ec43755d5 is described below
commit 1ec43755d589841ee55206dac9758d55505364d3
Author: zhangdonghao <[email protected]>
AuthorDate: Thu May 29 20:38:23 2025 +0800
[feature][connectors-v2] Support in predicate pushdown in paimon (#9379)
---
docs/en/connector-v2/source/Paimon.md | 2 +-
docs/zh/connector-v2/source/Paimon.md | 2 +-
.../converter/SqlToPaimonPredicateConverter.java | 48 ++++++++++++
.../e2e/connector/paimon/PaimonSinkCDCIT.java | 6 ++
.../resources/paimon_to_assert_with_filter6.conf | 86 ++++++++++++++++++++++
.../resources/paimon_to_assert_with_filter7.conf | 86 ++++++++++++++++++++++
6 files changed, 228 insertions(+), 2 deletions(-)
diff --git a/docs/en/connector-v2/source/Paimon.md
b/docs/en/connector-v2/source/Paimon.md
index 8e390fc4b6..2237914fb2 100644
--- a/docs/en/connector-v2/source/Paimon.md
+++ b/docs/en/connector-v2/source/Paimon.md
@@ -58,7 +58,7 @@ The file path of `hdfs-site.xml`
### query [string]
The filter condition of the table read. For example: `select * from st_test
where id > 100`. If not specified, all rows are read.
-Currently, where conditions only support <, <=, >, >=, =, !=, or, and,is null,
is not null, between...and, and others are not supported.
+Currently, where conditions only support <, <=, >, >=, =, !=, or, and, is null,
is not null, between...and, in, not in, and others are not supported.
The Having, Group By, Order By clauses are currently unsupported, because
these clauses are not supported by Paimon.
The projection and limit will be supported in the future.
diff --git a/docs/zh/connector-v2/source/Paimon.md
b/docs/zh/connector-v2/source/Paimon.md
index 965435fd55..a34d56a4f6 100644
--- a/docs/zh/connector-v2/source/Paimon.md
+++ b/docs/zh/connector-v2/source/Paimon.md
@@ -59,7 +59,7 @@ Paimon 的 catalog uri,仅当 catalog_type 为 hive 时需要
读取表格的筛选条件,例如:`select * from st_test where id > 100`。如果未指定,则将读取所有记录。
-目前,`where` 支持`<, <=, >, >=, =, !=, or, and,is null, is not null,
between...and`,其他暂不支持。
+目前,`where` 支持`<, <=, >, >=, =, !=, or, and, is null, is not null,
between...and, in, not in`,其他暂不支持。
由于 Paimon 限制,目前不支持 `Having`, `Group By` 和 `Order By`,未来版本将会支持 `projection` 和
`limit`。
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
index 03c9b4ab7b..a3fa418750 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/converter/SqlToPaimonPredicateConverter.java
@@ -48,10 +48,12 @@ import
net.sf.jsqlparser.expression.operators.relational.Between;
import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
import net.sf.jsqlparser.expression.operators.relational.GreaterThan;
import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
+import net.sf.jsqlparser.expression.operators.relational.InExpression;
import net.sf.jsqlparser.expression.operators.relational.IsNullExpression;
import net.sf.jsqlparser.expression.operators.relational.MinorThan;
import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
+import
net.sf.jsqlparser.expression.operators.relational.ParenthesedExpressionList;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.schema.Column;
import net.sf.jsqlparser.statement.Statement;
@@ -242,11 +244,57 @@ public class SqlToPaimonPredicateConverter {
} else if (expression instanceof Parenthesis) {
Parenthesis parenthesis = (Parenthesis) expression;
return parseExpressionToPredicate(builder, rowType,
parenthesis.getExpression());
+ } else if (expression instanceof InExpression) {
+ return handleInExpression(builder, rowType, (InExpression)
expression);
}
throw new IllegalArgumentException(
"Unsupported expression type: " +
expression.getClass().getSimpleName());
}
+ private static Predicate handleInExpression(
+ PredicateBuilder builder, RowType rowType, InExpression expr) { // Builds an in/notIn Predicate from a SQL IN / NOT IN expression.
+ Expression left = expr.getLeftExpression();
+ Column column = safeGetColumn(left); // rejects a non-Column left operand with IllegalArgumentException
+ int index = getColumnIndex(builder, column);
+ // The right operand must be a parenthesized expression list; anything else is rejected.
+ Expression right = expr.getRightExpression();
+ if (!(right instanceof ParenthesedExpressionList)) {
+ throw new IllegalArgumentException(
+ "Unsupported right expression in IN: expected a
parenthesized expression list");
+ }
+ // An empty value list is treated as an error instead of producing a predicate.
+ ParenthesedExpressionList list = (ParenthesedExpressionList) right;
+ List<Expression> expressions = list.getExpressions();
+ if (expressions.isEmpty()) {
+ throw new IllegalArgumentException("Empty value list in IN clause
is not allowed");
+ }
+ // Each element goes through getJSQLParserDataTypeValue, then convertValueByPaimonDataType for this column; a null result from either step is rejected.
+ List<Object> values = new ArrayList<>(expressions.size());
+ for (Expression expression : expressions) {
+ Object rawVal = getJSQLParserDataTypeValue(expression);
+ if (rawVal == null) {
+ throw new IllegalArgumentException("Null value found in IN
clause values");
+ }
+ Object convertedVal =
+ convertValueByPaimonDataType(rowType,
column.getColumnName(), rawVal);
+ if (convertedVal == null) {
+ throw new IllegalArgumentException(
+ "Failed to convert value in IN clause: " + rawVal);
+ }
+ values.add(convertedVal);
+ }
+ // expr.isNot() selects notIn over in; both receive the column index and the converted values.
+ return expr.isNot() ? builder.notIn(index, values) : builder.in(index,
values);
+ }
+
+ private static Column safeGetColumn(Expression expr) { // Returns expr cast to Column, or throws IllegalArgumentException when it is not a Column.
+ if (!(expr instanceof Column)) {
+ throw new IllegalArgumentException(
+ "Expected Column expression, but got: " +
expr.getClass().getSimpleName());
+ }
+ return (Column) expr;
+ }
+
private static Object convertValueByPaimonDataType(
RowType rowType, String columnName, Object
jsqlParserDataTypeValue) {
Optional<DataField> theFiled =
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
index 540fa90269..b2926039c3 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
@@ -486,6 +486,12 @@ public class PaimonSinkCDCIT extends AbstractPaimonIT
implements TestResource {
Container.ExecResult readResult5 =
container.executeJob("/paimon_to_assert_with_filter5.conf");
Assertions.assertEquals(0, readResult5.getExitCode());
+ Container.ExecResult readResult6 =
+ container.executeJob("/paimon_to_assert_with_filter6.conf");
+ Assertions.assertEquals(0, readResult6.getExitCode());
+ Container.ExecResult readResult7 =
+ container.executeJob("/paimon_to_assert_with_filter7.conf");
+ Assertions.assertEquals(0, readResult7.getExitCode());
}
@TestTemplate
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter6.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter6.conf
new file mode 100644
index 0000000000..785f294cf6
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter6.conf
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source { # Filter under test: IN predicate on c_tinyint (see query below).
+ Paimon {
+ warehouse = "/tmp/paimon"
+ database = "full_type"
+ table = "st_test"
+ query = "select * from st_test where c_boolean= 'true' and c_smallint =
15987 and c_tinyint in (117, 118, 119)"
+ plugin_output = paimon_source
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = paimon_source
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 2
+ }
+ {
+ rule_type = MIN_ROW
+ rule_value = 2
+ }
+ ]
+ field_rules = [
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "true"
+ }
+ ]
+ }
+ {
+ field_name = c_tinyint
+ field_type = tinyint
+ field_value = [
+ {
+ rule_type = MIN
+ rule_value = 117
+ },
+ {
+ rule_type = MAX
+ rule_value = 119
+ }
+
+ ]
+ }
+ {
+ field_name = c_smallint
+ field_type = smallint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 15987
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter7.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter7.conf
new file mode 100644
index 0000000000..daa5a1fb1e
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert_with_filter7.conf
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source { # Filter under test: NOT IN predicate on c_tinyint (see query below).
+ Paimon {
+ warehouse = "/tmp/paimon"
+ database = "full_type"
+ table = "st_test"
+ query = "select * from st_test where c_boolean= 'true' and c_smallint =
15987 and c_tinyint not in (116, 120)"
+ plugin_output = paimon_source
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = paimon_source
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 2
+ }
+ {
+ rule_type = MIN_ROW
+ rule_value = 2
+ }
+ ]
+ field_rules = [
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "true"
+ }
+ ]
+ }
+ {
+ field_name = c_tinyint
+ field_type = tinyint
+ field_value = [
+ {
+ rule_type = MIN
+ rule_value = 117
+ },
+ {
+ rule_type = MAX
+ rule_value = 119
+ }
+
+ ]
+ }
+ {
+ field_name = c_smallint
+ field_type = smallint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 15987
+ }
+ ]
+ }
+ ]
+ }
+ }
+}