This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 245307c621 [INLONG-9570][Sort] Support rowdata way of sort InLong 
message pb format (#9664)
245307c621 is described below

commit 245307c62155421cf1296dbcb0b7524543d7bff7
Author: baomingyu <baomingy...@163.com>
AuthorDate: Sun Feb 4 20:00:51 2024 +0800

    [INLONG-9570][Sort] Support rowdata way of sort InLong message pb format 
(#9664)
---
 .../format-inlongmsg-rowdata-pb/pom.xml            |  93 ++++++++
 .../inlongmsgpb/InLongMsgPbDecodingFormat.java     | 238 +++++++++++++++++++++
 .../InLongMsgPbDeserializationSchema.java          | 151 +++++++++++++
 .../inlongmsgpb/InLongMsgPbFormatFactory.java      | 102 +++++++++
 .../formats/inlongmsgpb/InLongMsgPbOptions.java    |  65 ++++++
 .../org.apache.flink.table.factories.Factory       |  18 ++
 inlong-sort/sort-formats/format-rowdata/pom.xml    |   1 +
 7 files changed, 668 insertions(+)

diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/pom.xml 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/pom.xml
new file mode 100644
index 0000000000..78b3a6ece7
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>format-rowdata</artifactId>
+        <version>1.11.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>sort-format-inlongmsg-rowdata-pb</artifactId>
+    <name>Apache InLong - Sort Format-InLongMsg-RowData-PB</name>
+
+    <properties>
+        <!-- four levels up: format-rowdata, sort-formats, inlong-sort, repository root -->
+        <inlong.root.dir>${project.parent.parent.parent.parent.basedir}</inlong.root.dir>
+    </properties>
+
+    <dependencies>
+        <!-- core dependencies -->
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-format-rowdata-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sdk-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+        <!-- org.xerial.snappy.Snappy, used for decompress.type = snappy -->
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-flink-dependencies-${sort.flink.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- CsvRowDataDeserializationSchema, adjusted via reflection for ignore-trailing-unmappable -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-csv</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+    </dependencies>
+</project>
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDecodingFormat.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDecodingFormat.java
new file mode 100644
index 0000000000..b07a88b64b
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDecodingFormat.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgpb;
+
+import org.apache.inlong.sdk.commons.protocol.ProxySdk;
+import org.apache.inlong.sdk.commons.utils.GzipUtils;
+import 
org.apache.inlong.sort.formats.inlongmsgpb.InLongMsgPbDeserializationSchema.InLongPbMsgDecompressor;
+import 
org.apache.inlong.sort.formats.inlongmsgpb.InLongMsgPbDeserializationSchema.MetadataConverter;
+
+import org.apache.commons.lang3.time.DateFormatUtils;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.csv.CsvRowDataDeserializationSchema;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource.Context;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xerial.snappy.Snappy;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * InLongMsg pb format decoding format.
+ *
+ * <p>Each record is an (optionally compressed) batch of InLong pb messages. The body of every
+ * {@link ProxySdk.MessageObj} in the batch is decoded by an inner {@link DecodingFormat}. Metadata of the
+ * inner format (keys prefixed with {@code innerFormatMetaPrefix}) and metadata of this format
+ * ({@link ReadableMetadata}) can be requested in addition to the physical columns.
+ */
+public class InLongMsgPbDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
+
+    private static final Logger log = LoggerFactory.getLogger(InLongMsgPbDecodingFormat.class);
+
+    /** Prefix that separates the inner format's metadata keys from this format's own keys. */
+    private final String innerFormatMetaPrefix;
+
+    /** Format that decodes the body of each message in a batch. */
+    private final DecodingFormat<DeserializationSchema<RowData>> innerDecodingFormat;
+
+    /** Requested metadata keys of this format, in the order they were applied. */
+    private List<String> metadataKeys;
+
+    /** Metadata keys pushed down to the inner format (prefix stripped), in the order they were applied. */
+    private List<String> innerMetadataKeys;
+
+    /** Error handling flag forwarded to {@link InLongMsgPbDeserializationSchema}. */
+    private final boolean ignoreErrors;
+
+    /** Compression type of incoming records, resolved by {@link #getDecompressor(String)}. */
+    private final String decompressType;
+
+    /** Whether a csv inner format should ignore trailing columns that cannot be mapped. */
+    private final boolean ignoreTrailingUnmappable;
+
+    /**
+     * Creates the decoding format.
+     *
+     * @param innerDecodingFormat format that decodes the body of each message
+     * @param innerFormatMetaPrefix prefix under which the inner format's metadata keys are exposed
+     * @param ignoreErrors error handling flag forwarded to the runtime deserializer
+     * @param ignoreTrailingUnmappable whether a csv inner format skips extra trailing columns
+     * @param decompressType compression type of incoming records ("gzip", "snappy" or "no-compress")
+     */
+    public InLongMsgPbDecodingFormat(
+            DecodingFormat<DeserializationSchema<RowData>> innerDecodingFormat,
+            String innerFormatMetaPrefix,
+            boolean ignoreErrors,
+            boolean ignoreTrailingUnmappable,
+            String decompressType) {
+        this.innerDecodingFormat = innerDecodingFormat;
+        this.innerFormatMetaPrefix = innerFormatMetaPrefix;
+        this.metadataKeys = Collections.emptyList();
+        this.innerMetadataKeys = Collections.emptyList();
+        this.ignoreErrors = ignoreErrors;
+        this.ignoreTrailingUnmappable = ignoreTrailingUnmappable;
+        this.decompressType = decompressType;
+    }
+
+    /**
+     * Creates the runtime deserializer.
+     *
+     * <p>The produced row consists of the physical columns, then the inner format metadata columns
+     * that were pushed down, then the requested metadata columns of this format.
+     */
+    @Override
+    public DeserializationSchema<RowData> createRuntimeDecoder(Context context, DataType physicalDataType) {
+        // Resolve converters from the requested keys (not from the enum declaration order) so that
+        // each converter lines up with the metadata field declared at the same position below.
+        final List<ReadableMetadata> readableMetadata =
+                metadataKeys.stream()
+                        .map(k -> Arrays.stream(ReadableMetadata.values())
+                                .filter(rm -> rm.key.equals(k))
+                                .findFirst()
+                                .orElseThrow(() -> new IllegalStateException("Unknown metadata key: " + k)))
+                        .collect(Collectors.toList());
+        final MetadataConverter[] metadataConverters =
+                readableMetadata.stream()
+                        .map(m -> m.converter)
+                        .toArray(MetadataConverter[]::new);
+
+        // Per the DecodingFormat contract the inner decoder appends its pushed-down metadata right
+        // after the physical columns, so those fields must precede this format's metadata.
+        final Map<String, DataType> innerMetadata = innerDecodingFormat.listReadableMetadata();
+        final List<DataTypes.Field> metadataFields = new ArrayList<>();
+        for (String innerKey : innerMetadataKeys) {
+            metadataFields.add(DataTypes.FIELD(innerFormatMetaPrefix + innerKey, innerMetadata.get(innerKey)));
+        }
+        for (ReadableMetadata m : readableMetadata) {
+            metadataFields.add(DataTypes.FIELD(m.key, m.dataType));
+        }
+        final DataType producedDataType = DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
+        final TypeInformation<RowData> producedTypeInfo = context.createTypeInformation(producedDataType);
+        final InLongPbMsgDecompressor decompressor = getDecompressor(decompressType);
+
+        final DeserializationSchema<RowData> innerSchema =
+                innerDecodingFormat.createRuntimeDecoder(context, physicalDataType);
+        if (ignoreTrailingUnmappable && innerSchema instanceof CsvRowDataDeserializationSchema) {
+            makeCsvInnerFormatIgnoreTrailingUnmappable(innerSchema);
+        }
+        return new InLongMsgPbDeserializationSchema(
+                innerSchema,
+                metadataConverters,
+                producedTypeInfo,
+                decompressor,
+                ignoreErrors);
+    }
+
+    /**
+     * Lists the inner format's metadata (keys prefixed with {@code innerFormatMetaPrefix}) followed
+     * by this format's own {@link ReadableMetadata}.
+     */
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+
+        // add inner format metadata with prefix
+        innerDecodingFormat
+                .listReadableMetadata()
+                .forEach((key, value) -> metadataMap.putIfAbsent(innerFormatMetaPrefix + key, value));
+
+        // add format metadata
+        Stream.of(ReadableMetadata.values())
+                .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));
+
+        return metadataMap;
+    }
+
+    /**
+     * Splits the requested keys into inner format keys (those starting with
+     * {@code innerFormatMetaPrefix}) and keys of this format. The inner keys are pushed down to the
+     * inner format with the prefix stripped and remembered so the produced type can include them.
+     */
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys) {
+        final List<String> innerFormatMetadataKeys =
+                metadataKeys.stream()
+                        .filter(k -> k.startsWith(innerFormatMetaPrefix))
+                        .collect(Collectors.toList());
+        final List<String> formatMetadataKeys = new ArrayList<>(metadataKeys);
+        formatMetadataKeys.removeAll(innerFormatMetadataKeys);
+        this.metadataKeys = formatMetadataKeys;
+
+        // push down inner format metadata
+        final Map<String, DataType> formatMetadata = innerDecodingFormat.listReadableMetadata();
+        if (formatMetadata.isEmpty()) {
+            this.innerMetadataKeys = Collections.emptyList();
+            return;
+        }
+        final List<String> requestedFormatMetadataKeys =
+                innerFormatMetadataKeys.stream()
+                        .map(k -> k.substring(innerFormatMetaPrefix.length()))
+                        .collect(Collectors.toList());
+        innerDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+        this.innerMetadataKeys = requestedFormatMetadataKeys;
+    }
+
+    /** Delegates to the inner format. */
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return innerDecodingFormat.getChangelogMode();
+    }
+
+    /**
+     * Maps a decompress type to its decompressor. Only gzip and snappy are supported for now;
+     * "no-compress" passes the bytes through. Unknown types are logged and also passed through.
+     */
+    private InLongPbMsgDecompressor getDecompressor(String decompressType) {
+        switch (decompressType.toLowerCase(Locale.ROOT)) {
+            case "gzip":
+                return GzipUtils::decompress;
+            case "snappy":
+                return Snappy::uncompress;
+            case "no-compress":
+                return unDecompress -> unDecompress;
+            default:
+                // kept lenient for backward compatibility, but make the misconfiguration visible
+                log.warn("Unsupported decompress type '{}', messages will be treated as uncompressed",
+                        decompressType);
+                return unDecompress -> unDecompress;
+        }
+    }
+
+    /**
+     * Uses reflection to make a csv inner format ignore trailing unmappable columns.
+     *
+     * <p>Replaces the private {@code objectReader} of {@link CsvRowDataDeserializationSchema} with a
+     * reader that has {@link CsvParser.Feature#IGNORE_TRAILING_UNMAPPABLE} enabled and the same
+     * {@link CsvSchema}. Failures are logged and the inner schema is left unchanged (best effort).
+     */
+    private void makeCsvInnerFormatIgnoreTrailingUnmappable(DeserializationSchema<RowData> innerSchema) {
+        try {
+            Field readerField = CsvRowDataDeserializationSchema.class.getDeclaredField("objectReader");
+            readerField.setAccessible(true);
+            ObjectReader oldReader = (ObjectReader) readerField.get(innerSchema);
+
+            Field schemaField = ObjectReader.class.getDeclaredField("_schema");
+            schemaField.setAccessible(true);
+            CsvSchema oldSchema = (CsvSchema) schemaField.get(oldReader);
+
+            ObjectReader newReader = new CsvMapper()
+                    .enable(CsvParser.Feature.IGNORE_TRAILING_UNMAPPABLE)
+                    .readerFor(JsonNode.class)
+                    .with(oldSchema);
+            readerField.set(innerSchema, newReader);
+        } catch (ReflectiveOperationException | RuntimeException e) {
+            // best effort only: keep the original reader, but do not swallow JVM errors
+            log.error("failed to make csv inner format to ignore trailing unmappable, ex is ", e);
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Metadata handling
+    // --------------------------------------------------------------------------------------------
+
+    /** Metadata columns provided by this format itself. */
+    enum ReadableMetadata {
+
+        /**
+         * Message time of the {@link ProxySdk.MessageObj}, formatted as {@code yyyyMMddHH}.
+         * NOTE(review): {@code DateFormatUtils.format(long, String)} uses the JVM default time zone,
+         * and {@code getMsgTime()} is assumed to be epoch milliseconds; confirm both.
+         */
+        CREATE_TIME(
+                "create-time",
+                DataTypes.STRING(),
+                new MetadataConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ProxySdk.MessageObj body) {
+                        String createTime = DateFormatUtils.format(body.getMsgTime(), "yyyyMMddHH");
+                        return StringData.fromString(createTime);
+                    }
+                });
+
+        /** Metadata key as exposed in {@link #listReadableMetadata()}. */
+        final String key;
+
+        /** Data type of the metadata column. */
+        final DataType dataType;
+
+        /** Extracts the metadata value from a message. */
+        final MetadataConverter converter;
+
+        ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.converter = converter;
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDeserializationSchema.java
new file mode 100644
index 0000000000..ce446bf13f
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDeserializationSchema.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgpb;
+
+import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObjs;
+
+import com.google.common.base.Objects;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * InLongMsg pb format deserialization schema.
+ * Used to deserialize {@link MessageObj} msg.
+ */
+public class InLongMsgPbDeserializationSchema implements 
DeserializationSchema<RowData> {
+
+    /** Inner {@link DeserializationSchema} to deserialize {@link InLongMsg} 
inner packaged
+     *  data buffer message */
+    private final DeserializationSchema<RowData> deserializationSchema;
+
+    /** {@link MetadataConverter} of how to produce metadata from {@link 
InLongMsg}. */
+    private final MetadataConverter[] metadataConverters;
+
+    /** {@link TypeInformation} of the produced {@link RowData} (physical + 
meta data). */
+    private final TypeInformation<RowData> producedTypeInfo;
+
+    /** status of error */
+    private final boolean ignoreErrors;
+
+    /** decompressor */
+    private final InLongPbMsgDecompressor decompressor;
+
+    public InLongMsgPbDeserializationSchema(
+            DeserializationSchema<RowData> schema,
+            MetadataConverter[] metadataConverters,
+            TypeInformation<RowData> producedTypeInfo,
+            InLongPbMsgDecompressor decompressor,
+            boolean ignoreErrors) {
+        this.deserializationSchema = schema;
+        this.metadataConverters = metadataConverters;
+        this.producedTypeInfo = producedTypeInfo;
+        this.decompressor = decompressor;
+        this.ignoreErrors = ignoreErrors;
+    }
+
+    @Override
+    public RowData deserialize(byte[] bytes) throws IOException {
+        throw new RuntimeException("Unsupported method, "
+                + "Please invoke DeserializationSchema#deserialize(byte[], 
Collector<RowData>) instead.");
+    }
+
+    @Override
+    public void deserialize(byte[] message, Collector<RowData> out) throws 
IOException {
+        byte[] decompressed = decompressor.decompress(message);
+        MessageObjs msgObjs = MessageObjs.parseFrom(decompressed);
+        List<MessageObj> msgList = msgObjs.getMsgsList();
+        for (MessageObj msg : msgList) {
+            RowData row = 
deserializationSchema.deserialize(msg.getBody().toByteArray());
+            this.emitRow(msg, (GenericRowData) row, out);
+        }
+    }
+
+    @Override
+    public boolean isEndOfStream(RowData rowData) {
+        return false;
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return producedTypeInfo;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof InLongMsgPbDeserializationSchema)) {
+            return false;
+        }
+        InLongMsgPbDeserializationSchema that = 
(InLongMsgPbDeserializationSchema) o;
+        return ignoreErrors == that.ignoreErrors
+                && 
Objects.equal(Arrays.stream(metadataConverters).collect(Collectors.toList()),
+                        
Arrays.stream(that.metadataConverters).collect(Collectors.toList()))
+                && Objects.equal(deserializationSchema, 
that.deserializationSchema)
+                && Objects.equal(decompressor, that.decompressor)
+                && Objects.equal(producedTypeInfo, that.producedTypeInfo);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(deserializationSchema, metadataConverters, 
producedTypeInfo,
+                ignoreErrors, decompressor);
+    }
+
+    interface MetadataConverter extends Serializable {
+
+        Object read(MessageObj body);
+    }
+
+    interface InLongPbMsgDecompressor extends Serializable {
+
+        byte[] decompress(byte[] message) throws IOException;
+    }
+
+    /** add metadata column */
+    private void emitRow(MessageObj message, GenericRowData physicalRow, 
Collector<RowData> out) {
+        if (metadataConverters.length == 0) {
+            out.collect(physicalRow);
+            return;
+        }
+        final int physicalArity = physicalRow.getArity();
+        final int metadataArity = metadataConverters.length;
+        final GenericRowData producedRow =
+                new GenericRowData(physicalRow.getRowKind(), physicalArity + 
metadataArity);
+        for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
+            producedRow.setField(physicalPos, 
physicalRow.getField(physicalPos));
+        }
+        for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
+            producedRow.setField(
+                    physicalArity + metadataPos, 
metadataConverters[metadataPos].read(message));
+        }
+        out.collect(producedRow);
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbFormatFactory.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbFormatFactory.java
new file mode 100644
index 0000000000..6b14b6ac0c
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbFormatFactory.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgpb;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static 
org.apache.inlong.sort.formats.inlongmsgpb.InLongMsgPbOptions.DECOMPRESS_TYPE;
+import static 
org.apache.inlong.sort.formats.inlongmsgpb.InLongMsgPbOptions.IGNORE_PARSE_ERRORS;
+import static 
org.apache.inlong.sort.formats.inlongmsgpb.InLongMsgPbOptions.IGNORE_TRAILING_UNMAPPABLE;
+import static 
org.apache.inlong.sort.formats.inlongmsgpb.InLongMsgPbOptions.INNER_FORMAT;
+import static 
org.apache.inlong.sort.formats.inlongmsgpb.InLongMsgPbOptions.validateDecodingFormatOptions;
+
+/**
+ * Factory for the {@value #IDENTIFIER} format.
+ *
+ * <p>Only decoding is supported. Options of the inner format are read from the table options under
+ * the {@code inlong-msg-pb.<inner.format>.} prefix.
+ */
+public final class InLongMsgPbFormatFactory
+        implements
+            DeserializationFormatFactory,
+            SerializationFormatFactory {
+
+    public static final String IDENTIFIER = "inlong-msg-pb";
+
+    /** Prefix of all table options that belong to this format. */
+    public static final String INLONG_PREFIX = IDENTIFIER + ".";
+
+    /**
+     * Discovers the inner format named by {@code inner.format}, creates it from the table options
+     * prefixed with {@code inlong-msg-pb.<inner.format>.}, and wraps it in an
+     * {@link InLongMsgPbDecodingFormat}.
+     */
+    @Override
+    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(Context context,
+            ReadableConfig formatOptions) {
+        validateDecodingFormatOptions(formatOptions);
+
+        final String innerFormatIdentifier = formatOptions.get(INNER_FORMAT);
+        final DeserializationFormatFactory innerFactory = FactoryUtil.discoverFactory(
+                context.getClassLoader(),
+                DeserializationFormatFactory.class,
+                innerFormatIdentifier);
+        final Configuration allOptions = Configuration.fromMap(context.getCatalogTable().getOptions());
+        final String innerFormatMetaPrefix = innerFormatIdentifier + ".";
+        final String innerFormatPrefix = INLONG_PREFIX + innerFormatMetaPrefix;
+        final DecodingFormat<DeserializationSchema<RowData>> innerFormat =
+                innerFactory.createDecodingFormat(context, new DelegatingConfiguration(allOptions, innerFormatPrefix));
+
+        final boolean ignoreErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
+        final boolean ignoreTrailingUnmappable = formatOptions.get(IGNORE_TRAILING_UNMAPPABLE);
+        final String decompressType = formatOptions.get(DECOMPRESS_TYPE);
+        return new InLongMsgPbDecodingFormat(innerFormat, innerFormatMetaPrefix,
+                ignoreErrors, ignoreTrailingUnmappable, decompressType);
+    }
+
+    /**
+     * Not supported: this format can only be used for decoding.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(Context context,
+            ReadableConfig formatOptions) {
+        throw new UnsupportedOperationException("Do not support inlong pb format serialize.");
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    /** Only options without a default value are required. */
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(INNER_FORMAT);
+        return options;
+    }
+
+    /** Options that fall back to their default value when not set. */
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(DECOMPRESS_TYPE);
+        options.add(IGNORE_PARSE_ERRORS);
+        options.add(IGNORE_TRAILING_UNMAPPABLE);
+        return options;
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbOptions.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbOptions.java
new file mode 100644
index 0000000000..46545c2825
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbOptions.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgpb;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+
+/**
+ * Configuration options of the {@code inlong-msg-pb} format.
+ */
+public final class InLongMsgPbOptions {
+
+    private InLongMsgPbOptions() {
+    }
+
+    public static final ConfigOption<String> INNER_FORMAT =
+            ConfigOptions.key("inner.format")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Defines the format identifier for decoding the body of each InLong pb message. \n"
+                            + "The identifier is used to discover a suitable format factory.");
+
+    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
+            ConfigOptions.key("ignore-parse-errors")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Optional flag to skip fields and rows with parse errors instead of failing;\n"
+                            + "fields are set to null in case of errors");
+
+    public static final ConfigOption<String> DECOMPRESS_TYPE =
+            ConfigOptions.key("decompress.type")
+                    .stringType()
+                    .defaultValue("gzip")
+                    .withDescription("Specify the decompress type of inlong pb message. \n"
+                            + "Supported types are gzip, snappy and no-compress; the default type is gzip.");
+
+    public static final ConfigOption<Boolean> IGNORE_TRAILING_UNMAPPABLE =
+            ConfigOptions.key("ignore-trailing-unmappable")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Allows the real number of columns to exceed the expected number.\n"
+                            + "The extra columns will be skipped. Only takes effect for the csv inner format.");
+
+    /**
+     * Validates the options required to create the decoding format.
+     *
+     * @param config format options
+     * @throws ValidationException if {@code inner.format} is not set
+     */
+    public static void validateDecodingFormatOptions(ReadableConfig config) {
+        String innerFormat = config.get(INNER_FORMAT);
+        if (innerFormat == null) {
+            throw new ValidationException(
+                    INNER_FORMAT.key() + " shouldn't be null.");
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000..342a7adaeb
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.inlong.sort.formats.inlongmsgpb.InLongMsgPbFormatFactory
diff --git a/inlong-sort/sort-formats/format-rowdata/pom.xml 
b/inlong-sort/sort-formats/format-rowdata/pom.xml
index 12e23654e5..0334976986 100644
--- a/inlong-sort/sort-formats/format-rowdata/pom.xml
+++ b/inlong-sort/sort-formats/format-rowdata/pom.xml
@@ -36,6 +36,7 @@
         <module>format-rowdata-json</module>
         <module>format-inlongmsg-rowdata-base</module>
         <module>format-inlongmsg-rowdata-binlog</module>
+        <module>format-inlongmsg-rowdata-pb</module>
     </modules>
 
     <properties>

Reply via email to