This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a375f9e9b8 [INLONG-11220][SDK] Transform support GREATEST() and LEAST() function (#11255)
a375f9e9b8 is described below

commit a375f9e9b8357b122aa7c74112d2c596267f18f4
Author: emptyOVO <118812562+empty...@users.noreply.github.com>
AuthorDate: Tue Oct 8 12:54:17 2024 +0800

    [INLONG-11220][SDK] Transform support GREATEST() and LEAST() function (#11255)
---
 .../process/function/GreatestFunction.java         | 71 ++++++++++++++++
 .../transform/process/function/LeastFunction.java  | 71 ++++++++++++++++
 .../AbstractFunctionArithmeticTestBase.java        |  2 +-
 .../function/arithmetic/TestGreatestFunction.java  | 97 ++++++++++++++++++++++
 .../function/arithmetic/TestLeastFunction.java     | 80 ++++++++++++++++++
 .../process/operator/TestIsBooleanOperator.java    | 24 ++++++
 6 files changed, 344 insertions(+), 1 deletion(-)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/GreatestFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/GreatestFunction.java
new file mode 100644
index 0000000000..96218baa72
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/GreatestFunction.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+/**
+ * GreatestFunction
+ * description: GREATEST(value1[, value2]*)--Returns the greatest value of the list of arguments.
+ *              Returns NULL if any argument is NULL.
+ */
+@TransformFunction(names = {"greatest"})
+public class GreatestFunction implements ValueParser {
+
+    /** One parser per argument of the call, in argument order; empty when the call has no parameters. */
+    private final List<ValueParser> parserList;
+
+    /**
+     * Builds a value parser for every argument of the parsed function call.
+     *
+     * @param expr the parsed GREATEST(...) call; a call without parameters yields an empty parser list
+     */
+    public GreatestFunction(Function expr) {
+        List<ValueParser> parsers = new ArrayList<>();
+        if (expr.getParameters() != null) {
+            for (Expression param : expr.getParameters().getExpressions()) {
+                parsers.add(OperatorTools.buildParser(param));
+            }
+        }
+        this.parserList = parsers;
+    }
+
+    /**
+     * Evaluates the arguments in order for the given row and returns the largest one.
+     *
+     * @param sourceData the decoded source record
+     * @param rowIndex index of the row being evaluated
+     * @param context the transform context passed through to the argument parsers
+     * @return the greatest argument as a BigDecimal; null as soon as an argument evaluates to null,
+     *         and also null when the call has no arguments
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        BigDecimal maxValue = null;
+        for (ValueParser valueParser : parserList) {
+            Object valueObj = valueParser.parse(sourceData, rowIndex, context);
+            if (valueObj == null) {
+                // A single NULL argument makes the whole result NULL; later arguments are not evaluated.
+                return null;
+            }
+
+            // NOTE(review): handling of non-numeric or empty values is delegated to
+            // OperatorTools.parseBigDecimal, which is not visible here -- confirm.
+            BigDecimal value = OperatorTools.parseBigDecimal(valueObj);
+            // Strict comparison: when two arguments compare equal, the first one seen is kept.
+            if (maxValue == null || value.compareTo(maxValue) > 0) {
+                maxValue = value;
+            }
+        }
+        return maxValue;
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LeastFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LeastFunction.java
new file mode 100644
index 0000000000..08fe5ca626
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LeastFunction.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+/**
+ * LeastFunction
+ * description: LEAST(value1[, value2]*)--Returns the least value of the list of arguments.
+ *              Returns NULL if any argument is NULL.
+ */
+@TransformFunction(names = {"least"})
+public class LeastFunction implements ValueParser {
+
+    /** One parser per argument of the call, in argument order; empty when the call has no parameters. */
+    private final List<ValueParser> parserList;
+
+    /**
+     * Builds a value parser for every argument of the parsed function call.
+     *
+     * @param expr the parsed LEAST(...) call; a call without parameters yields an empty parser list
+     */
+    public LeastFunction(Function expr) {
+        List<ValueParser> parsers = new ArrayList<>();
+        if (expr.getParameters() != null) {
+            for (Expression param : expr.getParameters().getExpressions()) {
+                parsers.add(OperatorTools.buildParser(param));
+            }
+        }
+        this.parserList = parsers;
+    }
+
+    /**
+     * Evaluates the arguments in order for the given row and returns the smallest one.
+     *
+     * @param sourceData the decoded source record
+     * @param rowIndex index of the row being evaluated
+     * @param context the transform context passed through to the argument parsers
+     * @return the least argument as a BigDecimal; null as soon as an argument evaluates to null,
+     *         and also null when the call has no arguments
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        BigDecimal minValue = null;
+        for (ValueParser valueParser : parserList) {
+            Object valueObj = valueParser.parse(sourceData, rowIndex, context);
+            if (valueObj == null) {
+                // A single NULL argument makes the whole result NULL; later arguments are not evaluated.
+                return null;
+            }
+
+            // NOTE(review): handling of non-numeric or empty values is delegated to
+            // OperatorTools.parseBigDecimal, which is not visible here -- confirm.
+            BigDecimal value = OperatorTools.parseBigDecimal(valueObj);
+            // Strict comparison: when two arguments compare equal, the first one seen is kept.
+            if (minValue == null || value.compareTo(minValue) < 0) {
+                minValue = value;
+            }
+        }
+        return minValue;
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/AbstractFunctionArithmeticTestBase.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/AbstractFunctionArithmeticTestBase.java
index 94e4912a7f..c37acb466d 100644
--- 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/AbstractFunctionArithmeticTestBase.java
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/AbstractFunctionArithmeticTestBase.java
@@ -37,7 +37,7 @@ public abstract class AbstractFunctionArithmeticTestBase {
     protected static final KvSinkInfo kvSink;
 
     static {
-        for (int i = 1; i < 5; i++) {
+        for (int i = 1; i <= 5; i++) {
             FieldInfo field = new FieldInfo();
             field.setName("numeric" + i);
             srcFields.add(field);
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestGreatestFunction.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestGreatestFunction.java
new file mode 100644
index 0000000000..1dafa463c8
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestGreatestFunction.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.arithmetic;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Tests for the GREATEST() transform function: nested GREATEST() calls, a nested LEAST() call,
+ * and a row containing an empty field.
+ */
+public class TestGreatestFunction extends AbstractFunctionArithmeticTestBase {
+
+    /**
+     * Runs several GREATEST() queries against pipe-delimited CSV rows and checks the KV-encoded output.
+     * Assertions pass the expected value first so that JUnit failure messages read correctly.
+     */
+    @Test
+    public void testGreatestFunction() throws Exception {
+        String transformSql1 = "select greatest(numeric1, greatest(numeric2, numeric3, numeric4)) from source";
+        TransformConfig config1 = new TransformConfig(transformSql1);
+        TransformProcessor<String, String> processor1 = TransformProcessor
+                .create(config1, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+
+        // case: greatest(1, greatest(2, 3, 4))
+        List<String> output1 = processor1.transform("1|2|3|4", new HashMap<>());
+        Assert.assertEquals(1, output1.size());
+        Assert.assertEquals("result=4", output1.get(0));
+
+        // case: greatest(3.14, greatest(7, 2, 1))
+        List<String> output2 = processor1.transform("3.14|7|2|1", new HashMap<>());
+        Assert.assertEquals(1, output2.size());
+        Assert.assertEquals("result=7", output2.get(0));
+
+        String transformSql2 = "select greatest(numeric1, numeric2, greatest(numeric3, numeric4)) from source";
+        TransformConfig config2 = new TransformConfig(transformSql2);
+        TransformProcessor<String, String> processor2 = TransformProcessor
+                .create(config2, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+
+        // case: greatest(3.141592653589793, 3, greatest(4, 1))
+        List<String> output3 = processor2.transform("3.141592653589793|3|4|1", new HashMap<>());
+        Assert.assertEquals(1, output3.size());
+        Assert.assertEquals("result=4", output3.get(0));
+
+        // case: greatest(-9223372036854775808, 1, greatest(-2, 3))
+        List<String> output4 = processor2.transform("-9223372036854775808|1|-2|3", new HashMap<>());
+        Assert.assertEquals(1, output4.size());
+        Assert.assertEquals("result=3", output4.get(0));
+
+        String transformSql3 =
+                "select greatest(numeric1, greatest(numeric2, numeric3), greatest(numeric4, numeric5)) from source";
+        TransformConfig config3 = new TransformConfig(transformSql3);
+        TransformProcessor<String, String> processor3 = TransformProcessor
+                .create(config3, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+
+        // case: greatest(1, greatest(-2, -5), greatest(3.14836, 8))
+        List<String> output5 = processor3.transform("1|-2|-5|3.14836|8", new HashMap<>());
+        Assert.assertEquals(1, output5.size());
+        Assert.assertEquals("result=8", output5.get(0));
+
+        String transformSql4 =
+                "select greatest(numeric1, least(numeric2, numeric3), greatest(numeric4, numeric5)) from source";
+        TransformConfig config4 = new TransformConfig(transformSql4);
+        TransformProcessor<String, String> processor4 = TransformProcessor
+                .create(config4, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+
+        // case: greatest(-1, least(89, 10), greatest(3.14836, 8))
+        List<String> output6 = processor4.transform("-1|89|10|3.14836|8", new HashMap<>());
+        Assert.assertEquals(1, output6.size());
+        Assert.assertEquals("result=10", output6.get(0));
+
+        // case: greatest(1, least(-2, ), greatest(3.14836, 8)) -- empty field, empty result expected
+        List<String> output7 = processor4.transform("1|-2||3.14836|8", new HashMap<>());
+        Assert.assertEquals(1, output7.size());
+        Assert.assertEquals("result=", output7.get(0));
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestLeastFunction.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestLeastFunction.java
new file mode 100644
index 0000000000..9fe2698bc6
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/arithmetic/TestLeastFunction.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.arithmetic;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Tests for the LEAST() transform function: nested LEAST() calls and a row containing an empty field.
+ */
+public class TestLeastFunction extends AbstractFunctionArithmeticTestBase {
+
+    /**
+     * Runs several LEAST() queries against pipe-delimited CSV rows and checks the KV-encoded output.
+     * Assertions pass the expected value first so that JUnit failure messages read correctly.
+     */
+    @Test
+    public void testLeastFunction() throws Exception {
+        String transformSql1 = "select least(numeric1, least(numeric2, numeric3, numeric4)) from source";
+        TransformConfig config1 = new TransformConfig(transformSql1);
+        TransformProcessor<String, String> processor1 = TransformProcessor
+                .create(config1, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+
+        // case: least(3.14, least(7, 2, 1))
+        List<String> output2 = processor1.transform("3.14|7|2|1", new HashMap<>());
+        Assert.assertEquals(1, output2.size());
+        Assert.assertEquals("result=1", output2.get(0));
+
+        String transformSql2 = "select least(numeric1, numeric2, least(numeric3, numeric4)) from source";
+        TransformConfig config2 = new TransformConfig(transformSql2);
+        TransformProcessor<String, String> processor2 = TransformProcessor
+                .create(config2, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+
+        // case: least(3.141592653589793, 4, least(3.33, 3.4))
+        List<String> output3 = processor2.transform("3.141592653589793|4|3.33|3.4", new HashMap<>());
+        Assert.assertEquals(1, output3.size());
+        Assert.assertEquals("result=3.141592653589793", output3.get(0));
+
+        // case: least(-9223372036854775808, 1, least(-2, 3))
+        List<String> output4 = processor2.transform("-9223372036854775808|1|-2|3", new HashMap<>());
+        Assert.assertEquals(1, output4.size());
+        Assert.assertEquals("result=-9223372036854775808", output4.get(0));
+
+        String transformSql3 =
+                "select least(numeric1, least(numeric2, numeric3), least(numeric4, numeric5)) from source";
+        TransformConfig config3 = new TransformConfig(transformSql3);
+        TransformProcessor<String, String> processor3 = TransformProcessor
+                .create(config3, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+
+        // case: least(1, least(-2, -5), least(3.14836, 8))
+        List<String> output5 = processor3.transform("1|-2|-5|3.14836|8", new HashMap<>());
+        Assert.assertEquals(1, output5.size());
+        Assert.assertEquals("result=-5", output5.get(0));
+
+        // case: least(1, least(-2, -5), least(, 8)) -- empty field, empty result expected
+        List<String> output6 = processor3.transform("1|-2|-5||8", new HashMap<>());
+        Assert.assertEquals(1, output6.size());
+        Assert.assertEquals("result=", output6.get(0));
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/operator/TestIsBooleanOperator.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/operator/TestIsBooleanOperator.java
index 8aec150567..934618f5e2 100644
--- 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/operator/TestIsBooleanOperator.java
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/operator/TestIsBooleanOperator.java
@@ -19,18 +19,42 @@ package org.apache.inlong.sdk.transform.process.operator;
 
 import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
 import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
 import org.apache.inlong.sdk.transform.pojo.TransformConfig;
 import org.apache.inlong.sdk.transform.process.TransformProcessor;
+import org.apache.inlong.sdk.transform.process.converter.BooleanConverter;
 import 
org.apache.inlong.sdk.transform.process.function.arithmetic.AbstractFunctionArithmeticTestBase;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 
 public class TestIsBooleanOperator extends AbstractFunctionArithmeticTestBase {
 
+    private static final List<FieldInfo> srcFields = new ArrayList<>();
+    private static final List<FieldInfo> dstFields = new ArrayList<>();
+    private static final CsvSourceInfo csvSource;
+    private static final KvSinkInfo kvSink;
+
+    static {
+        for (int i = 1; i < 5; i++) {
+            FieldInfo field = new FieldInfo();
+            field.setName("numeric" + i);
+            srcFields.add(field);
+        }
+        srcFields.add(new FieldInfo("booleanVal", new BooleanConverter()));
+        FieldInfo field = new FieldInfo();
+        field.setName("result");
+        dstFields.add(field);
+        csvSource = new CsvSourceInfo("UTF-8", '|', '\\', srcFields);
+        kvSink = new KvSinkInfo("UTF-8", dstFields);
+    }
+
     @Test
     public void testIsBooleanOperator() throws Exception {
         String transformSql = null, data = null;

Reply via email to