This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 70968a9d4b [INLONG-11091][Manager] Manager supports in filter function configuration (#11094) 70968a9d4b is described below commit 70968a9d4b26a2d95acb845ae68ce4d0d80706a0 Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Fri Sep 13 13:10:54 2024 +0800 [INLONG-11091][Manager] Manager supports in filter function configuration (#11094) --- .../pojo/sort/util/FilterFunctionUtils.java | 30 +++++++++++++++++++--- .../pojo/transform/TransformDefinition.java | 2 +- .../pojo/transform/filter/FilterDefinition.java | 2 ++ .../pojo/transform/TransformDefinitionTest.java | 4 +-- .../service/sink/StreamSinkServiceImpl.java | 2 +- 5 files changed, 33 insertions(+), 7 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java index 1b3eadc7f2..c66dccd511 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FilterFunctionUtils.java @@ -31,16 +31,20 @@ import org.apache.inlong.manager.pojo.transform.filter.FilterDefinition.FilterRu import org.apache.inlong.manager.pojo.transform.filter.FilterDefinition.TargetValue; import org.apache.inlong.sort.protocol.FieldInfo; import org.apache.inlong.sort.protocol.enums.FilterStrategy; +import org.apache.inlong.sort.protocol.transformation.CompareOperator; import org.apache.inlong.sort.protocol.transformation.ConstantParam; import org.apache.inlong.sort.protocol.transformation.FilterFunction; import org.apache.inlong.sort.protocol.transformation.FunctionParam; import org.apache.inlong.sort.protocol.transformation.LogicOperator; +import org.apache.inlong.sort.protocol.transformation.MultiValueCompareOperator; import org.apache.inlong.sort.protocol.transformation.SingleValueCompareOperator; import org.apache.inlong.sort.protocol.transformation.function.CustomFunction; +import org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction; import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction; import org.apache.inlong.sort.protocol.transformation.operator.AndOperator; import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator; import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator; +import org.apache.inlong.sort.protocol.transformation.operator.InOperator; import org.apache.inlong.sort.protocol.transformation.operator.IsNotNullOperator; import org.apache.inlong.sort.protocol.transformation.operator.IsNullOperator; import org.apache.inlong.sort.protocol.transformation.operator.LessThanOperator; @@ -51,13 +55,17 @@ import org.apache.inlong.sort.protocol.transformation.operator.NotEqualOperator; import org.apache.inlong.sort.protocol.transformation.operator.OrOperator; import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; +import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; /** * Util for creat filter function. */ +@Slf4j public class FilterFunctionUtils { /** @@ -155,12 +163,26 @@ public class FilterFunctionUtils { FieldInfoUtils.convertFieldFormat(fieldType, fieldFormat)); } OperationType operationType = filterRule.getOperationType(); - SingleValueCompareOperator compareOperator = parseCompareOperator(operationType); + CompareOperator compareOperator = parseCompareOperator(operationType); TargetValue targetValue = filterRule.getTargetValue(); FunctionParam target = parseTargetValue(targetValue, transformName); RuleRelation relationWithPost = filterRule.getRelationWithPost(); LogicOperator logicOperator = parseLogicOperator(relationWithPost); - return new SingleValueFilterFunction(logicOperator, sourceFieldInfo, compareOperator, target); + if (compareOperator instanceof SingleValueCompareOperator) { + return new SingleValueFilterFunction(logicOperator, sourceFieldInfo, + (SingleValueCompareOperator) compareOperator, target); + } else { + List<FunctionParam> targets = new ArrayList<>(); + if (CollectionUtils.isNotEmpty(filterRule.getTargetValues())) { + for (TargetValue value : filterRule.getTargetValues()) { + targets.add(parseTargetValue(value, transformName)); + } + } else { + targets.add(target); + } + return new MultiValueFilterFunction(sourceFieldInfo, targets, (MultiValueCompareOperator) compareOperator, + logicOperator); + } } private static LogicOperator parseLogicOperator(RuleRelation relation) { @@ -199,7 +221,7 @@ public class FilterFunctionUtils { } } - private static SingleValueCompareOperator parseCompareOperator(OperationType operationType) { + private static CompareOperator parseCompareOperator(OperationType operationType) { switch (operationType) { case eq: return EqualOperator.getInstance(); @@ -217,6 +239,8 @@ public class FilterFunctionUtils { return IsNullOperator.getInstance(); case not_null: return IsNotNullOperator.getInstance(); + case in: + return InOperator.getInstance(); default: throw new IllegalArgumentException(String.format("Unsupported operateType=%s", operationType)); } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformDefinition.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformDefinition.java index 95c119ef25..f1aafdab01 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformDefinition.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/TransformDefinition.java @@ -36,7 +36,7 @@ public abstract class TransformDefinition { @JsonFormat public enum OperationType { - lt, le, eq, ne, ge, gt, is_null, not_null + lt, le, eq, ne, ge, gt, is_null, not_null, in } @JsonFormat diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/filter/FilterDefinition.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/filter/FilterDefinition.java index 9cec3550de..bdd642cadf 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/filter/FilterDefinition.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/transform/filter/FilterDefinition.java @@ -105,6 +105,8 @@ public class FilterDefinition extends TransformDefinition { private TargetValue targetValue; + private List<TargetValue> targetValues; + private RuleRelation relationWithPost; } diff --git a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/transform/TransformDefinitionTest.java b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/transform/TransformDefinitionTest.java index f68bf36f62..4d5cecbad3 100644 --- a/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/transform/TransformDefinitionTest.java +++ b/inlong-manager/manager-pojo/src/test/java/org/apache/inlong/manager/pojo/transform/TransformDefinitionTest.java @@ -95,9 +95,9 @@ public class TransformDefinitionTest { private List<FilterRule> createFilterRule() { List<FilterRule> filterRules = Lists.newArrayList(); filterRules.add(new FilterRule(new StreamField(0, FieldType.STRING.toString(), "name", null, null), - OperationType.not_null, null, RuleRelation.OR)); + OperationType.not_null, null, null, RuleRelation.OR)); filterRules.add(new FilterRule(new StreamField(1, FieldType.INT.toString(), "age", null, null), - OperationType.gt, new TargetValue(true, null, "50"), null)); + OperationType.gt, new TargetValue(true, null, "50"), null, null)); return filterRules; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java index 9177253d90..4ce1d1c76c 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java @@ -481,7 +481,7 @@ public class StreamSinkServiceImpl implements StreamSinkService { this.startProcessForSink(request.getInlongGroupId(), request.getInlongStreamId(), operator); } - LOGGER.info("success to update sink by id: {}", request); + LOGGER.info("success to update sink by id: {}", request.getId()); return true; }