This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new ca4069293c [INLONG-11107][SDK] Avro Source Data Support Map Type (#11108) ca4069293c is described below commit ca4069293c9110a90f80f6262504f1c4639c2815 Author: Xincheng Huang <60057611+ying-...@users.noreply.github.com> AuthorDate: Fri Sep 20 17:28:12 2024 +0800 [INLONG-11107][SDK] Avro Source Data Support Map Type (#11108) --- .../inlong/sdk/transform/decode/AvroSourceData.java | 12 ++++++++++++ .../process/processor/AbstractProcessorTestBase.java | 19 +++++++++++-------- .../process/processor/TestAvro2CsvProcessor.java | 9 +++++---- 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java index c060c89af4..42705433f4 100644 --- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java +++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java @@ -20,12 +20,14 @@ package org.apache.inlong.sdk.transform.decode; import org.apache.avro.Schema; import org.apache.avro.Schema.Type; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.Utf8; import org.apache.commons.lang3.StringUtils; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; +import java.util.Map; public class AvroSourceData implements SourceData { @@ -91,6 +93,16 @@ public class AvroSourceData implements SourceData { // parse other node for (int i = 1; i < childNodes.size(); i++) { AvroNode node = childNodes.get(i); + if (curSchema.getType() == Type.MAP) { + Map<?, ?> map = (Map<?, ?>) current; + Object mapValue = map.get(new Utf8(node.getName())); + if (mapValue == null) { + return ""; + } + curSchema = curSchema.getValueType(); + current = mapValue; + continue; + } if (curSchema.getType() != Type.RECORD) { // error data return ""; diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java index 9f05396667..0a71004789 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java @@ -67,14 +67,17 @@ public abstract class AbstractProcessorTestBase { } protected byte[] getAvroTestData() { - String srcString = "T2JqAQIWYXZyby5zY2hlbWHIBXsidHlwZSI6InJlY29yZCIsIm5hbWUiOiJTZGtEYXRhUmVxdWVzdCIs" - + "Im5hbWVzcGFjZSI6InRlc3QiLCJmaWVsZHMiOlt7Im5hbWUiOiJzaWQiLCJ0eXBlIjoic3RyaW5nIn0seyJuYW1lIj" - + "oibXNncyIsInR5cGUiOnsidHlwZSI6ImFycmF5IiwiaXRlbXMiOnsidHlwZSI6InJlY29yZCIsIm5hbWUiOiJTZGtNZ" - + "XNzYWdlIiwiZmllbGRzIjpbeyJuYW1lIjoibXNnIiwidHlwZSI6ImJ5dGVzIn0seyJuYW1lIjoibXNnVGltZSIsInR5" - + "cGUiOiJsb25nIn0seyJuYW1lIjoiZXh0aW5mbyIsInR5cGUiOnsidHlwZSI6Im1hcCIsInZhbHVlcyI6InN0cmluZyJ" - + "9fV19fX0seyJuYW1lIjoicGFja2FnZUlEIiwidHlwZSI6ImxvbmcifV19AI7h/J8SaFCGp012msD3lKMCngEIc2lkMQ" - + "QKQXBwbGXyhcYJBAhrZXkxCGtleTEIa2V5Mgx2YWx1ZTEADEJhbmFuYeSLjBMECGtleTEIa2V5MghrZXkyDHZhbHVlM" - + "gAAgIkPjuH8nxJoUIanTXaawPeUow=="; + String srcString = "T2JqAQIWYXZyby5zY2hlbWHIBXsidHlwZSI6InJlY29yZCIsIm5hbWUiOiJTZGtE" + + "YXRhUmVxdWVzdCIsIm5hbWVzcGFjZSI6InRlc3QiLCJmaWVsZHMiOlt7Im5hbWUi" + + "OiJzaWQiLCJ0eXBlIjoic3RyaW5nIn0seyJuYW1lIjoibXNncyIsInR5cGUiOnsi" + + "dHlwZSI6ImFycmF5IiwiaXRlbXMiOnsidHlwZSI6InJlY29yZCIsIm5hbWUiOiJT" + + "ZGtNZXNzYWdlIiwiZmllbGRzIjpbeyJuYW1lIjoibXNnIiwidHlwZSI6ImJ5dGVz" + + "In0seyJuYW1lIjoibXNnVGltZSIsInR5cGUiOiJsb25nIn0seyJuYW1lIjoiZXh0" + + "aW5mbyIsInR5cGUiOnsidHlwZSI6Im1hcCIsInZhbHVlcyI6InN0cmluZyJ9fV19" + + "fX0seyJuYW1lIjoicGFja2FnZUlEIiwidHlwZSI6ImxvbmcifV19AMt7kQjpgkXl" + + "EjM4Iv+oOJYClgEIc2lkMQQKQXBwbGXyhcYJBARrMQx2YWx1ZTEEazIMdmFsdWUy" + + "AAxCYW5hbmHki4wTBARrMQx2YWx1ZTMEazIMdmFsdWU0AACAiQ/Le5EI6YJF5RIz" + + "OCL/qDiW"; byte[] srcBytes = Base64.getDecoder().decode(srcString); return srcBytes; } diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAvro2CsvProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAvro2CsvProcessor.java index fa0a361112..8ef3f9f3f5 100644 --- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAvro2CsvProcessor.java +++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestAvro2CsvProcessor.java @@ -34,10 +34,11 @@ public class TestAvro2CsvProcessor extends AbstractProcessorTestBase { @Test public void testAvro2Csv() throws Exception { - List<FieldInfo> fields = this.getTestFieldList("sid", "packageID", "msgTime", "msg"); + List<FieldInfo> fields = this.getTestFieldList("sid", "packageID", "msgTime", "msg", "extinfo.k1"); AvroSourceInfo avroSource = new AvroSourceInfo("UTF-8", "msgs"); CsvSinkInfo csvSink = new CsvSinkInfo("UTF-8", '|', '\\', fields); - String transformSql = "select $root.sid,$root.packageID,$child.msgTime,$child.msg from source"; + String transformSql = + "select $root.sid,$root.packageID,$child.msgTime,$child.msg, $child.extinfo.k1 from source"; TransformConfig config = new TransformConfig(transformSql); // case1 TransformProcessor<byte[], String> processor = TransformProcessor @@ -46,7 +47,7 @@ public class TestAvro2CsvProcessor extends AbstractProcessorTestBase { byte[] srcBytes = this.getAvroTestData(); List<String> output = processor.transform(srcBytes); Assert.assertEquals(2, output.size()); - Assert.assertEquals(output.get(0), "sid1|123456|10011001|Apple"); - Assert.assertEquals(output.get(1), "sid1|123456|20022002|Banana"); + Assert.assertEquals(output.get(0), "sid1|123456|10011001|Apple|value1"); + Assert.assertEquals(output.get(1), "sid1|123456|20022002|Banana|value3"); } }