This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 25e786b828 [INLONG-11283][SDK]Transform enhance Map-related Collection Functions (#11317) 25e786b828 is described below commit 25e786b82886a682d36cd3d9cd045c1e2d3a9bfe Author: emptyOVO <118812562+empty...@users.noreply.github.com> AuthorDate: Thu Oct 10 11:33:44 2024 +0800 [INLONG-11283][SDK]Transform enhance Map-related Collection Functions (#11317) * [INLONG-11283][SDK]Transform enhance Map-related Collection Functions * feat: add description for functions --- .../process/function/MapEntriesFunction.java | 56 +++++++++++++ .../process/function/MapFromArraysFunction.java | 77 +++++++++++++++++ .../process/function/MapKeysFunction.java | 59 +++++++++++++ .../process/function/MapUnionFunction.java | 77 +++++++++++++++++ .../process/function/MapValueFunction.java | 60 ++++++++++++++ .../function/string/TestMapEntriesFunction.java | 90 ++++++++++++++++++++ .../function/string/TestMapFromArraysFunction.java | 90 ++++++++++++++++++++ .../function/string/TestMapKeysFunction.java | 89 ++++++++++++++++++++ .../function/string/TestMapUnionFunction.java | 96 ++++++++++++++++++++++ .../function/string/TestMapValuesFunction.java | 90 ++++++++++++++++++++ 10 files changed, 784 insertions(+) diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapEntriesFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapEntriesFunction.java new file mode 100644 index 0000000000..dde1c82b78 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapEntriesFunction.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.util.Arrays; +import java.util.Map; +/** + * MapEntriesFunction + * description: MAP_ENTRIES(map)--Returns an array of all entries in the given map. No order guaranteed. + * for example: map_entries(Map('he',1,'xxd','cloud'))--return [he=1, xxd=cloud] + * map_entries(Map(1,2,'cloud','xxd'))--return [xxd=cloud, 1=2] + */ +@TransformFunction(names = {"map_entries"}) +public class MapEntriesFunction implements ValueParser { + + private final ValueParser mapParser; + + public MapEntriesFunction(Function expr) { + this.mapParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object mapObj = mapParser.parse(sourceData, rowIndex, context); + if (mapObj == null) { + return null; + } + if (mapObj instanceof Map) { + Map<?, ?> map = (Map<?, ?>) mapObj; + return Arrays.toString(map.entrySet().toArray(new Map.Entry[0])); + } + return null; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapFromArraysFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapFromArraysFunction.java new file mode 100644 index 0000000000..2e90e7d6e1 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapFromArraysFunction.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import lombok.extern.slf4j.Slf4j; +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.Map; +/** + * MapFromArraysFunction + * description: MAP_FROM_ARRAYS(array_of_keys, array_of_values)--Returns a map created from an arrays of keys and values. + * Note that the lengths of two arrays should be the same. + * for example: map_from_arrays(array('he', 'xxd'),array(1, 3))--return {he=1, xxd=3} + * map_from_arrays(array('xxd', array('cloud')),array(1, array(2)))--return {1=xxd, [2]=[cloud]} + */ +@Slf4j +@TransformFunction(names = {"map_from_arrays"}) +public class MapFromArraysFunction implements ValueParser { + + private ValueParser keyArrayParser; + + private ValueParser valueArrayParser; + + public MapFromArraysFunction(Function expr) { + if (expr.getParameters().getExpressions().size() >= 2) { + this.keyArrayParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + this.valueArrayParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(1)); + } + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object keyArrayObj = keyArrayParser.parse(sourceData, rowIndex, context); + Object valueArrayObj = valueArrayParser.parse(sourceData, rowIndex, context); + if (keyArrayObj == null || valueArrayObj == null) { + return null; + } + if (keyArrayObj instanceof ArrayList && valueArrayObj instanceof ArrayList) { + ArrayList<?> keyArray = ((ArrayList<?>) keyArrayObj); + ArrayList<?> valueArray = ((ArrayList<?>) valueArrayObj); + + if (keyArray.size() != valueArray.size()) { + log.warn("The lengths of the keys and values arrays must be the same."); + return null; + } + Map<Object, Object> res = new LinkedHashMap<>(); + + for (int i = 0; i < keyArray.size(); i++) { + res.put(keyArray.get(i), valueArray.get(i)); + } + return res; + } + return null; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapKeysFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapKeysFunction.java new file mode 100644 index 0000000000..b2d6953ca7 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapKeysFunction.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.util.Arrays; +import java.util.Map; +/** + * MapKeysFunction + * description: MAP_KEYS(map)--Returns the keys of the map as array. No order guaranteed. + * for example: map_keys(Map('he',1,'xxd','cloud'))--return [he, xxd] + * map_keys(Map('xxd','cloud',map(1,2),map(3,'apple')))--return [xxd, {1=2}] + */ +@TransformFunction(names = {"map_keys"}) +public class MapKeysFunction implements ValueParser { + + private final ValueParser mapParser; + + public MapKeysFunction(Function expr) { + this.mapParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object mapObj = mapParser.parse(sourceData, rowIndex, context); + if (mapObj == null) { + return null; + } + if (mapObj instanceof Map) { + Map<?, ?> map = (Map<?, ?>) mapObj; + if (map.isEmpty()) { + return null; + } + return Arrays.toString(map.keySet().toArray()); + } + return null; + } +} diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapUnionFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapUnionFunction.java new file mode 100644 index 0000000000..ab74d40b81 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapUnionFunction.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Expression; +import net.sf.jsqlparser.expression.Function; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +/** + * MapUnionFunction + * description: MAP_UNION(map1, …)--Returns a map created by merging at least one map. These maps should have a common + * map type. If there are overlapping keys, the value from ‘map2’ will overwrite the value from ‘map1’ + * , the value from ‘map3’ will overwrite the value from ‘map2’, the value from ‘mapn’ will overwrite + * the value from ‘map(n-1)’. If any of maps is null, return null. + * for example: map_union(map('he', 1),map('xxd', 3))--return {he=1, xxd=3} + * map_union(map('he', 1),map('he', 3))--return {he=3} + */ +@TransformFunction(names = {"map_union"}) +public class MapUnionFunction implements ValueParser { + + private List<ValueParser> parserList; + + public MapUnionFunction(Function expr) { + if (expr.getParameters() == null) { + this.parserList = new ArrayList<>(); + } else { + List<Expression> params = expr.getParameters().getExpressions(); + parserList = new ArrayList<>(params.size()); + for (Expression param : params) { + ValueParser node = OperatorTools.buildParser(param); + parserList.add(node); + } + } + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Map<Object, Object> res = new HashMap<>(); + for (ValueParser valueParser : parserList) { + Object mapObj = valueParser.parse(sourceData, rowIndex, context); + if (mapObj == null) { + return null; + } + if (mapObj instanceof Map) { + Map<?, ?> map = (Map<?, ?>) mapObj; + if (map.isEmpty()) { + return null; + } + res.putAll(map); + } + } + return res; + } +} \ No newline at end of file diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapValueFunction.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapValueFunction.java new file mode 100644 index 0000000000..e9aa26b7ac --- /dev/null +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapValueFunction.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function; + +import org.apache.inlong.sdk.transform.decode.SourceData; +import org.apache.inlong.sdk.transform.process.Context; +import org.apache.inlong.sdk.transform.process.operator.OperatorTools; +import org.apache.inlong.sdk.transform.process.parser.ValueParser; + +import net.sf.jsqlparser.expression.Function; + +import java.util.Arrays; +import java.util.Map; +/** + * MapValueFunction + * description: MAP_VALUES(map)--Returns the values of the map as array. No order guaranteed. + * for example: map_values(Map('he',1,'xxd','cloud'))--return [1, cloud] + * map_values(Map('xxd','cloud',map(1,2),map(3,'apple')))--return [cloud, {3=apple}] + */ +@TransformFunction(names = {"map_values"}) +public class MapValueFunction implements ValueParser { + + private final ValueParser mapParser; + + public MapValueFunction(Function expr) { + this.mapParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0)); + } + + @Override + public Object parse(SourceData sourceData, int rowIndex, Context context) { + Object mapObj = mapParser.parse(sourceData, rowIndex, context); + if (mapObj == null) { + return null; + } + if (mapObj instanceof Map) { + Map<?, ?> map = (Map<?, ?>) mapObj; + if (map.isEmpty()) { + return null; + } + return Arrays.toString(map.values().toArray()); + } + return null; + } + +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapEntriesFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapEntriesFunction.java new file mode 100644 index 0000000000..b8db9fbb33 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapEntriesFunction.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.string; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestMapEntriesFunction extends AbstractFunctionStringTestBase { + + @Test + public void testMapEntriesFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor<String, String> processor = null; + List<String> output = null; + + transformSql = "select map_entries(map(string1,numeric1,string2)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: map_entries(Map('he',7,'xxd')) + data = "he|xxd|cloud|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + transformSql = "select map_entries(map(string1,numeric1,string2,string3)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case2: map_entries(Map('he',1,'xxd','cloud')) + data = "he|xxd|cloud|1|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he=1, xxd=cloud]", output.get(0)); + + transformSql = "select map_entries(map(numeric1,numeric2,string1,string2)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case3: map_entries(Map(1,2,'cloud','xxd')) + data = "1|2|3|xxd|cloud|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[xxd=cloud, 1=2]", output.get(0)); + + transformSql = + "select map_entries(map(numeric1,numeric2,map(string1,string2),map(string3,numeric3))) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case4: map_entries(Map('xxd','cloud',map(1,2),map(3,'apple'))) + data = "1|2|3|xxd|cloud|apple"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[xxd=cloud, {1=2}={3=apple}]", output.get(0)); + + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapFromArraysFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapFromArraysFunction.java new file mode 100644 index 0000000000..c08ee4d788 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapFromArraysFunction.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.string; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestMapFromArraysFunction extends AbstractFunctionStringTestBase { + + @Test + public void testMapFromArraysFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor<String, String> processor = null; + List<String> output = null; + + transformSql = "select map_from_arrays(map(string1,numeric1,string2)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: map_from_arrays(Map('he',7,'xxd')) + data = "he|xxd|cloud|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + transformSql = "select map_from_arrays(array(string1,string2),array(numeric1,numeric2)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case2: map_from_arrays(array('he', 'xxd'),array(1, 3)) + data = "he|xxd|cloud|1|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result={he=1, xxd=3}", output.get(0)); + + transformSql = "select map_from_arrays(array(string1,string2),array(numeric1)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case3: map_from_arrays(array(xxd,cloud),array(1)) + data = "1|2|3|xxd|cloud|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + transformSql = + "select map_from_arrays(array(string1,array(string2)),array(numeric1,array(numeric2))) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case4: map_from_arrays(array('xxd', array('cloud')),array(1, array(2))) + data = "1|2|3|xxd|cloud|apple"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result={1=xxd, [2]=[cloud]}", output.get(0)); + + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapKeysFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapKeysFunction.java new file mode 100644 index 0000000000..defdf0e2da --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapKeysFunction.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.string; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestMapKeysFunction extends AbstractFunctionStringTestBase { + + @Test + public void testMapKeysFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor<String, String> processor = null; + List<String> output = null; + + transformSql = "select map_keys(map(string1,numeric1,string2)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: map_keys(Map('he',7,'xxd')) + data = "he|xxd|cloud|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + transformSql = "select map_keys(map(string1,numeric1,string2,string3)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case2: map_keys(Map('he',1,'xxd','cloud')) + data = "he|xxd|cloud|1|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[he, xxd]", output.get(0)); + + transformSql = "select map_keys(map(numeric1,numeric2,string1,string2)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case3: map_keys(Map(1,2,'cloud','xxd')) + data = "1|2|3|xxd|cloud|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[xxd, 1]", output.get(0)); + + transformSql = "select map_keys(map(numeric1,numeric2,map(string1,string2),map(string3,numeric3))) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case4: map_keys(Map('xxd','cloud',map(1,2),map(3,'apple'))) + data = "1|2|3|xxd|cloud|apple"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[xxd, {1=2}]", output.get(0)); + + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapUnionFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapUnionFunction.java new file mode 100644 index 0000000000..13024bd5d6 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapUnionFunction.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.string; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestMapUnionFunction extends AbstractFunctionStringTestBase { + + @Test + public void testMapUnionFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor<String, String> processor = null; + List<String> output = null; + + transformSql = "select map_union(map(string1,numeric1,string2)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: map_union(NULL) + data = "he|xxd|cloud|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + transformSql = "select map_union(map(string1,numeric1),map(string2,numeric2)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case2: map_union(map('he', 1),map('xxd', 3)) + data = "he|xxd|cloud|1|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result={he=1, xxd=3}", output.get(0)); + + // case3: map_union(map('he', 1),map('he', 3)) + data = "he|he|cloud|1|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result={he=3}", output.get(0)); + + transformSql = "select map_union(map(string1,numeric1),map(string2,numeric2)" + + ",map(string3,numeric3)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case4: map_union(map('xxd', 1),map('cloud', 2),map('xxd', 3)) + data = "xxd|cloud|xxd|1|2|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result={cloud=2, xxd=3}", output.get(0)); + + transformSql = "select map_union(map(numeric1,numeric2),map(string1,string2),map()) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case5: map_union(map(1, 2),map('xxd', 'cloud'),NULL) + data = "1|2|3|xxd|cloud|apple"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + } +} diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapValuesFunction.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapValuesFunction.java new file mode 100644 index 0000000000..37b2684084 --- /dev/null +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapValuesFunction.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.transform.process.function.string; + +import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory; +import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory; +import org.apache.inlong.sdk.transform.pojo.TransformConfig; +import org.apache.inlong.sdk.transform.process.TransformProcessor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; + +public class TestMapValuesFunction extends AbstractFunctionStringTestBase { + + @Test + public void testMapValuesFunction() throws Exception { + String transformSql = null, data = null; + TransformConfig config = null; + TransformProcessor<String, String> processor = null; + List<String> output = null; + + transformSql = "select map_values(map(string1,numeric1,string2)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case1: map_values(Map('he',7,'xxd')) + data = "he|xxd|cloud|7|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=", output.get(0)); + + transformSql = "select map_values(map(string1,numeric1,string2,string3)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case2: map_values(Map('he',1,'xxd','cloud')) + data = "he|xxd|cloud|1|3|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[1, cloud]", output.get(0)); + + transformSql = "select map_values(map(numeric1,numeric2,string1,string2)) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case3: map_values(Map(1,2,'cloud','xxd')) + data = "1|2|3|xxd|cloud|3"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[cloud, 2]", output.get(0)); + + transformSql = + "select map_values(map(numeric1,numeric2,map(string1,string2),map(string3,numeric3))) from source"; + config = new TransformConfig(transformSql); + processor = TransformProcessor + .create(config, SourceDecoderFactory.createCsvDecoder(csvSource), + SinkEncoderFactory.createKvEncoder(kvSink)); + + // case4: map_values(Map('xxd','cloud',map(1,2),map(3,'apple'))) + data = "1|2|3|xxd|cloud|apple"; + output = processor.transform(data, new HashMap<>()); + Assert.assertEquals(1, output.size()); + Assert.assertEquals("result=[cloud, {3=apple}]", output.get(0)); + + } +}