This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 26e9330c06 [INLONG-10603][SDK] Transform SQL support arithmetic functions(Including power, abs, sqrt and ln) (#10606) 26e9330c06 is described below commit 26e9330c06178eff3a9bba6c5d30bb6c8348f70b Author: LeeWY <61183968+yfsn...@users.noreply.github.com> AuthorDate: Thu Jul 11 19:25:02 2024 +0800 [INLONG-10603][SDK] Transform SQL support arithmetic functions(Including power, abs, sqrt and ln) (#10606) Co-authored-by: jameswyli <jamesw...@tencent.com> --- .../transform/process/function/AbsFunction.java | 56 ++++++++++ .../sdk/transform/process/function/LnFunction.java | 56 ++++++++++ .../transform/process/function/PowerFunction.java | 60 +++++++++++ .../transform/process/function/SqrtFunction.java | 56 ++++++++++ .../transform/process/operator/OperatorTools.java | 33 ++++-- .../TestTransformArithmeticFunctionsProcessor.java | 120 +++++++++++++++++++++ 6 files changed, 373 insertions(+), 8 deletions(-) diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AbsFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AbsFunction.java new file mode 100644 index 0000000000..cc56afb85e --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/AbsFunction.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.math.BigDecimal; + +/** + * AbsFunction + * description: abs(numeric)--returns the absolute value of numeric + */ +public class AbsFunction implements ValueParser { + + private ValueParser number; + + /** + * Constructor + * @param expr + */ + public AbsFunction(Function expr) { + number = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + } + + /** + * parse + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public Object parse(SourceData sourceData, int rowIndex) { + Object numberObj = number.parse(sourceData, rowIndex); + BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj); + return numberValue.abs(); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LnFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LnFunction.java new file mode 100644 index 0000000000..fde53b6db1 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/LnFunction.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.math.BigDecimal; + +/** + * LnFunction + * description: ln(numeric)--returns the natural logarithm (base e) of numeric + */ +public class LnFunction implements ValueParser { + + private ValueParser number; + + /** + * Constructor + * @param expr + */ + public LnFunction(Function expr) { + number = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + } + + /** + * parse + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public Object parse(SourceData sourceData, int rowIndex) { + Object numberObj = number.parse(sourceData, rowIndex); + BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj); + return Math.log(numberValue.doubleValue()); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/PowerFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/PowerFunction.java new file mode 100644 index 0000000000..598c8c45f1 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/PowerFunction.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.math.BigDecimal; + +/** + * PowerFunction + * description: power(numeric1, numeric2)--returns numeric1.power(numeric2) + */ +public class PowerFunction implements ValueParser { + + private ValueParser base; + private ValueParser exponent; + + /** + * Constructor + * @param expr + */ + public PowerFunction(Function expr) { + base = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + exponent = OperatorTools.buildParser(expr.getParameters().getExpressions().get(1)); + } + + /** + * parse + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public Object parse(SourceData sourceData, int rowIndex) { + Object baseObj = base.parse(sourceData, rowIndex); + Object exponentObj = exponent.parse(sourceData, rowIndex); + BigDecimal baseValue = OperatorTools.parseBigDecimal(baseObj); + BigDecimal exponentValue = OperatorTools.parseBigDecimal(exponentObj); + return Math.pow(baseValue.doubleValue(), exponentValue.doubleValue()); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SqrtFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SqrtFunction.java new file mode 100644 index 0000000000..b3038ecaba --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/SqrtFunction.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.math.BigDecimal; + +/** + * SqrtFunction + * description: sqrt(numeric)--returns the square root of numeric + */ +public class SqrtFunction implements ValueParser { + + private ValueParser number; + + /** + * Constructor + * @param expr + */ + public SqrtFunction(Function expr) { + number = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + } + + /** + * parse + * @param sourceData + * @param rowIndex + * @return + */ + @Override + public Object parse(SourceData sourceData, int rowIndex) { + Object numberObj = number.parse(sourceData, rowIndex); + BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj); + return Math.sqrt(numberValue.doubleValue()); + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java index 14e7c20d4b..6e4dbace0f 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/operator/OperatorTools.java @@ -17,8 +17,12 @@ package org.apache.inlong.sdk.transform.process.operator; +import org.apache.inlong.sdk.transform.process.function.AbsFunction; import org.apache.inlong.sdk.transform.process.function.ConcatFunction; +import org.apache.inlong.sdk.transform.process.function.LnFunction; import org.apache.inlong.sdk.transform.process.function.NowFunction; +import org.apache.inlong.sdk.transform.process.function.PowerFunction; +import org.apache.inlong.sdk.transform.process.function.SqrtFunction; import org.apache.inlong.sdk.transform.process.parser.AdditionParser; import org.apache.inlong.sdk.transform.process.parser.ColumnParser; import org.apache.inlong.sdk.transform.process.parser.DivisionParser; @@ -51,6 +55,8 @@ import net.sf.jsqlparser.schema.Column; import org.apache.commons.lang.ObjectUtils; import java.math.BigDecimal; +import java.util.HashMap; +import java.util.Map; /** * OperatorTools @@ -62,6 +68,17 @@ public class OperatorTools { public static final String CHILD_KEY = "$child"; + private static final Map<String, java.util.function.Function<Function, ValueParser>> functionMap = new HashMap<>(); + + static { + functionMap.put("concat", ConcatFunction::new); + functionMap.put("now", NowFunction::new); + functionMap.put("power", PowerFunction::new); + functionMap.put("abs", AbsFunction::new); + functionMap.put("sqrt", SqrtFunction::new); + functionMap.put("ln", LnFunction::new); + } + public static ExpressionOperator buildOperator(Expression expr) { if (expr instanceof AndExpression) { return new AndOperator((AndExpression) expr); @@ -111,13 +128,12 @@ public class OperatorTools { } else { // TODO Function func = (Function) expr; - switch (func.getName()) { - case "concat": - return new ConcatFunction(func); - case "now": - return new NowFunction(func); - default: - return new ColumnParser(func); + java.util.function.Function<Function, ValueParser> valueParserConstructor = + functionMap.get(func.getName().toLowerCase()); + if (valueParserConstructor != null) { + return valueParserConstructor.apply(func); + } else { + return new ColumnParser(func); } } } @@ -139,7 +155,8 @@ public class OperatorTools { /** * compareValue - * @param value + * @param left + * @param right * @return */ @SuppressWarnings("rawtypes") diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java new file mode 100644 index 0000000000..fef3d23e1f --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/TestTransformArithmeticFunctionsProcessor.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process; + +import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo; +import org.apache.inlong.sdk.transform.pojo.FieldInfo; +import org.apache.inlong.sdk.transform.pojo.KvSinkInfo; +import org.apache.inlong.sdk.transform.pojo.SinkInfo; +import org.apache.inlong.sdk.transform.pojo.SourceInfo; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +/** + * TestArithmeticFunctionsTransformProcessor + * description: test the arithmetic functions in transform processor + */ +public class TestTransformArithmeticFunctionsProcessor { + + private static final List<FieldInfo> srcFields = new ArrayList<>(); + private static final List<FieldInfo> dstFields = new ArrayList<>(); + private static final SourceInfo csvSource; + private static final SinkInfo kvSink; + static { + for (int i = 1; i < 5; i++) { + FieldInfo field = new FieldInfo(); + field.setName("numeric" + i); + srcFields.add(field); + } + FieldInfo field = new FieldInfo(); + field.setName("result"); + dstFields.add(field); + csvSource = new CsvSourceInfo("UTF-8", "|", "\\", srcFields); + kvSink = new KvSinkInfo("UTF-8", dstFields); + } + + @Test + public void testPowerFunction() throws Exception { + String transformSql = "select power(numeric1, numeric2) from source"; + TransformConfig config = new TransformConfig(csvSource, kvSink, transformSql); + // case1: 2^4 + TransformProcessor processor = new TransformProcessor(config); + List<String> output1 = processor.transform("2|4|6|8", new HashMap<>()); + Assert.assertTrue(output1.size() == 1); + Assert.assertEquals(output1.get(0), "result=16.0"); + // case2: 2^(-2) + List<String> output2 = processor.transform("2|-2|6|8", new HashMap<>()); + Assert.assertTrue(output2.size() == 1); + Assert.assertEquals(output2.get(0), "result=0.25"); + // case3: 4^(0.5) + List<String> output3 = processor.transform("4|0.5|6|8", new HashMap<>()); + Assert.assertTrue(output3.size() == 1); + Assert.assertEquals(output3.get(0), "result=2.0"); + } + + @Test + public void testAbsFunction() throws Exception { + String transformSql = "select abs(numeric1) from source"; + TransformConfig config = new TransformConfig(csvSource, kvSink, transformSql); + // case1: |2| + TransformProcessor processor = new TransformProcessor(config); + List<String> output1 = processor.transform("2|4|6|8", new HashMap<>()); + Assert.assertTrue(output1.size() == 1); + Assert.assertEquals(output1.get(0), "result=2"); + // case2: |-4.25| + List<String> output2 = processor.transform("-4.25|4|6|8", new HashMap<>()); + Assert.assertTrue(output2.size() == 1); + Assert.assertEquals(output2.get(0), "result=4.25"); + } + + @Test + public void testSqrtFunction() throws Exception { + String transformSql = "select sqrt(numeric1) from source"; + TransformConfig config = new TransformConfig(csvSource, kvSink, transformSql); + // case1: sqrt(9) + TransformProcessor processor = new TransformProcessor(config); + List<String> output1 = processor.transform("9|4|6|8", new HashMap<>()); + Assert.assertTrue(output1.size() == 1); + Assert.assertEquals(output1.get(0), "result=3.0"); + // case2: sqrt(5) + List<String> output2 = processor.transform("5|4|6|8", new HashMap<>()); + Assert.assertTrue(output2.size() == 1); + Assert.assertEquals(output2.get(0), "result=2.23606797749979"); + } + + @Test + public void testLnFunction() throws Exception { + String transformSql = "select ln(numeric1) from source"; + TransformConfig config = new TransformConfig(csvSource, kvSink, transformSql); + // case1: ln(1) + TransformProcessor processor = new TransformProcessor(config); + List<String> output1 = processor.transform("1|4|6|8", new HashMap<>()); + Assert.assertTrue(output1.size() == 1); + Assert.assertEquals(output1.get(0), "result=0.0"); + // case2: ln(10) + List<String> output2 = processor.transform("10|4|6|8", new HashMap<>()); + Assert.assertTrue(output2.size() == 1); + Assert.assertEquals(output2.get(0), "result=2.302585092994046"); + } +}