This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2714dd1105 [Feature][connector-kudu] implement the filter (#9405)
2714dd1105 is described below

commit 2714dd11058cff071035358d3ffc72489f005fdd
Author: WenDing-Y <[email protected]>
AuthorDate: Mon Jun 9 11:45:44 2025 +0800

    [Feature][connector-kudu] implement the filter (#9405)
---
 docs/en/connector-v2/source/Kudu.md                |   4 +-
 .../kudu/config/KuduSourceTableConfig.java         |   6 +-
 .../connectors/seatunnel/kudu/util/KuduUtil.java   | 108 ++++++++++++++++++++-
 .../seatunnel/e2e/connector/kudu/KuduIT.java       |  14 +++
 .../src/test/resources/kudu_to_assert.conf         |  76 +++++++++++++++
 .../src/test/resources/kudu_to_assert_equal.conf   |  78 +++++++++++++++
 .../src/test/resources/kudu_to_assert_range.conf   |  77 +++++++++++++++
 7 files changed, 358 insertions(+), 5 deletions(-)

diff --git a/docs/en/connector-v2/source/Kudu.md 
b/docs/en/connector-v2/source/Kudu.md
index 7a57b8dba6..22e1442d56 100644
--- a/docs/en/connector-v2/source/Kudu.md
+++ b/docs/en/connector-v2/source/Kudu.md
@@ -44,7 +44,7 @@ The tested kudu version is 1.11.1.
 
 ## Source Options
 
-|                   Name                    |  Type  | Required |              
      Default                     |                                             
                                              Description                       
                                                                     |
+|                   Name                    | Type   | Required | Default      
                                  | Description                                 
                                                                                
                                                                     |
 
|-------------------------------------------|--------|----------|------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | kudu_masters                              | String | Yes      | -            
                                  | Kudu master address. Separated by ',',such 
as '192.168.88.110:7051'.                                                       
                                                                      |
 | table_name                                | String | Yes      | -            
                                  | The name of kudu table.                     
                                                                                
                                                                     |
@@ -57,7 +57,7 @@ The tested kudu version is 1.11.1.
 | kerberos_krb5conf                         | String | No       | -            
                                  | Kerberos krb5 conf. Note that all zeta 
nodes require have this file.                                                   
                                                                          |
 | scan_token_query_timeout                  | Long   | No       | 30000        
                                  | The timeout for connecting scan token. If 
not set, it will be the same as operationTimeout.                               
                                                                       |
 | scan_token_batch_size_bytes               | Int    | No       | 1024 * 1024  
                                  | Kudu scan bytes. The maximum number of 
bytes read at a time, the default is 1MB.                                       
                                                                          |
-| filter                                    | Int    | No       | 1024 * 1024  
                                  | Kudu scan filter expressions,Not supported 
yet.                                                                            
                                                                      |
+| filter                                    | String | No       | -            
                                  | Kudu scan filter expression: comparisons 
(=, >, >=, <, <=) joined by AND, e.g. id > 100 AND id < 200.                    
                                                                       |
 | schema                                    | Map    | No       | 1024 * 1024  
                                  | SeaTunnel Schema.                           
                                                                                
                                                                     |
 | table_list                                | Array  | No       | -            
                                  | The list of tables to be read. you can use 
this configuration instead of `table_path` example: ```table_list = [{ 
table_name = "kudu_source_table_1"},{ table_name = "kudu_source_table_2"}] ``` |
 | common-options                            |        | No       | -            
                                  | Source plugin common parameters, please 
refer to [Source Common Options](../source-common-options.md) for details.      
                                                                         |
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java
index 09dadd81bc..742c33a46d 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java
@@ -45,9 +45,10 @@ public class KuduSourceTableConfig implements Serializable {
 
     private String filter;
 
-    private KuduSourceTableConfig(String tablePath, CatalogTable catalogTable) 
{
+    private KuduSourceTableConfig(String tablePath, CatalogTable catalogTable, 
String filter) {
         this.tablePath = TablePath.of(tablePath);
         this.catalogTable = catalogTable;
+        this.filter = filter;
     }
 
     public static List<KuduSourceTableConfig> of(ReadonlyConfig config) {
@@ -82,6 +83,7 @@ public class KuduSourceTableConfig implements Serializable {
             catalogTable =
                     
kuduCatalog.getTable(TablePath.of(config.get(KuduBaseOptions.TABLE_NAME)));
         }
-        return new KuduSourceTableConfig(tableName, catalogTable);
+        return new KuduSourceTableConfig(
+                tableName, catalogTable, config.get(KuduSourceOptions.FILTER));
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/util/KuduUtil.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/util/KuduUtil.java
index 27374d0a3a..e6ff36b9af 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/util/KuduUtil.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/util/KuduUtil.java
@@ -28,9 +28,12 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
 import org.apache.kudu.client.AsyncKuduClient;
 import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduPredicate;
 import org.apache.kudu.client.KuduScanToken;
 import org.apache.kudu.client.KuduTable;
 
@@ -42,6 +45,8 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 @Slf4j
 public class KuduUtil {
@@ -139,6 +144,107 @@ public class KuduUtil {
 
     private static void addPredicates(
             KuduScanToken.KuduScanTokenBuilder kuduScanTokenBuilder, String 
filter, Schema schema) {
-        // todo Support for where condition
+
+        log.info("Adding predicates to Kudu scan token: {}", filter);
+
+        List<ColumnSchema> columns = schema.getColumns();
+        for (ColumnSchema column : columns) {
+            log.info(" column name " + column.getName());
+        }
+
+        if (StringUtils.isBlank(filter)) {
+            return;
+        }
+
+        List<String> conditions = 
Arrays.asList(filter.trim().split("\\s+AND\\s+"));
+
+        Pattern pattern = Pattern.compile("(\\w+)\\s*([=><]=?|<=|>=)\\s*(.+)");
+        for (String condition : conditions) {
+            Matcher matcher = pattern.matcher(condition.trim());
+
+            String column = null;
+            String op = null;
+            String value = null;
+
+            if (matcher.matches()) {
+                column = matcher.group(1);
+                op = matcher.group(2);
+                value = matcher.group(3);
+            } else {
+                throw new IllegalArgumentException("Invalid filter condition: 
" + condition);
+            }
+
+            if (!schema.hasColumn(column)) {
+                throw new KuduConnectorException(
+                        CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                        "Column not found in Kudu schema: " + column);
+            }
+
+            Type type = schema.getColumn(column).getType();
+
+            KuduPredicate.ComparisonOp comparisonOp = null;
+            switch (op) {
+                case "=":
+                    comparisonOp = KuduPredicate.ComparisonOp.EQUAL;
+                    break;
+                case ">":
+                    comparisonOp = KuduPredicate.ComparisonOp.GREATER;
+                    break;
+                case ">=":
+                    comparisonOp = KuduPredicate.ComparisonOp.GREATER_EQUAL;
+                    break;
+                case "<":
+                    comparisonOp = KuduPredicate.ComparisonOp.LESS;
+                    break;
+                case "<=":
+                    comparisonOp = KuduPredicate.ComparisonOp.LESS_EQUAL;
+                    break;
+                default:
+                    throw new KuduConnectorException(
+                            CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                            "Unsupported operator: " + op);
+            }
+
+            Object parsedValue = parseValue(type, value);
+
+            KuduPredicate predicate =
+                    KuduPredicate.newComparisonPredicate(
+                            schema.getColumn(column), comparisonOp, 
parsedValue);
+            kuduScanTokenBuilder.addPredicate(predicate);
+        }
+    }
+
+    private static Object parseValue(Type type, String value) {
+        try {
+            switch (type.getDataType()) {
+                case INT8:
+                    return Byte.valueOf(value);
+                case INT16:
+                    return Short.valueOf(value);
+                case INT32:
+                    return Integer.valueOf(value);
+                case INT64:
+                    return Long.valueOf(value);
+                case STRING:
+                    return value.startsWith("'") && value.endsWith("'")
+                            ? value.substring(1, value.length() - 1)
+                            : value;
+                case BOOL:
+                    return Boolean.valueOf(value);
+                case UNIXTIME_MICROS:
+                    return new java.sql.Timestamp(Long.parseLong(value));
+                case FLOAT:
+                    return Float.valueOf(value);
+                case DOUBLE:
+                    return Double.valueOf(value);
+                default:
+                    throw new IllegalArgumentException("Unsupported type: " + 
type);
+            }
+        } catch (NumberFormatException e) {
+            throw new KuduConnectorException(
+                    CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                    "Failed to parse value '" + value + "' as type " + type,
+                    e);
+        }
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
index 255f1c5bfe..047e5bc07f 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
@@ -340,6 +340,20 @@ public class KuduIT extends TestSuiteBase implements 
TestResource {
         kuduClient.deleteTable(KUDU_SINK_TABLE);
     }
 
+    @TestTemplate
+    public void testKuduFilter(TestContainer container) throws IOException, 
InterruptedException {
+        initializeKuduTable();
+        batchInsertData();
+        Container.ExecResult execResult = 
container.executeJob("/kudu_to_assert.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        Container.ExecResult execResultRange = 
container.executeJob("/kudu_to_assert_range.conf");
+        Assertions.assertEquals(0, execResultRange.getExitCode());
+        Container.ExecResult execResultEqual = 
container.executeJob("/kudu_to_assert_equal.conf");
+        Assertions.assertEquals(0, execResultEqual.getExitCode());
+        kuduClient.deleteTable(KUDU_SOURCE_TABLE);
+        kuduClient.deleteTable(KUDU_SINK_TABLE);
+    }
+
     @DisabledOnContainer(
             value = {},
             type = {EngineType.SPARK},
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert.conf
new file mode 100644
index 0000000000..3b033909ca
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert.conf
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the 
feature source plugin**
+  kudu{
+   kudu_masters = "kudu-master:7051"
+   table_name = "kudu_source_table"
+   plugin_output = "kudu"
+   filter = "id>=1 AND id<=2"
+
+}
+}
+
+transform {
+
+}
+
+sink {
+   Assert{
+   plugin_input = "kudu"
+       rules =
+         {
+           field_rules = [
+             {
+               field_name = id
+               field_type = INT
+               field_value = [
+                 {
+                   rule_type = MIN
+                   rule_value = 1
+                 },
+                 {
+                   rule_type = MAX
+                   rule_value = 2
+                 }
+               ]
+             }
+           ]
+            row_rules = [
+                                   {
+                                     rule_type = MIN_ROW
+                                     rule_value = 2
+                                   },
+                                   {
+                                     rule_type = MAX_ROW
+                                     rule_value = 2
+                                   }
+                                 ]
+         }
+
+ }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_equal.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_equal.conf
new file mode 100644
index 0000000000..af46a32789
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_equal.conf
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the 
feature source plugin**
+  kudu{
+   kudu_masters = "kudu-master:7051"
+   table_name = "kudu_source_table"
+   plugin_output = "kudu"
+   filter="id=11 AND val_bool=true AND val_int16=300 AND val_int32=30000 AND 
val_int64=30000000 AND val_float=1.0 AND val_double=2.0 AND val_string='test' 
AND val_unixtime_micros=1693477266998  "
+
+}
+}
+
+transform {
+}
+
+sink {
+   Assert{
+
+
+       rules =
+         {
+           field_rules = [
+             {
+               field_name = id
+               field_type = INT
+               field_value = [
+
+                 {
+                   rule_type = MIN
+                   rule_value = 11
+                 },
+                 {
+                   rule_type = MAX
+                   rule_value = 11
+                 }
+               ]
+             }
+           ]
+            row_rules = [
+                                   {
+                                     rule_type = MIN_ROW
+                                     rule_value = 1
+                                   },
+                                   {
+                                     rule_type = MAX_ROW
+                                     rule_value = 1
+                                   }
+                                 ]
+
+         }
+
+ }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_range.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_range.conf
new file mode 100644
index 0000000000..1b3f6604a3
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_range.conf
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the 
feature source plugin**
+  kudu{
+   kudu_masters = "kudu-master:7051"
+   table_name = "kudu_source_table"
+   plugin_output = "kudu"
+   filter="id>1 AND id<3"
+
+}
+}
+
+transform {
+}
+
+sink {
+   Assert{
+
+   plugin_input = "kudu"
+       rules =
+         {
+           field_rules = [
+             {
+               field_name = id
+               field_type = INT
+               field_value = [
+
+                 {
+                   rule_type = MIN
+                   rule_value = 2
+                 },
+                 {
+                   rule_type = MAX
+                   rule_value = 2
+                 }
+               ]
+             }
+           ]
+            row_rules = [
+                                                {
+                                                  rule_type = MIN_ROW
+                                                  rule_value = 1
+                                                },
+                                                {
+                                                  rule_type = MAX_ROW
+                                                  rule_value = 1
+                                                }
+                                              ]
+         }
+
+ }
+}
\ No newline at end of file

Reply via email to