This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 245307c621 [INLONG-9570][Sort] Support rowdata way of sort InLong message pb format (#9664) 245307c621 is described below commit 245307c62155421cf1296dbcb0b7524543d7bff7 Author: baomingyu <baomingy...@163.com> AuthorDate: Sun Feb 4 20:00:51 2024 +0800 [INLONG-9570][Sort] Support rowdata way of sort InLong message pb format (#9664) --- .../format-inlongmsg-rowdata-pb/pom.xml | 93 ++++++++ .../inlongmsgpb/InLongMsgPbDecodingFormat.java | 238 +++++++++++++++++++++ .../InLongMsgPbDeserializationSchema.java | 151 +++++++++++++ .../inlongmsgpb/InLongMsgPbFormatFactory.java | 102 +++++++++ .../formats/inlongmsgpb/InLongMsgPbOptions.java | 65 ++++++ .../org.apache.flink.table.factories.Factory | 18 ++ inlong-sort/sort-formats/format-rowdata/pom.xml | 1 + 7 files changed, 668 insertions(+) diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/pom.xml b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/pom.xml new file mode 100644 index 0000000000..78b3a6ece7 --- /dev/null +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/pom.xml @@ -0,0 +1,93 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.inlong</groupId> + <artifactId>format-rowdata</artifactId> + <version>1.11.0-SNAPSHOT</version> + </parent> + + <artifactId>sort-format-inlongmsg-rowdata-pb</artifactId> + <name>Apache InLong - Sort Format-InLongMsg-RowData-PB</name> + + <properties> + <inlong.root.dir>${project.parent.parent.parent.parent.basedir}</inlong.root.dir> + </properties> + + <dependencies> + <!-- core dependencies --> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-format-rowdata-base</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sdk-common</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-flink-dependencies-${sort.flink.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-csv</artifactId> + <scope>provided</scope> + </dependency> + + </dependencies> +</project> diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDecodingFormat.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDecodingFormat.java new file mode 100644 index 0000000000..b07a88b64b --- /dev/null +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDecodingFormat.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.formats.inlongmsgpb; + +import org.apache.inlong.sdk.commons.protocol.ProxySdk; +import org.apache.inlong.sdk.commons.utils.GzipUtils; +import org.apache.inlong.sort.formats.inlongmsgpb.InLongMsgPbDeserializationSchema.InLongPbMsgDecompressor; +import org.apache.inlong.sort.formats.inlongmsgpb.InLongMsgPbDeserializationSchema.MetadataConverter; + +import org.apache.commons.lang3.time.DateFormatUtils; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.csv.CsvRowDataDeserializationSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.source.DynamicTableSource.Context; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.utils.DataTypeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xerial.snappy.Snappy; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * InLongMsg pb format decoding format. + */ +public class InLongMsgPbDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> { + + private static final Logger log = LoggerFactory.getLogger(InLongMsgPbDecodingFormat.class); + + private final String innerFormatMetaPrefix; + + private final DecodingFormat<DeserializationSchema<RowData>> innerDecodingFormat; + + private List<String> metadataKeys; + + private final boolean ignoreErrors; + + private final String decompressType; + + private final boolean ignoreTrailingUnmappable; + + public InLongMsgPbDecodingFormat( + DecodingFormat<DeserializationSchema<RowData>> innerDecodingFormat, + String innerFormatMetaPrefix, + boolean ignoreErrors, + boolean ignoreTrailingUnmappable, + String decompressType) { + this.innerDecodingFormat = innerDecodingFormat; + this.innerFormatMetaPrefix = innerFormatMetaPrefix; + this.metadataKeys = Collections.emptyList(); + this.ignoreErrors = ignoreErrors; + this.ignoreTrailingUnmappable = ignoreTrailingUnmappable; + this.decompressType = decompressType; + } + + @Override + public DeserializationSchema<RowData> createRuntimeDecoder(Context context, DataType physicalDataType) { + final MetadataConverter[] metadataConverters = Arrays.stream(ReadableMetadata.values()) + .filter(metadata -> metadataKeys.contains(metadata.key)) + .map(metadata -> metadata.converter) + .toArray(MetadataConverter[]::new); + final List<ReadableMetadata> readableMetadata = + metadataKeys.stream() + .map( + k -> Stream.of(ReadableMetadata.values()) + .filter(rm -> rm.key.equals(k)) + .findFirst() + .orElseThrow(IllegalStateException::new)) + .collect(Collectors.toList()); + final List<DataTypes.Field> metadataFields = + readableMetadata.stream() + .map(m -> DataTypes.FIELD(m.key, m.dataType)) + .collect(Collectors.toList()); + final DataType producedDataType = + DataTypeUtils.appendRowFields(physicalDataType, metadataFields); + final TypeInformation<RowData> producedTypeInfo = + context.createTypeInformation(producedDataType); + final InLongPbMsgDecompressor decompressor = getDecompressor(decompressType); + + DeserializationSchema<RowData> innerSchema = + innerDecodingFormat.createRuntimeDecoder(context, physicalDataType); + if (innerSchema instanceof CsvRowDataDeserializationSchema && ignoreTrailingUnmappable) { + this.makeCsvInnerFormatIgnoreTrailingUnmappable(innerSchema); + } + return new InLongMsgPbDeserializationSchema( + innerSchema, + metadataConverters, + producedTypeInfo, + decompressor, + ignoreErrors); + } + + @Override + public Map<String, DataType> listReadableMetadata() { + final Map<String, DataType> metadataMap = new LinkedHashMap<>(); + + // add inner format metadata with prefix + innerDecodingFormat + .listReadableMetadata() + .forEach((key, value) -> metadataMap.putIfAbsent(innerFormatMetaPrefix + key, value)); + + // add format metadata + Stream.of(ReadableMetadata.values()) + .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType)); + + return metadataMap; + } + + @Override + public void applyReadableMetadata(List<String> metadataKeys) { + // separate inner format and format metadata + final List<String> innerFormatMetadataKeys = + metadataKeys.stream() + .filter(k -> k.startsWith(innerFormatMetaPrefix)) + .collect(Collectors.toList()); + final List<String> formatMetadataKeys = new ArrayList<>(metadataKeys); + formatMetadataKeys.removeAll(innerFormatMetadataKeys); + this.metadataKeys = formatMetadataKeys; + + // push down inner format metadata + final Map<String, DataType> formatMetadata = innerDecodingFormat.listReadableMetadata(); + if (formatMetadata.size() > 0) { + final List<String> requestedFormatMetadataKeys = + innerFormatMetadataKeys.stream() + .map(k -> k.substring(innerFormatMetaPrefix.length())) + .collect(Collectors.toList()); + innerDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys); + } + } + + @Override + public ChangelogMode getChangelogMode() { + return innerDecodingFormat.getChangelogMode(); + } + + /** only support gzip and snappy for now */ + private InLongPbMsgDecompressor getDecompressor(String decompressType) { + switch (decompressType.toLowerCase(Locale.ROOT)) { + case "gzip": + return GzipUtils::decompress; + case "snappy": + return Snappy::uncompress; + case "no-compress": + default: + return unDecompress -> unDecompress; + } + } + + /** + * Use reflection to make csv format ignore tailing unmappable. + */ + private void makeCsvInnerFormatIgnoreTrailingUnmappable(DeserializationSchema<RowData> innerSchema) { + try { + Field readerField = CsvRowDataDeserializationSchema.class.getDeclaredField("objectReader"); + readerField.setAccessible(true); + ObjectReader oldReader = (ObjectReader) readerField.get(innerSchema); + + Field schemaField = ObjectReader.class.getDeclaredField("_schema"); + schemaField.setAccessible(true); + CsvSchema oldSchema = (CsvSchema) schemaField.get(oldReader); + + ObjectReader newReader = new CsvMapper() + .enable(CsvParser.Feature.IGNORE_TRAILING_UNMAPPABLE) + .readerFor(JsonNode.class) + .with(oldSchema); + readerField.set(innerSchema, newReader); + } catch (Throwable t) { + log.error("failed to make csv inner format to ignore trailing unmappable, ex is ", t); + } + } + + // -------------------------------------------------------------------------------------------- + // Metadata handling + // -------------------------------------------------------------------------------------------- + + enum ReadableMetadata { + + CREATE_TIME( + "create-time", + DataTypes.STRING(), + new MetadataConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object read(ProxySdk.MessageObj body) { + String createTime = DateFormatUtils.format(body.getMsgTime(), "yyyyMMddHH"); + return StringData.fromString(createTime); + } + }); + + final String key; + + final DataType dataType; + + final MetadataConverter converter; + + ReadableMetadata(String key, DataType dataType, MetadataConverter converter) { + this.key = key; + this.dataType = dataType; + this.converter = converter; + } + } +} diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDeserializationSchema.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDeserializationSchema.java new file mode 100644 index 0000000000..ce446bf13f --- /dev/null +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDeserializationSchema.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.formats.inlongmsgpb; + +import org.apache.inlong.common.msg.InLongMsg; +import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj; +import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObjs; + +import com.google.common.base.Objects; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.util.Collector; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * InLongMsg pb format deserialization schema. + * Used to deserialize {@link MessageObj} msg. + */ +public class InLongMsgPbDeserializationSchema implements DeserializationSchema<RowData> { + + /** Inner {@link DeserializationSchema} to deserialize {@link InLongMsg} inner packaged + * data buffer message */ + private final DeserializationSchema<RowData> deserializationSchema; + + /** {@link MetadataConverter} of how to produce metadata from {@link InLongMsg}. */ + private final MetadataConverter[] metadataConverters; + + /** {@link TypeInformation} of the produced {@link RowData} (physical + meta data). */ + private final TypeInformation<RowData> producedTypeInfo; + + /** status of error */ + private final boolean ignoreErrors; + + /** decompressor */ + private final InLongPbMsgDecompressor decompressor; + + public InLongMsgPbDeserializationSchema( + DeserializationSchema<RowData> schema, + MetadataConverter[] metadataConverters, + TypeInformation<RowData> producedTypeInfo, + InLongPbMsgDecompressor decompressor, + boolean ignoreErrors) { + this.deserializationSchema = schema; + this.metadataConverters = metadataConverters; + this.producedTypeInfo = producedTypeInfo; + this.decompressor = decompressor; + this.ignoreErrors = ignoreErrors; + } + + @Override + public RowData deserialize(byte[] bytes) throws IOException { + throw new RuntimeException("Unsupported method, " + + "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead."); + } + + @Override + public void deserialize(byte[] message, Collector<RowData> out) throws IOException { + byte[] decompressed = decompressor.decompress(message); + MessageObjs msgObjs = MessageObjs.parseFrom(decompressed); + List<MessageObj> msgList = msgObjs.getMsgsList(); + for (MessageObj msg : msgList) { + RowData row = deserializationSchema.deserialize(msg.getBody().toByteArray()); + this.emitRow(msg, (GenericRowData) row, out); + } + } + + @Override + public boolean isEndOfStream(RowData rowData) { + return false; + } + + @Override + public TypeInformation<RowData> getProducedType() { + return producedTypeInfo; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof InLongMsgPbDeserializationSchema)) { + return false; + } + InLongMsgPbDeserializationSchema that = (InLongMsgPbDeserializationSchema) o; + return ignoreErrors == that.ignoreErrors + && Objects.equal(Arrays.stream(metadataConverters).collect(Collectors.toList()), + Arrays.stream(that.metadataConverters).collect(Collectors.toList())) + && Objects.equal(deserializationSchema, that.deserializationSchema) + && Objects.equal(decompressor, that.decompressor) + && Objects.equal(producedTypeInfo, that.producedTypeInfo); + } + + @Override + public int hashCode() { + return Objects.hashCode(deserializationSchema, metadataConverters, producedTypeInfo, + ignoreErrors, decompressor); + } + + interface MetadataConverter extends Serializable { + + Object read(MessageObj body); + } + + interface InLongPbMsgDecompressor extends Serializable { + + byte[] decompress(byte[] message) throws IOException; + } + + /** add metadata column */ + private void emitRow(MessageObj message, GenericRowData physicalRow, Collector<RowData> out) { + if (metadataConverters.length == 0) { + out.collect(physicalRow); + return; + } + final int physicalArity = physicalRow.getArity(); + final int metadataArity = metadataConverters.length; + final GenericRowData producedRow = + new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity); + for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) { + producedRow.setField(physicalPos, physicalRow.getField(physicalPos)); + } + for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) { + producedRow.setField( + physicalArity + metadataPos, metadataConverters[metadataPos].read(message)); + } + out.collect(producedRow); + } +} diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbFormatFactory.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbFormatFactory.java new file mode 100644 index 0000000000..6b14b6ac0c --- /dev/null +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbFormatFactory.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.formats.inlongmsgpb; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DelegatingConfiguration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableFactory.Context; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.inlong.sort.formats.inlongmsgpb.InLongMsgPbOptions.DECOMPRESS_TYPE; +import static org.apache.inlong.sort.formats.inlongmsgpb.InLongMsgPbOptions.IGNORE_PARSE_ERRORS; +import static org.apache.inlong.sort.formats.inlongmsgpb.InLongMsgPbOptions.IGNORE_TRAILING_UNMAPPABLE; +import static org.apache.inlong.sort.formats.inlongmsgpb.InLongMsgPbOptions.INNER_FORMAT; +import static org.apache.inlong.sort.formats.inlongmsgpb.InLongMsgPbOptions.validateDecodingFormatOptions; + +/** + * factory class for inLong msg pb format + */ +public final class InLongMsgPbFormatFactory + implements + DeserializationFormatFactory, + SerializationFormatFactory { + + public static final String IDENTIFIER = "inlong-msg-pb"; + + public static final String INLONG_PREFIX = "inlong-msg-pb."; + + @Override + public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(Context context, + ReadableConfig formatOptions) { + validateDecodingFormatOptions(formatOptions); + + final DeserializationFormatFactory innerFactory = FactoryUtil.discoverFactory( + context.getClassLoader(), + DeserializationFormatFactory.class, + formatOptions.get(INNER_FORMAT)); + Configuration allOptions = Configuration.fromMap(context.getCatalogTable().getOptions()); + String innerFormatMetaPrefix = formatOptions.get(INNER_FORMAT) + "."; + String innerFormatPrefix = INLONG_PREFIX + innerFormatMetaPrefix; + DecodingFormat<DeserializationSchema<RowData>> innerFormat = + innerFactory.createDecodingFormat(context, new DelegatingConfiguration(allOptions, innerFormatPrefix)); + boolean ignoreErrors = formatOptions.get(IGNORE_PARSE_ERRORS); + boolean ignoreTrailingUnmappable = formatOptions.get(IGNORE_TRAILING_UNMAPPABLE); + String decompressType = formatOptions.get(DECOMPRESS_TYPE); + return new InLongMsgPbDecodingFormat(innerFormat, innerFormatMetaPrefix, + ignoreErrors, ignoreTrailingUnmappable, decompressType); + } + + @Override + public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(Context context, + ReadableConfig formatOptions) { + throw new RuntimeException("Do not support inlong pb format serialize."); + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + Set<ConfigOption<?>> options = new HashSet<>(); + options.add(INNER_FORMAT); + options.add(DECOMPRESS_TYPE); + return options; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + Set<ConfigOption<?>> options = new HashSet<>(); + options.add(IGNORE_PARSE_ERRORS); + options.add(IGNORE_TRAILING_UNMAPPABLE); + return options; + } +} diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbOptions.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbOptions.java new file mode 100644 index 0000000000..46545c2825 --- /dev/null +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbOptions.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.formats.inlongmsgpb; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.ValidationException; + +public class InLongMsgPbOptions { + + private InLongMsgPbOptions() { + } + + public static final ConfigOption<String> INNER_FORMAT = + ConfigOptions.key("inner.format") + .stringType() + .noDefaultValue() + .withDescription("Defines the format identifier for encoding attr data. \n" + + "The identifier is used to discover a suitable format factory."); + + public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = + ConfigOptions.key("ignore-parse-errors") + .booleanType() + .defaultValue(false) + .withDescription("Optional flag to skip fields and rows with parse errors instead of failing;\n" + + "fields are set to null in case of errors"); + + public static final ConfigOption<String> DECOMPRESS_TYPE = + ConfigOptions.key("decompress.type") + .stringType() + .defaultValue("gzip") + .withDescription("Specify the decompress type of inlong pb message. \n" + + "The default type is Gzip"); + + public static final ConfigOption<Boolean> IGNORE_TRAILING_UNMAPPABLE = + ConfigOptions.key("ignore-trailing-unmappable") + .booleanType() + .defaultValue(true) + .withDescription("Allows the case that real size exceeds the expected size.\n " + + "The extra column will be skipped"); + + public static void validateDecodingFormatOptions(ReadableConfig config) { + String innerFormat = config.get(INNER_FORMAT); + if (innerFormat == null) { + throw new ValidationException( + INNER_FORMAT.key() + " shouldn't be null."); + } + } +} diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000000..342a7adaeb --- /dev/null +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.inlong.sort.formats.inlongmsgpb.InLongMsgPbFormatFactory diff --git a/inlong-sort/sort-formats/format-rowdata/pom.xml b/inlong-sort/sort-formats/format-rowdata/pom.xml index 12e23654e5..0334976986 100644 --- a/inlong-sort/sort-formats/format-rowdata/pom.xml +++ b/inlong-sort/sort-formats/format-rowdata/pom.xml @@ -36,6 +36,7 @@ <module>format-rowdata-json</module> <module>format-inlongmsg-rowdata-base</module> <module>format-inlongmsg-rowdata-binlog</module> + <module>format-inlongmsg-rowdata-pb</module> </modules> <properties>