This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new f82cc83585 [INLONG-11241][SDK] Transform support ARRAY() function (#11274) f82cc83585 is described below commit f82cc83585759531cf369e17fe34263ba5fa38ba Author: emptyOVO <118812562+empty...@users.noreply.github.com> AuthorDate: Tue Oct 8 12:48:55 2024 +0800 [INLONG-11241][SDK] Transform support ARRAY() function (#11274) --- .../transform/process/function/ArrayFunction.java | 64 +++++++++++++ .../process/function/string/TestArrayFunction.java | 101 +++++++++++++++++++++ 2 files changed, 165 insertions(+) diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ArrayFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ArrayFunction.java new file mode 100644 index 0000000000..300d4a09db --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/ArrayFunction.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +import java.util.List; +/** + * ArrayFunction + * description: ARRAY(ANY1, ANY2, ...)--Returns an array created from a list of values (value1, value2, …). + * for example: array('he',7,'xxd')--return [he, 7, xxd] + * array(array('he',5),'xxd')--return [[he, 5], xxd] + * array(array('he',5),array('',''))--return [[he, 5], [, ]] + */ +@TransformFunction(names = {"array"}) +public class ArrayFunction implements ValueParser { + + private List<ValueParser> parserList; + + public ArrayFunction(Function expr) { + if (expr.getParameters() == null) { + this.parserList = new ArrayList<>(); + } else { + List<Expression> params = expr.getParameters().getExpressions(); + parserList = new ArrayList<>(params.size()); + for (Expression param : params) { + ValueParser node = OperatorTools.buildParser(param); + parserList.add(node); + } + } + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + ArrayList<Object> res = new ArrayList<>(); + for (ValueParser valueParser : parserList) { + Object parseObj = valueParser.parse(sourceData, rowIndex, context); + res.add(parseObj); + } + return res; + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestArrayFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestArrayFunction.java new file mode 100644 index 0000000000..a18118b688 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestArrayFunction.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.string; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestArrayFunction extends AbstractFunctionStringTestBase { + + @Test + public void testArrayFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor<String, String> processor = null; + List<String> output = null; + + transformSql = "select array(string1,numeric1,string2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: array('he',7,'xxd') + data = "he|xxd|cloud|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he, 7, xxd]", output.get(0)); + + // case2: array('he', 1, '') + data = "he||cloud|1|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he, 1, ]", output.get(0)); + + // case4: array('he',-1,'xxd') + data = "he|xxd|cloud|-1|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he, -1, xxd]", output.get(0)); + + transformSql = "select array(array(string1,numeric1),string2) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case5: array(array('he',5),'xxd') + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[[he, 5], xxd]", output.get(0)); + + // case5: array(array('he',5),'') + data = "he||cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[[he, 5], ]", output.get(0)); + + transformSql = "select array(array(string1,numeric1),array(string2,string3)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case6: array(array('he',5),array('xxd','cloud')) + data = "he|xxd|cloud|5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[[he, 5], [xxd, cloud]]", output.get(0)); + + // case6: array(array('he',5),array('','')) + data = "he|||5|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[[he, 5], [, ]]", output.get(0)); + } + +}