This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 70968a9d4b [INLONG-11091][Manager] Manager supports in filter function 
configuration (#11094)
70968a9d4b is described below

commit 70968a9d4b26a2d95acb845ae68ce4d0d80706a0
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Fri Sep 13 13:10:54 2024 +0800

    [INLONG-11091][Manager] Manager supports in filter function configuration 
(#11094)
---
 .../pojo/sort/util/FilterFunctionUtils.java        | 30 +++++++++++++++++++---
 .../pojo/transform/TransformDefinition.java        |  2 +-
 .../pojo/transform/filter/FilterDefinition.java    |  2 ++
 .../pojo/transform/TransformDefinitionTest.java    |  4 +--
 .../service/sink/StreamSinkServiceImpl.java        |  2 +-
 5 files changed, 33 insertions(+), 7 deletions(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
index 1b3eadc7f2..c66dccd511 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java
@@ -31,16 +31,20 @@ import 
org.apache.inlong.manager.pojo.transform.filter.FilterDefinition.FilterRu
 import 
org.apache.inlong.manager.pojo.transform.filter.FilterDefinition.TargetValue;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.transformation.CompareOperator;
 import org.apache.inlong.sort.protocol.transformation.ConstantParam;
 import org.apache.inlong.sort.protocol.transformation.FilterFunction;
 import org.apache.inlong.sort.protocol.transformation.FunctionParam;
 import org.apache.inlong.sort.protocol.transformation.LogicOperator;
+import 
org.apache.inlong.sort.protocol.transformation.MultiValueCompareOperator;
 import 
org.apache.inlong.sort.protocol.transformation.SingleValueCompareOperator;
 import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
+import 
org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
 import org.apache.inlong.sort.protocol.transformation.operator.AndOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.InOperator;
 import 
org.apache.inlong.sort.protocol.transformation.operator.IsNotNullOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.IsNullOperator;
 import 
org.apache.inlong.sort.protocol.transformation.operator.LessThanOperator;
@@ -51,13 +55,17 @@ import 
org.apache.inlong.sort.protocol.transformation.operator.NotEqualOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.OrOperator;
 
 import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
 /**
  * Util for creat filter function.
  */
+@Slf4j
 public class FilterFunctionUtils {
 
     /**
@@ -155,12 +163,26 @@ public class FilterFunctionUtils {
                     FieldInfoUtils.convertFieldFormat(fieldType, fieldFormat));
         }
         OperationType operationType = filterRule.getOperationType();
-        SingleValueCompareOperator compareOperator = 
parseCompareOperator(operationType);
+        CompareOperator compareOperator = parseCompareOperator(operationType);
         TargetValue targetValue = filterRule.getTargetValue();
         FunctionParam target = parseTargetValue(targetValue, transformName);
         RuleRelation relationWithPost = filterRule.getRelationWithPost();
         LogicOperator logicOperator = parseLogicOperator(relationWithPost);
-        return new SingleValueFilterFunction(logicOperator, sourceFieldInfo, 
compareOperator, target);
+        if (compareOperator instanceof SingleValueCompareOperator) {
+            return new SingleValueFilterFunction(logicOperator, 
sourceFieldInfo,
+                    (SingleValueCompareOperator) compareOperator, target);
+        } else {
+            List<FunctionParam> targets = new ArrayList<>();
+            if (CollectionUtils.isNotEmpty(filterRule.getTargetValues())) {
+                for (TargetValue value : filterRule.getTargetValues()) {
+                    targets.add(parseTargetValue(value, transformName));
+                }
+            } else {
+                targets.add(target);
+            }
+            return new MultiValueFilterFunction(sourceFieldInfo, targets, 
(MultiValueCompareOperator) compareOperator,
+                    logicOperator);
+        }
     }
 
     private static LogicOperator parseLogicOperator(RuleRelation relation) {
@@ -199,7 +221,7 @@ public class FilterFunctionUtils {
         }
     }
 
-    private static SingleValueCompareOperator 
parseCompareOperator(OperationType operationType) {
+    private static CompareOperator parseCompareOperator(OperationType 
operationType) {
         switch (operationType) {
             case eq:
                 return EqualOperator.getInstance();
@@ -217,6 +239,8 @@ public class FilterFunctionUtils {
                 return IsNullOperator.getInstance();
             case not_null:
                 return IsNotNullOperator.getInstance();
+            case in:
+                return InOperator.getInstance();
             default:
                 throw new IllegalArgumentException(String.format("Unsupported 
operateType=%s", operationType));
         }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformDefinition.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformDefinition.java
index 95c119ef25..f1aafdab01 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformDefinition.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformDefinition.java
@@ -36,7 +36,7 @@ public abstract class TransformDefinition {
 
     @JsonFormat
     public enum OperationType {
-        lt, le, eq, ne, ge, gt, is_null, not_null
+        lt, le, eq, ne, ge, gt, is_null, not_null, in
     }
 
     @JsonFormat
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/filter/FilterDefinition.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/filter/FilterDefinition.java
index 9cec3550de..bdd642cadf 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/filter/FilterDefinition.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/filter/FilterDefinition.java
@@ -105,6 +105,8 @@ public class FilterDefinition extends TransformDefinition {
 
         private TargetValue targetValue;
 
+        private List<TargetValue> targetValues;
+
         private RuleRelation relationWithPost;
     }
 
diff --git 
a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/transform/TransformDefinitionTest.java
 
b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/transform/TransformDefinitionTest.java
index f68bf36f62..4d5cecbad3 100644
--- 
a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/transform/TransformDefinitionTest.java
+++ 
b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/transform/TransformDefinitionTest.java
@@ -95,9 +95,9 @@ public class TransformDefinitionTest {
     private List<FilterRule> createFilterRule() {
         List<FilterRule> filterRules = Lists.newArrayList();
         filterRules.add(new FilterRule(new StreamField(0, 
FieldType.STRING.toString(), "name", null, null),
-                OperationType.not_null, null, RuleRelation.OR));
+                OperationType.not_null, null, null, RuleRelation.OR));
         filterRules.add(new FilterRule(new StreamField(1, 
FieldType.INT.toString(), "age", null, null),
-                OperationType.gt, new TargetValue(true, null, "50"), null));
+                OperationType.gt, new TargetValue(true, null, "50"), null, 
null));
         return filterRules;
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 9177253d90..4ce1d1c76c 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -481,7 +481,7 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
             this.startProcessForSink(request.getInlongGroupId(), 
request.getInlongStreamId(), operator);
         }
 
-        LOGGER.info("success to update sink by id: {}", request);
+        LOGGER.info("success to update sink by id: {}", request.getId());
         return true;
     }
 

Reply via email to