This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 25e786b828 [INLONG-11283][SDK]Transform enhance Map-related Collection 
Functions (#11317)
25e786b828 is described below

commit 25e786b82886a682d36cd3d9cd045c1e2d3a9bfe
Author: emptyOVO <118812562+empty...@users.noreply.github.com>
AuthorDate: Thu Oct 10 11:33:44 2024 +0800

    [INLONG-11283][SDK]Transform enhance Map-related Collection Functions 
(#11317)
    
    * [INLONG-11283][SDK]Transform enhance Map-related Collection Functions
    
    * feat: add description for functions
---
 .../process/function/MapEntriesFunction.java       | 56 +++++++++++++
 .../process/function/MapFromArraysFunction.java    | 77 +++++++++++++++++
 .../process/function/MapKeysFunction.java          | 59 +++++++++++++
 .../process/function/MapUnionFunction.java         | 77 +++++++++++++++++
 .../process/function/MapValueFunction.java         | 60 ++++++++++++++
 .../function/string/TestMapEntriesFunction.java    | 90 ++++++++++++++++++++
 .../function/string/TestMapFromArraysFunction.java | 90 ++++++++++++++++++++
 .../function/string/TestMapKeysFunction.java       | 89 ++++++++++++++++++++
 .../function/string/TestMapUnionFunction.java      | 96 ++++++++++++++++++++++
 .../function/string/TestMapValuesFunction.java     | 90 ++++++++++++++++++++
 10 files changed, 784 insertions(+)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapEntriesFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapEntriesFunction.java
new file mode 100644
index 0000000000..dde1c82b78
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapEntriesFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.Arrays;
+import java.util.Map;
+/**
+ * MapEntriesFunction
+ * description: MAP_ENTRIES(map)--Returns an array of all entries in the given map, rendered as text.
+ *              No order guaranteed. Returns null when the argument evaluates to null or is not a map.
+ * for example: map_entries(Map('he',1,'xxd','cloud'))--return [he=1, xxd=cloud]
+ *              map_entries(Map('xxd','cloud',1,2))--return [xxd=cloud, 1=2]
+ */
+@TransformFunction(names = {"map_entries"})
+public class MapEntriesFunction implements ValueParser {
+
+    private final ValueParser mapParser;
+
+    /**
+     * Builds the parser for the first argument of the call, which is expected to evaluate to a map.
+     *
+     * @param expr the parsed map_entries(...) call
+     */
+    public MapEntriesFunction(Function expr) {
+        this.mapParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0));
+    }
+
+    /**
+     * Evaluates the argument and renders the entries of the resulting map.
+     *
+     * @return a string such as "[he=1, xxd=cloud]", or null if the argument is null or not a map
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object parsed = mapParser.parse(sourceData, rowIndex, context);
+        // instanceof is false for null, so one guard covers both a null and a non-map argument.
+        if (!(parsed instanceof Map)) {
+            return null;
+        }
+        Map<?, ?> source = (Map<?, ?>) parsed;
+        // Each entry is printed through its own toString(), i.e. as key=value.
+        return Arrays.toString(source.entrySet().toArray());
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapFromArraysFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapFromArraysFunction.java
new file mode 100644
index 0000000000..2e90e7d6e1
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapFromArraysFunction.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import lombok.extern.slf4j.Slf4j;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+/**
+ * MapFromArraysFunction
+ * description: MAP_FROM_ARRAYS(array_of_keys, array_of_values)--Returns a map created from an array of keys and
+ *              an array of values. Note that the lengths of the two arrays should be the same; otherwise null
+ *              is returned. Null is also returned when fewer than two arguments are given, or when either
+ *              argument is null or is not an array.
+ * for example: map_from_arrays(array('he', 'xxd'),array(1, 3))--return {he=1, xxd=3}
+ *              map_from_arrays(array(1, array(2)),array('xxd', array('cloud')))--return {1=xxd, [2]=[cloud]}
+ */
+@Slf4j
+@TransformFunction(names = {"map_from_arrays"})
+public class MapFromArraysFunction implements ValueParser {
+
+    private final ValueParser keyArrayParser;
+
+    private final ValueParser valueArrayParser;
+
+    /**
+     * Builds the parsers for the key array (first argument) and the value array (second argument).
+     * When the call carries fewer than two arguments both parsers are left null and parse() returns null.
+     *
+     * @param expr the parsed map_from_arrays(...) call
+     */
+    public MapFromArraysFunction(Function expr) {
+        ValueParser keys = null;
+        ValueParser values = null;
+        // Guard defensively: the parameter list may be absent or too short.
+        if (expr.getParameters() != null && expr.getParameters().getExpressions().size() >= 2) {
+            keys = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0));
+            values = OperatorTools.buildParser(expr.getParameters().getExpressions().get(1));
+        }
+        this.keyArrayParser = keys;
+        this.valueArrayParser = values;
+    }
+
+    /**
+     * Evaluates both arguments and pairs the i-th key with the i-th value.
+     *
+     * @return the resulting map in key-array order, or null if the call had fewer than two arguments, an
+     *         argument is null or not an ArrayList, or the two arrays differ in length
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        // Too few arguments were supplied; without this check the calls below would throw a
+        // NullPointerException.
+        if (keyArrayParser == null || valueArrayParser == null) {
+            return null;
+        }
+        Object keyArrayObj = keyArrayParser.parse(sourceData, rowIndex, context);
+        Object valueArrayObj = valueArrayParser.parse(sourceData, rowIndex, context);
+        // instanceof is false for null, so this also rejects null arguments.
+        if (!(keyArrayObj instanceof ArrayList) || !(valueArrayObj instanceof ArrayList)) {
+            return null;
+        }
+        ArrayList<?> keyArray = (ArrayList<?>) keyArrayObj;
+        ArrayList<?> valueArray = (ArrayList<?>) valueArrayObj;
+        if (keyArray.size() != valueArray.size()) {
+            log.warn("The lengths of the keys and values arrays must be the same.");
+            return null;
+        }
+        // LinkedHashMap keeps the entries in the order of the key array.
+        Map<Object, Object> res = new LinkedHashMap<>();
+        for (int i = 0; i < keyArray.size(); i++) {
+            res.put(keyArray.get(i), valueArray.get(i));
+        }
+        return res;
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapKeysFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapKeysFunction.java
new file mode 100644
index 0000000000..b2d6953ca7
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapKeysFunction.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.Arrays;
+import java.util.Map;
+/**
+ * MapKeysFunction
+ * description: MAP_KEYS(map)--Returns the keys of the map as array, rendered as text. No order guaranteed.
+ *              Returns null when the argument is null, is not a map, or is an empty map.
+ * for example: map_keys(Map('he',1,'xxd','cloud'))--return [he, xxd]
+ *              map_keys(Map('xxd','cloud',map(1,2),map(3,'apple')))--return [xxd, {1=2}]
+ */
+@TransformFunction(names = {"map_keys"})
+public class MapKeysFunction implements ValueParser {
+
+    private final ValueParser mapParser;
+
+    /**
+     * Builds the parser for the first argument of the call, which is expected to evaluate to a map.
+     *
+     * @param expr the parsed map_keys(...) call
+     */
+    public MapKeysFunction(Function expr) {
+        this.mapParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0));
+    }
+
+    /**
+     * Evaluates the argument and renders the keys of the resulting map.
+     *
+     * @return a string such as "[he, xxd]", or null if the argument is null, not a map, or an empty map
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object parsed = mapParser.parse(sourceData, rowIndex, context);
+        // instanceof is false for null, so one guard covers both a null and a non-map argument.
+        if (!(parsed instanceof Map)) {
+            return null;
+        }
+        Map<?, ?> source = (Map<?, ?>) parsed;
+        // An empty map yields null rather than "[]".
+        return source.isEmpty() ? null : Arrays.toString(source.keySet().toArray());
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapUnionFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapUnionFunction.java
new file mode 100644
index 0000000000..ab74d40b81
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapUnionFunction.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+/**
+ * MapUnionFunction
+ * description: MAP_UNION(map1, ...)--Returns a map created by merging at least one map. These maps should have
+ *              a common map type. If there are overlapping keys, the value from 'map2' will overwrite the value
+ *              from 'map1', the value from 'map3' will overwrite the value from 'map2', and so on up to 'mapn'
+ *              overwriting the value from 'map(n-1)'. If any argument is null, is not a map, or is an empty
+ *              map, return null.
+ * for example: map_union(map('he', 1),map('xxd', 3))--return {he=1, xxd=3}
+ *              map_union(map('he', 1),map('he', 3))--return {he=3}
+ */
+@TransformFunction(names = {"map_union"})
+public class MapUnionFunction implements ValueParser {
+
+    private final List<ValueParser> parserList;
+
+    /**
+     * Builds one parser per argument of the call; a call without a parameter list yields no parsers.
+     *
+     * @param expr the parsed map_union(...) call
+     */
+    public MapUnionFunction(Function expr) {
+        this.parserList = new ArrayList<>();
+        if (expr.getParameters() == null) {
+            return;
+        }
+        for (Expression param : expr.getParameters().getExpressions()) {
+            parserList.add(OperatorTools.buildParser(param));
+        }
+    }
+
+    /**
+     * Evaluates the arguments from left to right and merges them, later maps overwriting earlier ones.
+     *
+     * @return the merged map (empty when the call had no arguments), or null as soon as one argument is
+     *         null, is not a map, or is an empty map
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Map<Object, Object> res = new HashMap<>();
+        for (ValueParser valueParser : parserList) {
+            Object mapObj = valueParser.parse(sourceData, rowIndex, context);
+            // A null or non-map argument invalidates the whole union; silently skipping it would return
+            // a partial result. instanceof is false for null, so one guard covers both cases.
+            if (!(mapObj instanceof Map)) {
+                return null;
+            }
+            Map<?, ?> map = (Map<?, ?>) mapObj;
+            // Existing behaviour kept as-is: an empty map is treated like a missing one.
+            if (map.isEmpty()) {
+                return null;
+            }
+            res.putAll(map);
+        }
+        return res;
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapValueFunction.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapValueFunction.java
new file mode 100644
index 0000000000..e9aa26b7ac
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/function/MapValueFunction.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function;
+
+import org.apache.inlong.sdk.transform.decode.SourceData;
+import org.apache.inlong.sdk.transform.process.Context;
+import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ValueParser;
+
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.Arrays;
+import java.util.Map;
+/**
+ * MapValueFunction
+ * description: MAP_VALUES(map)--Returns the values of the map as array, rendered as text. No order guaranteed.
+ *              Returns null when the argument is null, is not a map, or is an empty map.
+ * for example: map_values(Map('he',1,'xxd','cloud'))--return [1, cloud]
+ *              map_values(Map('xxd','cloud',map(1,2),map(3,'apple')))--return [cloud, {3=apple}]
+ */
+@TransformFunction(names = {"map_values"})
+public class MapValueFunction implements ValueParser {
+
+    private final ValueParser mapParser;
+
+    /**
+     * Builds the parser for the first argument of the call, which is expected to evaluate to a map.
+     *
+     * @param expr the parsed map_values(...) call
+     */
+    public MapValueFunction(Function expr) {
+        this.mapParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0));
+    }
+
+    /**
+     * Evaluates the argument and renders the values of the resulting map.
+     *
+     * @return a string such as "[1, cloud]", or null if the argument is null, not a map, or an empty map
+     */
+    @Override
+    public Object parse(SourceData sourceData, int rowIndex, Context context) {
+        Object parsed = mapParser.parse(sourceData, rowIndex, context);
+        // instanceof is false for null, so one guard covers both a null and a non-map argument.
+        if (!(parsed instanceof Map)) {
+            return null;
+        }
+        Map<?, ?> source = (Map<?, ?>) parsed;
+        // An empty map yields null rather than "[]".
+        return source.isEmpty() ? null : Arrays.toString(source.values().toArray());
+    }
+
+}
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapEntriesFunction.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapEntriesFunction.java
new file mode 100644
index 0000000000..b8db9fbb33
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapEntriesFunction.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.string;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Tests for the map_entries transform function.
+ */
+public class TestMapEntriesFunction extends AbstractFunctionStringTestBase {
+
+    @Test
+    public void testMapEntriesFunction() throws Exception {
+        // case1: map_entries(Map('he',7,'xxd'))
+        assertTransform("select map_entries(map(string1,numeric1,string2)) from source",
+                "he|xxd|cloud|7|3|3", "result=");
+
+        // case2: map_entries(Map('he',1,'xxd','cloud'))
+        assertTransform("select map_entries(map(string1,numeric1,string2,string3)) from source",
+                "he|xxd|cloud|1|3|3", "result=[he=1, xxd=cloud]");
+
+        // case3: map_entries(Map('xxd','cloud',1,2))
+        assertTransform("select map_entries(map(numeric1,numeric2,string1,string2)) from source",
+                "1|2|3|xxd|cloud|3", "result=[xxd=cloud, 1=2]");
+
+        // case4: map_entries(Map('xxd','cloud',map(1,2),map(3,'apple')))
+        assertTransform(
+                "select map_entries(map(numeric1,numeric2,map(string1,string2),map(string3,numeric3))) from source",
+                "1|2|3|xxd|cloud|apple", "result=[xxd=cloud, {1=2}={3=apple}]");
+    }
+
+    /**
+     * Runs one CSV row through a fresh processor built from the given SQL and checks the single output line.
+     */
+    private void assertTransform(String transformSql, String data, String expected) throws Exception {
+        TransformConfig config = new TransformConfig(transformSql);
+        TransformProcessor<String, String> processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        List<String> output = processor.transform(data, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals(expected, output.get(0));
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapFromArraysFunction.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapFromArraysFunction.java
new file mode 100644
index 0000000000..c08ee4d788
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapFromArraysFunction.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.string;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Tests for the map_from_arrays transform function.
+ */
+public class TestMapFromArraysFunction extends AbstractFunctionStringTestBase {
+
+    @Test
+    public void testMapFromArraysFunction() throws Exception {
+        // case1: map_from_arrays(Map('he',7,'xxd'))
+        assertTransform("select map_from_arrays(map(string1,numeric1,string2)) from source",
+                "he|xxd|cloud|7|3|3", "result=");
+
+        // case2: map_from_arrays(array('he', 'xxd'),array(1, 3))
+        assertTransform("select map_from_arrays(array(string1,string2),array(numeric1,numeric2)) from source",
+                "he|xxd|cloud|1|3|3", "result={he=1, xxd=3}");
+
+        // case3: map_from_arrays(array(1, 2),array('xxd')) -- array lengths differ
+        assertTransform("select map_from_arrays(array(string1,string2),array(numeric1)) from source",
+                "1|2|3|xxd|cloud|3", "result=");
+
+        // case4: map_from_arrays(array(1, array(2)),array('xxd', array('cloud')))
+        assertTransform(
+                "select map_from_arrays(array(string1,array(string2)),array(numeric1,array(numeric2))) from source",
+                "1|2|3|xxd|cloud|apple", "result={1=xxd, [2]=[cloud]}");
+    }
+
+    /**
+     * Runs one CSV row through a fresh processor built from the given SQL and checks the single output line.
+     */
+    private void assertTransform(String transformSql, String data, String expected) throws Exception {
+        TransformConfig config = new TransformConfig(transformSql);
+        TransformProcessor<String, String> processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        List<String> output = processor.transform(data, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals(expected, output.get(0));
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapKeysFunction.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapKeysFunction.java
new file mode 100644
index 0000000000..defdf0e2da
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapKeysFunction.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.string;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Tests for the map_keys transform function.
+ */
+public class TestMapKeysFunction extends AbstractFunctionStringTestBase {
+
+    @Test
+    public void testMapKeysFunction() throws Exception {
+        // case1: map_keys(Map('he',7,'xxd'))
+        assertTransform("select map_keys(map(string1,numeric1,string2)) from source",
+                "he|xxd|cloud|7|3|3", "result=");
+
+        // case2: map_keys(Map('he',1,'xxd','cloud'))
+        assertTransform("select map_keys(map(string1,numeric1,string2,string3)) from source",
+                "he|xxd|cloud|1|3|3", "result=[he, xxd]");
+
+        // case3: map_keys(Map('xxd','cloud',1,2))
+        assertTransform("select map_keys(map(numeric1,numeric2,string1,string2)) from source",
+                "1|2|3|xxd|cloud|3", "result=[xxd, 1]");
+
+        // case4: map_keys(Map('xxd','cloud',map(1,2),map(3,'apple')))
+        assertTransform(
+                "select map_keys(map(numeric1,numeric2,map(string1,string2),map(string3,numeric3))) from source",
+                "1|2|3|xxd|cloud|apple", "result=[xxd, {1=2}]");
+    }
+
+    /**
+     * Runs one CSV row through a fresh processor built from the given SQL and checks the single output line.
+     */
+    private void assertTransform(String transformSql, String data, String expected) throws Exception {
+        TransformConfig config = new TransformConfig(transformSql);
+        TransformProcessor<String, String> processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+        List<String> output = processor.transform(data, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals(expected, output.get(0));
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapUnionFunction.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapUnionFunction.java
new file mode 100644
index 0000000000..13024bd5d6
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapUnionFunction.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.string;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+public class TestMapUnionFunction extends AbstractFunctionStringTestBase {
+
+    @Test
+    public void testMapUnionFunction() throws Exception {
+        String transformSql = null, data = null;
+        TransformConfig config = null;
+        TransformProcessor<String, String> processor = null;
+        List<String> output = null;
+
+        transformSql = "select map_union(map(string1,numeric1,string2)) from 
source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+
+        // case1: map_union(NULL)
+        data = "he|xxd|cloud|7|3|3";
+        output = processor.transform(data, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=", output.get(0));
+
+        transformSql = "select 
map_union(map(string1,numeric1),map(string2,numeric2)) from source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+
+        // case2: map_union(map('he', 1),map('xxd', 3))
+        data = "he|xxd|cloud|1|3|3";
+        output = processor.transform(data, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result={he=1, xxd=3}", output.get(0));
+
+        // case3: map_union(map('he', 1),map('he', 3))
+        data = "he|he|cloud|1|3|3";
+        output = processor.transform(data, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result={he=3}", output.get(0));
+
+        transformSql = "select 
map_union(map(string1,numeric1),map(string2,numeric2)" +
+                ",map(string3,numeric3)) from source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+
+        // case4: map_union(map('xxd', 1),map('cloud', 2),map('xxd', 3))
+        data = "xxd|cloud|xxd|1|2|3";
+        output = processor.transform(data, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result={cloud=2, xxd=3}", output.get(0));
+
+        transformSql = "select 
map_union(map(numeric1,numeric2),map(string1,string2),map()) from source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+
+        // case5: map_union(map('xxd', 'cloud'),map(1, 2),NULL)
+        data = "1|2|3|xxd|cloud|apple";
+        output = processor.transform(data, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=", output.get(0));
+
+    }
+}
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapValuesFunction.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapValuesFunction.java
new file mode 100644
index 0000000000..37b2684084
--- /dev/null
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/function/string/TestMapValuesFunction.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.function.string;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+public class TestMapValuesFunction extends AbstractFunctionStringTestBase {
+
+    @Test
+    public void testMapValuesFunction() throws Exception {
+        String transformSql = null, data = null;
+        TransformConfig config = null;
+        TransformProcessor<String, String> processor = null;
+        List<String> output = null;
+
+        transformSql = "select map_values(map(string1,numeric1,string2)) from 
source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+
+        // case1: map_values(Map('he',7,'xxd'))
+        data = "he|xxd|cloud|7|3|3";
+        output = processor.transform(data, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=", output.get(0));
+
+        transformSql = "select 
map_values(map(string1,numeric1,string2,string3)) from source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+
+        // case2: map_values(Map('he',1,'xxd','cloud'))
+        data = "he|xxd|cloud|1|3|3";
+        output = processor.transform(data, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=[1, cloud]", output.get(0));
+
+        transformSql = "select 
map_values(map(numeric1,numeric2,string1,string2)) from source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+
+        // case3: map_values(Map('xxd','cloud',1,2))
+        data = "1|2|3|xxd|cloud|3";
+        output = processor.transform(data, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=[cloud, 2]", output.get(0));
+
+        transformSql =
+                "select 
map_values(map(numeric1,numeric2,map(string1,string2),map(string3,numeric3))) 
from source";
+        config = new TransformConfig(transformSql);
+        processor = TransformProcessor
+                .create(config, 
SourceDecoderFactory.createCsvDecoder(csvSource),
+                        SinkEncoderFactory.createKvEncoder(kvSink));
+
+        // case4: map_values(Map('xxd','cloud',map(1,2),map(3,'apple')))
+        data = "1|2|3|xxd|cloud|apple";
+        output = processor.transform(data, new HashMap<>());
+        Assert.assertEquals(1, output.size());
+        Assert.assertEquals("result=[cloud, {3=apple}]", output.get(0));
+
+    }
+}


Reply via email to