This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2714dd1105 [Feature][connector-kudu] implement the filter (#9405)
2714dd1105 is described below
commit 2714dd11058cff071035358d3ffc72489f005fdd
Author: WenDing-Y <[email protected]>
AuthorDate: Mon Jun 9 11:45:44 2025 +0800
[Feature][connector-kudu] implement the filter (#9405)
---
docs/en/connector-v2/source/Kudu.md | 4 +-
.../kudu/config/KuduSourceTableConfig.java | 6 +-
.../connectors/seatunnel/kudu/util/KuduUtil.java | 108 ++++++++++++++++++++-
.../seatunnel/e2e/connector/kudu/KuduIT.java | 14 +++
.../src/test/resources/kudu_to_assert.conf | 76 +++++++++++++++
.../src/test/resources/kudu_to_assert_equal.conf | 78 +++++++++++++++
.../src/test/resources/kudu_to_assert_range.conf | 77 +++++++++++++++
7 files changed, 358 insertions(+), 5 deletions(-)
diff --git a/docs/en/connector-v2/source/Kudu.md
b/docs/en/connector-v2/source/Kudu.md
index 7a57b8dba6..22e1442d56 100644
--- a/docs/en/connector-v2/source/Kudu.md
+++ b/docs/en/connector-v2/source/Kudu.md
@@ -44,7 +44,7 @@ The tested kudu version is 1.11.1.
## Source Options
-| Name | Type | Required |
Default |
Description
|
+| Name | Type | Required | Default
| Description
|
|-------------------------------------------|--------|----------|------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| kudu_masters | String | Yes | -
| Kudu master address. Separated by ',',such
as '192.168.88.110:7051'.
|
| table_name | String | Yes | -
| The name of kudu table.
|
@@ -57,7 +57,7 @@ The tested kudu version is 1.11.1.
| kerberos_krb5conf | String | No | -
| Kerberos krb5 conf. Note that all zeta
nodes require have this file.
|
| scan_token_query_timeout | Long | No | 30000
| The timeout for connecting scan token. If
not set, it will be the same as operationTimeout.
|
| scan_token_batch_size_bytes | Int | No | 1024 * 1024
| Kudu scan bytes. The maximum number of
bytes read at a time, the default is 1MB.
|
-| filter | Int | No | 1024 * 1024
| Kudu scan filter expressions,Not supported
yet.
|
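+| filter | String | No | - | Kudu scan filter expression: one or more `column op value` conditions joined by `AND`, where `op` is one of `=`, `>`, `>=`, `<`, `<=`. Example: `id > 100 AND id < 200`. String values may be wrapped in single quotes; `UNIXTIME_MICROS` values are given as epoch milliseconds. |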
| schema | Map | No | 1024 * 1024
| SeaTunnel Schema.
|
| table_list | Array | No | -
| The list of tables to be read. you can use
this configuration instead of `table_path` example: ```table_list = [{
table_name = "kudu_source_table_1"},{ table_name = "kudu_source_table_2"}] ``` |
| common-options | | No | -
| Source plugin common parameters, please
refer to [Source Common Options](../source-common-options.md) for details.
|
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java
index 09dadd81bc..742c33a46d 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java
@@ -45,9 +45,10 @@ public class KuduSourceTableConfig implements Serializable {
private String filter;
- private KuduSourceTableConfig(String tablePath, CatalogTable catalogTable)
{
+    /**
+     * Creates the source configuration for a single Kudu table.
+     *
+     * @param tablePath table name; converted with {@code TablePath.of}
+     * @param catalogTable catalog metadata describing the table
+     * @param filter filter expression read from {@code KuduSourceOptions.FILTER}; stored as-is and
+     *     parsed later when the scan tokens are built
+     */
+ private KuduSourceTableConfig(String tablePath, CatalogTable catalogTable,
String filter) {
this.tablePath = TablePath.of(tablePath);
this.catalogTable = catalogTable;
+ this.filter = filter;
}
public static List<KuduSourceTableConfig> of(ReadonlyConfig config) {
@@ -82,6 +83,7 @@ public class KuduSourceTableConfig implements Serializable {
catalogTable =
kuduCatalog.getTable(TablePath.of(config.get(KuduBaseOptions.TABLE_NAME)));
}
- return new KuduSourceTableConfig(tableName, catalogTable);
+ return new KuduSourceTableConfig(
+ tableName, catalogTable, config.get(KuduSourceOptions.FILTER));
}
}
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/util/KuduUtil.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/util/KuduUtil.java
index 27374d0a3a..e6ff36b9af 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/util/KuduUtil.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/util/KuduUtil.java
@@ -28,9 +28,12 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
import org.apache.kudu.client.AsyncKuduClient;
import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduScanToken;
import org.apache.kudu.client.KuduTable;
@@ -42,6 +45,8 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
@Slf4j
public class KuduUtil {
@@ -139,6 +144,107 @@ public class KuduUtil {
+    /** Matches one {@code column op value} condition; {@code op} is =, >, >=, < or <=. */
+    private static final Pattern CONDITION_PATTERN =
+            Pattern.compile("(\\w+)\\s*([=><]=?)\\s*(.+)");
+
+    /** Separator between conditions: an upper-case AND surrounded by whitespace. */
+    private static final Pattern AND_SEPARATOR = Pattern.compile("\\s+AND\\s+");
+
+    /**
+     * Parses {@code filter} and adds one Kudu comparison predicate per condition to the builder.
+     *
+     * <p>The filter is a conjunction of {@code column op value} conditions joined by {@code AND},
+     * e.g. {@code id >= 1 AND val_string = 'test'}. An {@code AND} that appears inside a
+     * single-quoted literal does not split the filter. A blank filter adds no predicates.
+     *
+     * @param kuduScanTokenBuilder builder that receives the predicates
+     * @param filter user-supplied filter expression; blank means "no filtering"
+     * @param schema schema of the scanned table, used to resolve columns and their types
+     * @throws KuduConnectorException if a condition is malformed, names an unknown column, uses an
+     *     unsupported operator, or carries a value that cannot be parsed for the column type
+     */
private static void addPredicates(
KuduScanToken.KuduScanTokenBuilder kuduScanTokenBuilder, String
filter, Schema schema) {
- // todo Support for where condition
+        if (StringUtils.isBlank(filter)) {
+            return;
+        }
+        log.info("Adding predicates to Kudu scan token: {}", filter);
+
+        for (String rawCondition : splitConditions(filter.trim())) {
+            String condition = rawCondition.trim();
+            Matcher matcher = CONDITION_PATTERN.matcher(condition);
+            if (!matcher.matches()) {
+                throw new KuduConnectorException(
+                        CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                        "Invalid filter condition: " + condition);
+            }
+            String column = matcher.group(1);
+            String op = matcher.group(2);
+            String value = matcher.group(3);
+
+            if (!schema.hasColumn(column)) {
+                throw new KuduConnectorException(
+                        CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                        "Column not found in Kudu schema: " + column);
+            }
+            ColumnSchema columnSchema = schema.getColumn(column);
+
+            KuduPredicate.ComparisonOp comparisonOp;
+            switch (op) {
+                case "=":
+                    comparisonOp = KuduPredicate.ComparisonOp.EQUAL;
+                    break;
+                case ">":
+                    comparisonOp = KuduPredicate.ComparisonOp.GREATER;
+                    break;
+                case ">=":
+                    comparisonOp = KuduPredicate.ComparisonOp.GREATER_EQUAL;
+                    break;
+                case "<":
+                    comparisonOp = KuduPredicate.ComparisonOp.LESS;
+                    break;
+                case "<=":
+                    comparisonOp = KuduPredicate.ComparisonOp.LESS_EQUAL;
+                    break;
+                default:
+                    // The pattern also accepts "==", "=>" and "=<"; those are rejected here.
+                    throw new KuduConnectorException(
+                            CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                            "Unsupported operator: " + op);
+            }
+
+            Object parsedValue = parseValue(columnSchema.getType(), value);
+            kuduScanTokenBuilder.addPredicate(
+                    KuduPredicate.newComparisonPredicate(columnSchema, comparisonOp, parsedValue));
+        }
+    }
+
+    /**
+     * Splits {@code filter} into conditions at every {@link #AND_SEPARATOR} match that lies
+     * outside a single-quoted literal, so {@code s = 'A AND B'} stays one condition.
+     */
+    private static List<String> splitConditions(String filter) {
+        List<String> parts = new java.util.ArrayList<>();
+        Matcher separator = AND_SEPARATOR.matcher(filter);
+        boolean inQuotes = false;
+        int start = 0;
+        int pos = 0;
+        while (pos < filter.length()) {
+            if (filter.charAt(pos) == '\'') {
+                inQuotes = !inQuotes;
+            } else if (!inQuotes && separator.region(pos, filter.length()).lookingAt()) {
+                parts.add(filter.substring(start, pos));
+                // end() is an absolute index, independent of the region.
+                pos = separator.end();
+                start = pos;
+                continue;
+            }
+            pos++;
+        }
+        parts.add(filter.substring(start));
+        return parts;
+    }
+
+    /**
+     * Converts the literal on the right-hand side of a filter condition into the Java object
+     * passed to {@code KuduPredicate.newComparisonPredicate} for a column of {@code type}.
+     *
+     * <p>{@code STRING} literals may be wrapped in one pair of single quotes, which is removed.
+     * {@code BOOL} literals must be {@code true} or {@code false} (case-insensitive).
+     * {@code UNIXTIME_MICROS} literals go through {@code java.sql.Timestamp(long)}, so they are
+     * interpreted as epoch <em>milliseconds</em>, not microseconds.
+     *
+     * @param type Kudu type of the filtered column
+     * @param value literal text taken from the filter condition
+     * @return the parsed literal
+     * @throws KuduConnectorException if the literal cannot be parsed or the type is unsupported
+     */
+    private static Object parseValue(Type type, String value) {
+        try {
+            switch (type.getDataType()) {
+                case INT8:
+                    return Byte.valueOf(value);
+                case INT16:
+                    return Short.valueOf(value);
+                case INT32:
+                    return Integer.valueOf(value);
+                case INT64:
+                    return Long.valueOf(value);
+                case STRING:
+                    // Require at least two chars: a lone "'" would otherwise hit substring(1, 0).
+                    return value.length() >= 2 && value.startsWith("'") && value.endsWith("'")
+                            ? value.substring(1, value.length() - 1)
+                            : value;
+                case BOOL:
+                    // Boolean.valueOf maps any unrecognised text to false; reject it instead of
+                    // silently building a wrong predicate.
+                    if ("true".equalsIgnoreCase(value) || "false".equalsIgnoreCase(value)) {
+                        return Boolean.valueOf(value);
+                    }
+                    throw new KuduConnectorException(
+                            CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                            "Failed to parse value '" + value + "' as type " + type);
+                case UNIXTIME_MICROS:
+                    // Timestamp(long) takes epoch milliseconds, so the literal is read as millis.
+                    return new java.sql.Timestamp(Long.parseLong(value));
+                case FLOAT:
+                    return Float.valueOf(value);
+                case DOUBLE:
+                    return Double.valueOf(value);
+                default:
+                    throw new KuduConnectorException(
+                            CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                            "Unsupported type: " + type);
+            }
+        } catch (NumberFormatException e) {
+            throw new KuduConnectorException(
+                    CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                    "Failed to parse value '" + value + "' as type " + type,
+                    e);
+        }
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
index 255f1c5bfe..047e5bc07f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
@@ -340,6 +340,20 @@ public class KuduIT extends TestSuiteBase implements
TestResource {
kuduClient.deleteTable(KUDU_SINK_TABLE);
}
+    /**
+     * Loads the source table, then runs three batch jobs whose Kudu source sets a {@code filter}:
+     * a closed range, an open range, and a multi-column equality. The Assert sink in each config
+     * validates the returned rows; this method checks that every job exits with code 0.
+     */
+    @TestTemplate
+    public void testKuduFilter(TestContainer container) throws IOException, InterruptedException {
+        initializeKuduTable();
+        try {
+            batchInsertData();
+            Container.ExecResult execResult = container.executeJob("/kudu_to_assert.conf");
+            Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+            Container.ExecResult execResultRange =
+                    container.executeJob("/kudu_to_assert_range.conf");
+            Assertions.assertEquals(
+                    0, execResultRange.getExitCode(), execResultRange.getStderr());
+            Container.ExecResult execResultEqual =
+                    container.executeJob("/kudu_to_assert_equal.conf");
+            Assertions.assertEquals(
+                    0, execResultEqual.getExitCode(), execResultEqual.getStderr());
+        } finally {
+            // Drop the tables even when a job fails, so they are not left behind for other tests.
+            kuduClient.deleteTable(KUDU_SOURCE_TABLE);
+            kuduClient.deleteTable(KUDU_SINK_TABLE);
+        }
+    }
+
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK},
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert.conf
new file mode 100644
index 0000000000..3b033909ca
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert.conf
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the Kudu source filter feature**
+ kudu{
+ kudu_masters = "kudu-master:7051"
+ table_name = "kudu_source_table"
+ plugin_output = "kudu"
+ filter = "id>=1 AND id<=2"
+
+}
+}
+
+transform {
+
+}
+
+sink {
+ Assert{
+ plugin_input = "kudu"
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = INT
+ field_value = [
+ {
+ rule_type = MIN
+ rule_value = 1
+ },
+ {
+ rule_type = MAX
+ rule_value = 2
+ }
+ ]
+ }
+ ]
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 2
+ },
+ {
+ rule_type = MAX_ROW
+ rule_value = 2
+ }
+ ]
+ }
+
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_equal.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_equal.conf
new file mode 100644
index 0000000000..af46a32789
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_equal.conf
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the Kudu source filter feature**
+ kudu{
+ kudu_masters = "kudu-master:7051"
+ table_name = "kudu_source_table"
+ plugin_output = "kudu"
+ filter="id=11 AND val_bool=true AND val_int16=300 AND val_int32=30000 AND
val_int64=30000000 AND val_float=1.0 AND val_double=2.0 AND val_string='test'
AND val_unixtime_micros=1693477266998 "
+
+}
+}
+
+transform {
+}
+
+sink {
+ Assert{
+
+
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = INT
+ field_value = [
+
+ {
+ rule_type = MIN
+ rule_value = 11
+ },
+ {
+ rule_type = MAX
+ rule_value = 11
+ }
+ ]
+ }
+ ]
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 1
+ },
+ {
+ rule_type = MAX_ROW
+ rule_value = 1
+ }
+ ]
+
+ }
+
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_range.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_range.conf
new file mode 100644
index 0000000000..1b3f6604a3
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_range.conf
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the Kudu source filter feature**
+ kudu{
+ kudu_masters = "kudu-master:7051"
+ table_name = "kudu_source_table"
+ plugin_output = "kudu"
+ filter="id>1 AND id<3"
+
+}
+}
+
+transform {
+}
+
+sink {
+ Assert{
+
+ plugin_input = "kudu"
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = INT
+ field_value = [
+
+ {
+ rule_type = MIN
+ rule_value = 2
+ },
+ {
+ rule_type = MAX
+ rule_value = 2
+ }
+ ]
+ }
+ ]
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 1
+ },
+ {
+ rule_type = MAX_ROW
+ rule_value = 1
+ }
+ ]
+ }
+
+ }
+}
\ No newline at end of file