This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ca4069293c [INLONG-11107][SDK] Avro Source Data Support Map Type 
(#11108)
ca4069293c is described below

commit ca4069293c9110a90f80f6262504f1c4639c2815
Author: Xincheng Huang <60057611+ying-...@users.noreply.github.com>
AuthorDate: Fri Sep 20 17:28:12 2024 +0800

    [INLONG-11107][SDK] Avro Source Data Support Map Type (#11108)
---
 .../inlong/sdk/transform/decode/AvroSourceData.java   | 12 ++++++++++++
 .../process/processor/AbstractProcessorTestBase.java  | 19 +++++++++++--------
 .../process/processor/TestAvro2CsvProcessor.java      |  9 +++++----
 3 files changed, 28 insertions(+), 12 deletions(-)

diff --git 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java
 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java
index c060c89af4..42705433f4 100644
--- 
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java
+++ 
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java
@@ -20,12 +20,14 @@ package org.apache.inlong.sdk.transform.decode;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
 import org.apache.commons.lang3.StringUtils;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 public class AvroSourceData implements SourceData {
 
@@ -91,6 +93,16 @@ public class AvroSourceData implements SourceData {
             // parse other node
             for (int i = 1; i < childNodes.size(); i++) {
                 AvroNode node = childNodes.get(i);
+                if (curSchema.getType() == Type.MAP) {
+                    Map<?, ?> map = (Map<?, ?>) current;
+                    Object mapValue = map.get(new Utf8(node.getName()));
+                    if (mapValue == null) {
+                        return "";
+                    }
+                    curSchema = curSchema.getValueType();
+                    current = mapValue;
+                    continue;
+                }
                 if (curSchema.getType() != Type.RECORD) {
                     // error data
                     return "";
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
index 9f05396667..0a71004789 100644
--- 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
@@ -67,14 +67,17 @@ public abstract class AbstractProcessorTestBase {
     }
 
     protected byte[] getAvroTestData() {
-        String srcString = 
"T2JqAQIWYXZyby5zY2hlbWHIBXsidHlwZSI6InJlY29yZCIsIm5hbWUiOiJTZGtEYXRhUmVxdWVzdCIs"
-                + 
"Im5hbWVzcGFjZSI6InRlc3QiLCJmaWVsZHMiOlt7Im5hbWUiOiJzaWQiLCJ0eXBlIjoic3RyaW5nIn0seyJuYW1lIj"
-                + 
"oibXNncyIsInR5cGUiOnsidHlwZSI6ImFycmF5IiwiaXRlbXMiOnsidHlwZSI6InJlY29yZCIsIm5hbWUiOiJTZGtNZ"
-                + 
"XNzYWdlIiwiZmllbGRzIjpbeyJuYW1lIjoibXNnIiwidHlwZSI6ImJ5dGVzIn0seyJuYW1lIjoibXNnVGltZSIsInR5"
-                + 
"cGUiOiJsb25nIn0seyJuYW1lIjoiZXh0aW5mbyIsInR5cGUiOnsidHlwZSI6Im1hcCIsInZhbHVlcyI6InN0cmluZyJ"
-                + 
"9fV19fX0seyJuYW1lIjoicGFja2FnZUlEIiwidHlwZSI6ImxvbmcifV19AI7h/J8SaFCGp012msD3lKMCngEIc2lkMQ"
-                + 
"QKQXBwbGXyhcYJBAhrZXkxCGtleTEIa2V5Mgx2YWx1ZTEADEJhbmFuYeSLjBMECGtleTEIa2V5MghrZXkyDHZhbHVlM"
-                + "gAAgIkPjuH8nxJoUIanTXaawPeUow==";
+        String srcString = 
"T2JqAQIWYXZyby5zY2hlbWHIBXsidHlwZSI6InJlY29yZCIsIm5hbWUiOiJTZGtE"
+                + 
"YXRhUmVxdWVzdCIsIm5hbWVzcGFjZSI6InRlc3QiLCJmaWVsZHMiOlt7Im5hbWUi"
+                + 
"OiJzaWQiLCJ0eXBlIjoic3RyaW5nIn0seyJuYW1lIjoibXNncyIsInR5cGUiOnsi"
+                + 
"dHlwZSI6ImFycmF5IiwiaXRlbXMiOnsidHlwZSI6InJlY29yZCIsIm5hbWUiOiJT"
+                + 
"ZGtNZXNzYWdlIiwiZmllbGRzIjpbeyJuYW1lIjoibXNnIiwidHlwZSI6ImJ5dGVz"
+                + 
"In0seyJuYW1lIjoibXNnVGltZSIsInR5cGUiOiJsb25nIn0seyJuYW1lIjoiZXh0"
+                + 
"aW5mbyIsInR5cGUiOnsidHlwZSI6Im1hcCIsInZhbHVlcyI6InN0cmluZyJ9fV19"
+                + 
"fX0seyJuYW1lIjoicGFja2FnZUlEIiwidHlwZSI6ImxvbmcifV19AMt7kQjpgkXl"
+                + 
"EjM4Iv+oOJYClgEIc2lkMQQKQXBwbGXyhcYJBARrMQx2YWx1ZTEEazIMdmFsdWUy"
+                + 
"AAxCYW5hbmHki4wTBARrMQx2YWx1ZTMEazIMdmFsdWU0AACAiQ/Le5EI6YJF5RIz"
+                + "OCL/qDiW";
         byte[] srcBytes = Base64.getDecoder().decode(srcString);
         return srcBytes;
     }
diff --git 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAvro2CsvProcessor.java
 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAvro2CsvProcessor.java
index fa0a361112..8ef3f9f3f5 100644
--- 
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAvro2CsvProcessor.java
+++ 
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAvro2CsvProcessor.java
@@ -34,10 +34,11 @@ public class TestAvro2CsvProcessor extends 
AbstractProcessorTestBase {
 
     @Test
     public void testAvro2Csv() throws Exception {
-        List<FieldInfo> fields = this.getTestFieldList("sid", "packageID", 
"msgTime", "msg");
+        List<FieldInfo> fields = this.getTestFieldList("sid", "packageID", 
"msgTime", "msg", "extinfo.k1");
         AvroSourceInfo avroSource = new AvroSourceInfo("UTF-8", "msgs");
         CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields);
-        String transformSql = "select 
$root.sid,$root.packageID,$child.msgTime,$child.msg from source";
+        String transformSql =
+                "select $root.sid,$root.packageID,$child.msgTime,$child.msg, 
$child.extinfo.k1 from source";
         TransformConfig config = new TransformConfig(transformSql);
         // case1
         TransformProcessor<byte[], String> processor = TransformProcessor
@@ -46,7 +47,7 @@ public class TestAvro2CsvProcessor extends 
AbstractProcessorTestBase {
         byte[] srcBytes = this.getAvroTestData();
         List<String> output = processor.transform(srcBytes);
         Assert.assertEquals(2, output.size());
-        Assert.assertEquals(output.get(0), "sid1|123456|10011001|Apple");
-        Assert.assertEquals(output.get(1), "sid1|123456|20022002|Banana");
+        Assert.assertEquals(output.get(0), 
"sid1|123456|10011001|Apple|value1");
+        Assert.assertEquals(output.get(1), 
"sid1|123456|20022002|Banana|value3");
     }
 }

Reply via email to