This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f489c51f0e [INLONG-11227][SDK] Add Parquet formatted data sink for Transform (#11245)
f489c51f0e is described below

commit f489c51f0e36f23cd9672437fcff3f9330248096
Author: Zkplo <87751516+zk...@users.noreply.github.com>
AuthorDate: Wed Oct 9 12:49:54 2024 +0800

    [INLONG-11227][SDK] Add Parquet formatted data sink for Transform (#11245)
---
 .../transform/encode/ParquetByteArrayWriter.java   | 177 ++++++++++++++++++
 .../transform/encode/ParquetOutputByteArray.java   |  60 ++++++
 .../sdk/transform/encode/ParquetSinkEncoder.java   | 106 +++++++++++
 ...EncoderFactory.java => ParquetValueWriter.java} |  18 +-
 ...EncoderFactory.java => ParquetWriteRunner.java} |  23 +--
 .../sdk/transform/encode/SinkEncoderFactory.java   |   5 +
 .../inlong/sdk/transform/pojo/ParquetSinkInfo.java |  64 +++++++
 .../apache/inlong/sdk/transform/pojo/SinkInfo.java |   1 +
 .../processor/AbstractProcessorTestBase.java       |  49 +++++
 .../processor/TestJson2ParquetProcessor.java       | 208 +++++++++++++++++++++
 10 files changed, 679 insertions(+), 32 deletions(-)
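
For orientation, this is the end-to-end flow the new sink enables, assembled from the test code at the bottom of this patch (a sketch only; fields, config, jsonSource, and srcString are set up as in the tests):

    ParquetSinkInfo sinkInfo = new ParquetSinkInfo("UTF-8", fields);
    ParquetSinkEncoder parquetEncoder = SinkEncoderFactory.createParquetEncoder(sinkInfo);
    TransformProcessor<String, ByteArrayOutputStream> processor = TransformProcessor
            .create(config, SourceDecoderFactory.createJsonDecoder(jsonSource), parquetEncoder);
    // Each transform(...) call appends rows to a single in-memory Parquet writer;
    // mergeByteArray(...) then closes the writer and returns the finished file bytes.
    List<ByteArrayOutputStream> output = processor.transform(srcString, new HashMap<>());
    byte[] parquetBytes = parquetEncoder.mergeByteArray(output);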

diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetByteArrayWriter.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetByteArrayWriter.java
new file mode 100644
index 0000000000..bfb072ea66
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetByteArrayWriter.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+
+public final class ParquetByteArrayWriter<T> implements Closeable {
+
+    private final org.apache.parquet.hadoop.ParquetWriter<T> writer;
+    private final ParquetOutputByteArray outputByteArray;
+
+    public static <T> ParquetByteArrayWriter<T> buildWriter(MessageType schema, ParquetWriteRunner<T> writeRunner)
+            throws IOException {
+        return new ParquetByteArrayWriter<>(new ParquetOutputByteArray(), schema, writeRunner);
+    }
+
+    private ParquetByteArrayWriter(ParquetOutputByteArray outputFile, MessageType schema,
+            ParquetWriteRunner<T> writeRunner)
+            throws IOException {
+        this.writer = new Builder<T>(outputFile)
+                .withType(schema)
+                .withWriteRunner(writeRunner)
+                .withCompressionCodec(CompressionCodecName.SNAPPY)
+                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
+                .build();
+        outputByteArray = outputFile;
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.writer.close();
+    }
+
+    public void write(T record) throws IOException {
+        this.writer.write(record);
+    }
+
+    public ByteArrayOutputStream getByteArrayOutputStream() {
+        return outputByteArray.getByteArrayOutputStream();
+    }
+
+    private static final class Builder<T>
+            extends
+                org.apache.parquet.hadoop.ParquetWriter.Builder<T, ParquetByteArrayWriter.Builder<T>> {
+
+        private MessageType schema;
+        private ParquetWriteRunner<T> writeRunner;
+
+        private Builder(OutputFile file) {
+            super(file);
+        }
+
+        public Builder<T> withType(MessageType schema) {
+            this.schema = schema;
+            return this;
+        }
+
+        public Builder<T> withWriteRunner(ParquetWriteRunner<T> writeRunner) {
+            this.writeRunner = writeRunner;
+            return this;
+        }
+
+        @Override
+        protected Builder<T> self() {
+            return this;
+        }
+
+        @Override
+        protected WriteSupport<T> getWriteSupport(Configuration conf) {
+            return new ParquetByteArrayWriter.SimpleWriteSupport<>(schema, writeRunner);
+        }
+    }
+
+    private static class SimpleWriteSupport<T> extends WriteSupport<T> {
+
+        private final MessageType schema;
+        private final ParquetWriteRunner<T> writeRunner;
+        private final ParquetValueWriter valueWriter;
+
+        private RecordConsumer recordConsumer;
+
+        SimpleWriteSupport(MessageType schema, ParquetWriteRunner<T> writeRunner) {
+            this.schema = schema;
+            this.writeRunner = writeRunner;
+            this.valueWriter = this::write;
+        }
+
+        public void write(String name, Object value) {
+            int fieldIndex = schema.getFieldIndex(name);
+            PrimitiveType type = schema.getType(fieldIndex).asPrimitiveType();
+            recordConsumer.startField(name, fieldIndex);
+
+            switch (type.getPrimitiveTypeName()) {
+                case INT32:
+                    recordConsumer.addInteger((int) value);
+                    break;
+                case INT64:
+                    recordConsumer.addLong((long) value);
+                    break;
+                case DOUBLE:
+                    recordConsumer.addDouble((double) value);
+                    break;
+                case BOOLEAN:
+                    recordConsumer.addBoolean((boolean) value);
+                    break;
+                case FLOAT:
+                    recordConsumer.addFloat((float) value);
+                    break;
+                case BINARY:
+                    if (type.getLogicalTypeAnnotation() == LogicalTypeAnnotation.stringType()) {
+                        recordConsumer.addBinary(Binary.fromString((String) value));
+                    } else {
+                        throw new UnsupportedOperationException(
+                                "Don't support writing " + type.getLogicalTypeAnnotation());
+                    }
+                    break;
+                default:
+                    throw new UnsupportedOperationException("Don't support writing " + type.getPrimitiveTypeName());
+            }
+            recordConsumer.endField(name, fieldIndex);
+        }
+
+        @Override
+        public WriteContext init(Configuration configuration) {
+            return new WriteContext(schema, Collections.emptyMap());
+        }
+
+        @Override
+        public void prepareForWrite(RecordConsumer recordConsumer) {
+            this.recordConsumer = recordConsumer;
+        }
+
+        @Override
+        public void write(T record) {
+            recordConsumer.startMessage();
+            writeRunner.doWrite(record, valueWriter);
+            recordConsumer.endMessage();
+        }
+
+        @Override
+        public String getName() {
+            return null;
+        }
+    }
+}
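
For context, a minimal sketch of driving this writer directly (hypothetical code, not part of the commit; imports and checked-exception handling elided, with BINARY statically imported as in the file above):

    MessageType schema = new MessageType("Output",
            Types.required(BINARY).as(LogicalTypeAnnotation.stringType()).named("msg"));
    // Both new interfaces are single-method, so the write runner can be a lambda.
    ParquetWriteRunner<String> runner = (record, valueWriter) -> valueWriter.write("msg", record);
    ParquetByteArrayWriter<String> writer = ParquetByteArrayWriter.buildWriter(schema, runner);
    writer.write("hello");
    writer.close(); // flushes the Parquet footer; only now do the buffered bytes form a complete file
    byte[] parquetBytes = writer.getByteArrayOutputStream().toByteArray();
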
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetOutputByteArray.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetOutputByteArray.java
new file mode 100644
index 0000000000..bf60301a10
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetOutputByteArray.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import org.apache.parquet.io.DelegatingPositionOutputStream;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public class ParquetOutputByteArray implements OutputFile {
+
+    private final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+
+    public ByteArrayOutputStream getByteArrayOutputStream() {
+        return byteArrayOutputStream;
+    }
+
+    @Override
+    public PositionOutputStream create(long blockSizeHint) throws IOException {
+        return createOrOverwrite(blockSizeHint);
+    }
+
+    @Override
+    public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
+        return new DelegatingPositionOutputStream(byteArrayOutputStream) {
+
+            @Override
+            public long getPos() throws IOException {
+                return byteArrayOutputStream.size();
+            }
+        };
+    }
+
+    @Override
+    public boolean supportsBlockSize() {
+        return false;
+    }
+
+    @Override
+    public long defaultBlockSize() {
+        return 1024L;
+    }
+}
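
A quick isolated look at this in-memory OutputFile (hypothetical snippet, inside a method that may throw IOException). Delegating getPos() to size() is what lets ParquetWriter track its write position without any filesystem:

    ParquetOutputByteArray out = new ParquetOutputByteArray();
    PositionOutputStream stream = out.createOrOverwrite(0L);
    stream.write(new byte[]{1, 2, 3});
    // getPos() is backed by byteArrayOutputStream.size(), i.e. total bytes written so far
    assert stream.getPos() == 3L;
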
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java
new file mode 100644
index 0000000000..168d7d0c44
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.encode;
+
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.ParquetSinkInfo;
+import org.apache.inlong.sdk.transform.process.Context;
+
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+
+/**
+ * ParquetSinkEncoder
+ */
+public class ParquetSinkEncoder implements SinkEncoder<ByteArrayOutputStream> {
+
+    protected ParquetSinkInfo sinkInfo;
+    protected Charset sinkCharset = Charset.defaultCharset();
+
+    private final List<FieldInfo> fields;
+    private ParquetByteArrayWriter<Object[]> writer;
+
+    public ParquetSinkEncoder(ParquetSinkInfo sinkInfo) {
+        this.sinkInfo = sinkInfo;
+        this.fields = sinkInfo.getFields();
+        ArrayList<Type> typesList = new ArrayList<>();
+        for (FieldInfo fieldInfo : this.fields) {
+            typesList.add(Types.required(BINARY)
+                    .as(LogicalTypeAnnotation.stringType())
+                    .named(fieldInfo.getName()));
+        }
+        MessageType schema = new MessageType("Output", typesList);
+        ParquetWriteRunner<Object[]> writeRunner = (record, valueWriter) -> {
+            for (int i = 0; i < record.length; i++) {
+                valueWriter.write(this.fields.get(i).getName(), record[i]);
+            }
+        };
+        try {
+            writer = ParquetByteArrayWriter.buildWriter(schema, writeRunner);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public ByteArrayOutputStream encode(SinkData sinkData, Context context) {
+        int size = this.fields.size();
+        Object[] rowsInfo = new Object[size];
+        Arrays.fill(rowsInfo, "");
+        for (int i = 0; i < size; i++) {
+            String fieldData = sinkData.getField(this.fields.get(i).getName());
+            if (fieldData == null) {
+                continue;
+            }
+            rowsInfo[i] = fieldData;
+        }
+        try {
+            writer.write(rowsInfo);
+        } catch (Exception ignored) {
+
+        }
+        return writer.getByteArrayOutputStream();
+    }
+
+    @Override
+    public List<FieldInfo> getFields() {
+        return this.fields;
+    }
+    public byte[] mergeByteArray(List<ByteArrayOutputStream> list) {
+        if (list.isEmpty()) {
+            return null;
+        }
+        try {
+            this.writer.close(); // the writer must be closed first to flush the Parquet footer
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return list.get(0).toByteArray();
+    }
+}
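
Note on the encoder's contract as implemented above: every encode(...) call writes one row into the same underlying writer and returns the same backing ByteArrayOutputStream, and mergeByteArray(...) closes the writer (flushing the footer) before reading the first stream, which makes the encoder single-use. A hedged sketch (rowA, rowB, context, and fields are assumed placeholders):

    ParquetSinkEncoder encoder = SinkEncoderFactory.createParquetEncoder(
            new ParquetSinkInfo("UTF-8", fields));
    ByteArrayOutputStream stream = encoder.encode(rowA, context); // writes row A
    encoder.encode(rowB, context);                                // row B, same backing stream
    byte[] parquetBytes = encoder.mergeByteArray(Collections.singletonList(stream));
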
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetValueWriter.java
similarity index 58%
copy from inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
copy to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetValueWriter.java
index 30619078ac..0e3e57b999 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetValueWriter.java
@@ -17,21 +17,7 @@
 
 package org.apache.inlong.sdk.transform.encode;
 
-import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
-import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
-import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
+public interface ParquetValueWriter {
 
-public class SinkEncoderFactory {
-
-    public static CsvSinkEncoder createCsvEncoder(CsvSinkInfo csvSinkInfo) {
-        return new CsvSinkEncoder(csvSinkInfo);
-    }
-
-    public static KvSinkEncoder createKvEncoder(KvSinkInfo kvSinkInfo) {
-        return new KvSinkEncoder(kvSinkInfo);
-    }
-
-    public static MapSinkEncoder createMapEncoder(MapSinkInfo mapSinkInfo) {
-        return new MapSinkEncoder(mapSinkInfo);
-    }
+    void write(String name, Object value);
 }
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetWriteRunner.java
similarity index 58%
copy from inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
copy to inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetWriteRunner.java
index 30619078ac..91c6fa590a 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetWriteRunner.java
@@ -17,21 +17,12 @@
 
 package org.apache.inlong.sdk.transform.encode;
 
-import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
-import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
-import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
+public interface ParquetWriteRunner<T> {
 
-public class SinkEncoderFactory {
-
-    public static CsvSinkEncoder createCsvEncoder(CsvSinkInfo csvSinkInfo) {
-        return new CsvSinkEncoder(csvSinkInfo);
-    }
-
-    public static KvSinkEncoder createKvEncoder(KvSinkInfo kvSinkInfo) {
-        return new KvSinkEncoder(kvSinkInfo);
-    }
-
-    public static MapSinkEncoder createMapEncoder(MapSinkInfo mapSinkInfo) {
-        return new MapSinkEncoder(mapSinkInfo);
-    }
+    /**
+     * Write the specified record into the Parquet row by the supplied writer.
+     * @param record data that needs to be written
+     * @param valueWriter parquet data writer
+     */
+    void doWrite(T record, ParquetValueWriter valueWriter);
 }
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
index 30619078ac..0fa308162b 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoderFactory.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sdk.transform.encode;
 import org.apache.inlong.sdk.transform.pojo.CsvSinkInfo;
 import org.apache.inlong.sdk.transform.pojo.KvSinkInfo;
 import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.ParquetSinkInfo;
 
 public class SinkEncoderFactory {
 
@@ -34,4 +35,8 @@ public class SinkEncoderFactory {
     public static MapSinkEncoder createMapEncoder(MapSinkInfo mapSinkInfo) {
         return new MapSinkEncoder(mapSinkInfo);
     }
+
+    public static ParquetSinkEncoder createParquetEncoder(ParquetSinkInfo parquetSinkInfo) {
+        return new ParquetSinkEncoder(parquetSinkInfo);
+    }
 }
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ParquetSinkInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ParquetSinkInfo.java
new file mode 100644
index 0000000000..c54670e44a
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/ParquetSinkInfo.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.pojo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * ParquetSinkInfo
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ParquetSinkInfo extends SinkInfo {
+
+    private List<FieldInfo> fields;
+
+    @JsonCreator
+    public ParquetSinkInfo(
+            @JsonProperty("charset") String charset,
+            @JsonProperty("fields") List<FieldInfo> fields) {
+        super(SinkInfo.PARQUET, charset);
+        if (fields != null) {
+            this.fields = fields;
+        } else {
+            this.fields = new ArrayList<>();
+        }
+    }
+
+    /**
+     * get fields
+     * @return the fields
+     */
+    @JsonProperty("fields")
+    public List<FieldInfo> getFields() {
+        return fields;
+    }
+
+    /**
+     * set fields
+     * @param fields the fields to set
+     */
+    public void setFields(List<FieldInfo> fields) {
+        this.fields = fields;
+    }
+
+}
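
Since the constructor is a @JsonCreator, configuration like the following should bind directly via Jackson (a sketch; exception handling is elided, and the exact FieldInfo JSON shape and any type-discriminator handling in SinkInfo are assumptions):

    String json = "{\"charset\":\"UTF-8\",\"fields\":[{\"name\":\"sid\"},{\"name\":\"msg\"}]}";
    ParquetSinkInfo sinkInfo = new ObjectMapper().readValue(json, ParquetSinkInfo.class);
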
diff --git a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
index 9c61c6b46c..3c976c1b4c 100644
--- a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
+++ b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/SinkInfo.java
@@ -44,6 +44,7 @@ public abstract class SinkInfo {
     public static final String CSV = "csv";
     public static final String KV = "kv";
     public static final String ES_MAP = "es_map";
+    public static final String PARQUET = "parquet";
 
     @JsonIgnore
     private String type;
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
index e99a3c83c4..3322d83199 100644
--- a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/AbstractProcessorTestBase.java
@@ -17,11 +17,26 @@
 
 package org.apache.inlong.sdk.transform.process.processor;
 
+import org.apache.inlong.sdk.transform.decode.ParquetInputByteArray;
 import org.apache.inlong.sdk.transform.pojo.FieldInfo;
 
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.List;
+
 /**
  * AbstractProcessorTestBase
  * description: define static parameters for Processor tests
@@ -123,4 +138,38 @@ public abstract class AbstractProcessorTestBase {
         byte[] srcBytes = Base64.getDecoder().decode(srcString);
         return srcBytes;
     }
+
+    public static List<String> ParquetByteArray2CsvStr(byte[] parquetBytes) throws IOException {
+        InputFile inputFile = new ParquetInputByteArray(parquetBytes);
+        List<String> strRows = new ArrayList<>();
+        try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) {
+            ParquetMetadata footer = reader.getFooter();
+            MessageType schema = footer.getFileMetaData().getSchema();
+            int fieldSize = schema.getFields().size();
+            PageReadStore pages;
+
+            while ((pages = reader.readNextRowGroup()) != null) {
+                long rows = pages.getRowCount();
+
+                ColumnIOFactory factory = new ColumnIOFactory();
+                MessageColumnIO columnIO = factory.getColumnIO(schema);
+
+                RecordMaterializer<Group> recordMaterializer = new GroupRecordConverter(schema);
+
+                RecordReader<Group> recordReader = columnIO.getRecordReader(pages, recordMaterializer);
+
+                for (int i = 0; i < rows; i++) {
+                    Group group = recordReader.read();
+                    if (group != null) {
+                        StringBuilder builder = new StringBuilder();
+                        for (int j = 0; j < fieldSize; j++) {
+                            builder.append(group.getValueToString(j, 0) + "|");
+                        }
+                        strRows.add(builder.substring(0, builder.length() - 1));
+                    }
+                }
+            }
+        }
+        return strRows;
+    }
 }
diff --git a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestJson2ParquetProcessor.java b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestJson2ParquetProcessor.java
new file mode 100644
index 0000000000..30e2d4f9e5
--- /dev/null
+++ b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestJson2ParquetProcessor.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.transform.process.processor;
+
+import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
+import org.apache.inlong.sdk.transform.encode.ParquetSinkEncoder;
+import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
+import org.apache.inlong.sdk.transform.pojo.JsonSourceInfo;
+import org.apache.inlong.sdk.transform.pojo.ParquetSinkInfo;
+import org.apache.inlong.sdk.transform.pojo.TransformConfig;
+import org.apache.inlong.sdk.transform.process.TransformProcessor;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.List;
+
+public class TestJson2ParquetProcessor extends AbstractProcessorTestBase {
+
+    @Test
+    public void testJson2Parquet() throws Exception {
+        List<FieldInfo> fields;
+        JsonSourceInfo jsonSource;
+        ParquetSinkInfo parquetSinkInfo;
+        ParquetSinkEncoder parquetEncoder;
+        String transformSql;
+        TransformConfig config;
+        TransformProcessor<String, ByteArrayOutputStream> processor;
+        String srcString;
+        List<ByteArrayOutputStream> output;
+        List<String> result;
+        byte[] bytes;
+
+        fields = this.getTestFieldList("sid", "packageID", "msgTime", "msg");
+        jsonSource = new JsonSourceInfo("UTF-8", "msgs");
+        parquetSinkInfo = new ParquetSinkInfo("UTF-8", fields);
+        parquetEncoder = SinkEncoderFactory.createParquetEncoder(parquetSinkInfo);
+        transformSql = "select $root.sid,$root.packageID,$child.msgTime,$child.msg from source";
+        config = new TransformConfig(transformSql);
+        // case1
+        processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createJsonDecoder(jsonSource),
+                        parquetEncoder);
+        srcString = "{\n"
+                + "  \"sid\":\"value1\",\n"
+                + "  \"packageID\":\"value2\",\n"
+                + "  \"msgs\":[\n"
+                + "  {\"msg\":\"value4\",\"msgTime\":1713243918000},\n"
+                + "  {\"msg\":\"v4\",\"msgTime\":1713243918000}\n"
+                + "  ]\n"
+                + "}";
+        output = processor.transform(srcString, new HashMap<>());
+        bytes = parquetEncoder.mergeByteArray(output);
+        result = ParquetByteArray2CsvStr(bytes);
+        Assert.assertEquals(2, result.size());
+        Assert.assertEquals("value1|value2|1713243918000|value4", result.get(0));
+        Assert.assertEquals("value1|value2|1713243918000|v4", result.get(1));
+
+        fields = this.getTestFieldList("id", "itemId", "subItemId", "msg");
+        jsonSource = new JsonSourceInfo("UTF-8", "items");
+        parquetSinkInfo = new ParquetSinkInfo("UTF-8", fields);
+        parquetEncoder = SinkEncoderFactory.createParquetEncoder(parquetSinkInfo);
+        transformSql = "select $root.id,$child.itemId,$child.subItems(0).subItemId,$child.subItems(1).msg from source";
+        config = new TransformConfig(transformSql);
+        // case2
+        processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createJsonDecoder(jsonSource),
+                        parquetEncoder);
+        srcString = "{\n"
+                + "  \"id\":\"value1\",\n"
+                + "  \"name\":\"value2\",\n"
+                + "  \"items\":[\n"
+                + "    {\"itemId\":\"item1\",\n"
+                + "     \"subItems\":[\n"
+                + "       {\"subItemId\":\"1001\", \"msg\":\"1001msg\"},\n"
+                + "       {\"subItemId\":\"1002\", \"msg\":\"1002msg\"}\n"
+                + "     ]\n"
+                + "    },\n"
+                + "    {\"itemId\":\"item2\",\n"
+                + "     \"subItems\":[\n"
+                + "       {\"subItemId\":\"2001\", \"msg\":\"2001msg\"},\n"
+                + "       {\"subItemId\":\"2002\", \"msg\":\"2002msg\"}\n"
+                + "     ]\n"
+                + "    }\n"
+                + "  ]\n"
+                + "}";
+        output = processor.transform(srcString, new HashMap<>());
+        bytes = parquetEncoder.mergeByteArray(output);
+        result = ParquetByteArray2CsvStr(bytes);
+        Assert.assertEquals(2, result.size());
+        Assert.assertEquals("value1|item1|1001|1002msg", result.get(0));
+        Assert.assertEquals("value1|item2|2001|2002msg", result.get(1));
+
+        fields = this.getTestFieldList("matrix(0,0)", "matrix(1,1)", "matrix(2,2)");
+        jsonSource = new JsonSourceInfo("UTF-8", "");
+        parquetSinkInfo = new ParquetSinkInfo("UTF-8", fields);
+        parquetEncoder = SinkEncoderFactory.createParquetEncoder(parquetSinkInfo);
+        transformSql = "select $root.matrix(0, 0), $root.matrix(1, 1), $root.matrix(2, 2) from source";
+        config = new TransformConfig(transformSql);
+        // case3
+        processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createJsonDecoder(jsonSource),
+                        parquetEncoder);
+        srcString = "{\n"
+                + "  \"matrix\": [\n"
+                + "    [1, 2, 3],\n"
+                + "    [4, 5, 6],\n"
+                + "    [7, 8, 9]\n"
+                + "  ]\n"
+                + "}";
+        output = processor.transform(srcString, new HashMap<>());
+        bytes = parquetEncoder.mergeByteArray(output);
+        result = ParquetByteArray2CsvStr(bytes);
+        Assert.assertEquals(1, result.size());
+        Assert.assertEquals("1|5|9", result.get(0));
+
+        fields = this.getTestFieldList("department_name", "course_id", "num");
+        jsonSource = new JsonSourceInfo("UTF-8", "");
+        parquetSinkInfo = new ParquetSinkInfo("UTF-8", fields);
+        parquetEncoder = SinkEncoderFactory.createParquetEncoder(parquetSinkInfo);
+        transformSql =
+                "select $root.departments(0).name, $root.departments(0).courses(0,1).courseId, sqrt($root.departments(0).courses(0,1).courseId - 2) from source";
+        config = new TransformConfig(transformSql);
+        // case4
+        processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createJsonDecoder(jsonSource),
+                        parquetEncoder);
+        srcString = "{\n" +
+                "  \"departments\": [\n" +
+                "    {\n" +
+                "      \"name\": \"Mathematics\",\n" +
+                "      \"courses\": [\n" +
+                "        [\n" +
+                "          {\"courseId\": \"101\", \"title\": \"Calculus I\"},\n" +
+                "          {\"courseId\": \"102\", \"title\": \"Linear Algebra\"}\n" +
+                "        ],\n" +
+                "        [\n" +
+                "          {\"courseId\": \"201\", \"title\": \"Calculus II\"},\n" +
+                "          {\"courseId\": \"202\", \"title\": \"Abstract Algebra\"}\n" +
+                "        ]\n" +
+                "      ]\n" +
+                "    }\n" +
+                "  ]\n" +
+                "}";
+        output = processor.transform(srcString, new HashMap<>());
+        bytes = parquetEncoder.mergeByteArray(output);
+        result = ParquetByteArray2CsvStr(bytes);
+        Assert.assertEquals(1, result.size());
+        Assert.assertEquals("Mathematics|102|10.0", result.get(0));
+    }
+
+    @Test
+    public void testJson2ParquetForOne() throws Exception {
+        List<FieldInfo> fields;
+        JsonSourceInfo jsonSource;
+        ParquetSinkInfo parquetSinkInfo;
+        ParquetSinkEncoder parquetEncoder;
+        String transformSql;
+        TransformConfig config;
+        TransformProcessor<String, ByteArrayOutputStream> processor;
+        String srcString;
+        List<ByteArrayOutputStream> output;
+        List<String> result;
+        byte[] bytes;
+
+        fields = this.getTestFieldList("sid", "packageID", "msgTime", "msg");
+        jsonSource = new JsonSourceInfo("UTF-8", "");
+        parquetSinkInfo = new ParquetSinkInfo("UTF-8", fields);
+        parquetEncoder = SinkEncoderFactory.createParquetEncoder(parquetSinkInfo);
+        transformSql = "select $root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source";
+        config = new TransformConfig(transformSql);
+        // case1
+        processor = TransformProcessor
+                .create(config, SourceDecoderFactory.createJsonDecoder(jsonSource),
+                        parquetEncoder);
+        srcString = "{\n"
+                + "  \"sid\":\"value1\",\n"
+                + "  \"packageID\":\"value2\",\n"
+                + "  \"msgs\":[\n"
+                + "  {\"msg\":\"value4\",\"msgTime\":1713243918000},\n"
+                + "  {\"msg\":\"v4\",\"msgTime\":1713243918000}\n"
+                + "  ]\n"
+                + "}";
+        output = processor.transform(srcString, new HashMap<>());
+        bytes = parquetEncoder.mergeByteArray(output);
+        result = ParquetByteArray2CsvStr(bytes);
+        Assert.assertEquals(1, result.size());
+        Assert.assertEquals("value1|value2|1713243918000|value4", result.get(0));
+    }
+}

