This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 489529262  [INLONG-6548][Sort] Optimize metadata field naming for format of canal-json (#6549)
489529262 is described below

commit 4895292623c22a1d268bc8ce25366143bcd18005
Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com>
AuthorDate: Wed Nov 16 11:08:21 2022 +0800

    [INLONG-6548][Sort] Optimize metadata field naming for format of canal-json (#6549)
---
 .../protocol/node/extract/KafkaExtractNode.java    |   2 +-
 .../sort/protocol/node/load/KafkaLoadNode.java     |   8 +-
 .../node/extract/KafkaExtractNodeTest.java         |   3 +-
 .../sort/protocol/node/load/KafkaLoadNodeTest.java |   2 +-
 .../canal/CanalJsonEnhancedDecodingFormat.java     |  19 ++
 .../CanalJsonEnhancedDeserializationSchema.java    | 260 ++++++++++++---------
 .../canal/CanalJsonEnhancedEncodingFormat.java     |  19 ++
 .../CanalJsonEnhancedSerializationSchema.java      |  52 ++---
 8 files changed, 217 insertions(+), 148 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index 02da2bef3..fb6693548 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -215,7 +215,7 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
                 metadataKey = "value.event-timestamp";
                 break;
             case OP_TYPE:
-                metadataKey = "value.op-type";
+                metadataKey = "value.type";
                 break;
             case IS_DDL:
                 metadataKey = "value.is-ddl";
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
index be326b478..26fa424ca 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
@@ -224,7 +224,7 @@ public class KafkaLoadNode extends LoadNode implements InlongMetric, Metadata, S
                 metadataKey = "value.event-timestamp";
                 break;
             case OP_TYPE:
-                metadataKey = "value.op-type";
+                metadataKey = "value.type";
                 break;
             case DATA:
             case DATA_CANAL:
@@ -257,8 +257,8 @@ public class KafkaLoadNode extends LoadNode implements InlongMetric, Metadata, S
     @Override
     public Set<MetaField> supportedMetaFields() {
         return EnumSet.of(MetaField.PROCESS_TIME, MetaField.TABLE_NAME, MetaField.OP_TYPE,
-            MetaField.DATABASE_NAME, MetaField.SQL_TYPE, MetaField.PK_NAMES, MetaField.TS,
-            MetaField.OP_TS, MetaField.IS_DDL, MetaField.MYSQL_TYPE, MetaField.BATCH_ID,
-            MetaField.UPDATE_BEFORE, MetaField.DATA_CANAL, MetaField.DATA);
+                MetaField.DATABASE_NAME, MetaField.SQL_TYPE, MetaField.PK_NAMES, MetaField.TS,
+                MetaField.OP_TS, MetaField.IS_DDL, MetaField.MYSQL_TYPE, MetaField.BATCH_ID,
+                MetaField.UPDATE_BEFORE, MetaField.DATA_CANAL, MetaField.DATA);
     }
 }
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
index 661d6f547..1d41c96fd 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
@@ -34,7 +34,6 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -86,7 +85,7 @@ public class KafkaExtractNodeTest extends SerializeBaseTest<KafkaExtractNode> {
         formatMap.put(MetaField.PROCESS_TIME, "AS PROCTIME()");
         formatMap.put(MetaField.TABLE_NAME, "STRING METADATA FROM 'value.table'");
         formatMap.put(MetaField.DATABASE_NAME, "STRING METADATA FROM 'value.database'");
-        formatMap.put(MetaField.OP_TYPE, "STRING METADATA FROM 'value.op-type'");
+        formatMap.put(MetaField.OP_TYPE, "STRING METADATA FROM 'value.type'");
         formatMap.put(MetaField.OP_TS, "TIMESTAMP_LTZ(3) METADATA FROM 'value.event-timestamp'");
         formatMap.put(MetaField.IS_DDL, "BOOLEAN METADATA FROM 'value.is-ddl'");
         formatMap.put(MetaField.TS, "TIMESTAMP_LTZ(3) METADATA FROM 'value.ingestion-timestamp'");
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java
index a9f495c28..da58b1ac9 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java
@@ -54,7 +54,7 @@ public class KafkaLoadNodeTest extends SerializeBaseTest<KafkaLoadNode> {
         formatMap.put(MetaField.DATA, "STRING METADATA FROM 'value.data_canal'");
         formatMap.put(MetaField.TABLE_NAME, "STRING METADATA FROM 'value.table'");
         formatMap.put(MetaField.DATABASE_NAME, "STRING METADATA FROM 'value.database'");
-        formatMap.put(MetaField.OP_TYPE, "STRING METADATA FROM 'value.op-type'");
+        formatMap.put(MetaField.OP_TYPE, "STRING METADATA FROM 'value.type'");
         formatMap.put(MetaField.OP_TS, "TIMESTAMP_LTZ(3) METADATA FROM 'value.event-timestamp'");
         formatMap.put(MetaField.IS_DDL, "BOOLEAN METADATA FROM 'value.is-ddl'");
         formatMap.put(MetaField.TS, "TIMESTAMP_LTZ(3) METADATA FROM 'value.ingestion-timestamp'");
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java
index dfc40e548..d79734aed 100644
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java
@@ -225,6 +225,10 @@ public class CanalJsonEnhancedDecodingFormat implements DecodingFormat<Deseriali
                             }
                     }),
             // additional metadata
+            /**
+             * It is deprecated, please use {@link this#TYPE} instead
+             */
+            @Deprecated
             OP_TYPE(
                     "op-type",
                     DataTypes.STRING().nullable(),
@@ -232,6 +236,21 @@ public class CanalJsonEnhancedDecodingFormat implements DecodingFormat<Deseriali
                     new MetadataConverter() {
                         private static final long serialVersionUID = 1L;
 
+                        @Override
+                        public Object convert(GenericRowData row, int pos) {
+                            if (row.isNullAt(pos)) {
+                                return null;
+                            }
+                            return row.getString(pos);
+                        }
+                    }),
+            TYPE(
+                    "type",
+                    DataTypes.STRING().nullable(),
+                    DataTypes.FIELD("type", DataTypes.STRING()),
+                    new MetadataConverter() {
+                        private static final long serialVersionUID = 1L;
+
                         @Override
                         public Object convert(GenericRowData row, int pos) {
                             if (row.isNullAt(pos)) {
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDeserializationSchema.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDeserializationSchema.java
index f7f3dc933..db12a13fb 100644
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDeserializationSchema.java
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDeserializationSchema.java
@@ -18,15 +18,6 @@
 
 package org.apache.inlong.sort.formats.json.canal;
 
-import static java.lang.String.format;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Objects;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import javax.annotation.Nullable;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -44,6 +35,15 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 import org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedDecodingFormat.ReadableMetadata;
 
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import static java.lang.String.format;
+
 /**
  * Deserialization schema from Canal JSON to Flink Table/SQL internal data structure {@link
  * RowData}. The deserialization schema knows Canal's schema definition and can extract the database
@@ -56,6 +56,7 @@ import org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedDecodingFormat
  * @see <a href="https://github.com/alibaba/canal">Alibaba Canal</a>
  */
 public final class CanalJsonEnhancedDeserializationSchema implements DeserializationSchema<RowData> {
+
     private static final long serialVersionUID = 1L;
 
     private static final String FIELD_OLD = "old";
@@ -64,37 +65,61 @@ public final class CanalJsonEnhancedDeserializationSchema implements Deserializa
     private static final String OP_DELETE = "DELETE";
     private static final String OP_CREATE = "CREATE";
 
-    /** The deserializer to deserialize Canal JSON data. */
+    /**
+     * The deserializer to deserialize Canal JSON data.
+     */
     private final JsonRowDataDeserializationSchema jsonDeserializer;
 
-    /** Flag that indicates that an additional projection is required for metadata. */
+    /**
+     * Flag that indicates that an additional projection is required for metadata.
+     */
     private final boolean hasMetadata;
 
-    /** Metadata to be extracted for every record. */
+    /**
+     * Metadata to be extracted for every record.
+     */
     private final MetadataConverter[] metadataConverters;
 
-    /** {@link TypeInformation} of the produced {@link RowData} (physical + meta data). */
+    /**
+     * {@link TypeInformation} of the produced {@link RowData} (physical + meta data).
+     */
     private final TypeInformation<RowData> producedTypeInfo;
 
-    /** Only read changelogs from the specific database. */
-    private final @Nullable String database;
+    /**
+     * Only read changelogs from the specific database.
+     */
+    private final @Nullable
+    String database;
 
-    /** Only read changelogs from the specific table. */
-    private final @Nullable String table;
+    /**
+     * Only read changelogs from the specific table.
+     */
+    private final @Nullable
+    String table;
 
-    /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
+    /**
+     * Flag indicating whether to ignore invalid fields/rows (default: throw an exception).
+     */
     private final boolean ignoreParseErrors;
 
-    /** Names of fields. */
+    /**
+     * Names of fields.
+     */
     private final List<String> fieldNames;
 
-    /** Number of fields. */
+    /**
+     * Number of fields.
+     */
    private final int fieldCount;
 
-    /** Pattern of the specific database. */
+    /**
+     * Pattern of the specific database.
+     */
     private final Pattern databasePattern;
 
-    /** Pattern of the specific table. */
+    /**
+     * Pattern of the specific table.
+     */
     private final Pattern tablePattern;
 
     private CanalJsonEnhancedDeserializationSchema(
@@ -133,7 +158,9 @@ public final class CanalJsonEnhancedDeserializationSchema implements Deserializa
     // Builder
     // ------------------------------------------------------------------------------------------
 
-    /** Creates A builder for building a {@link CanalJsonEnhancedDeserializationSchema}. */
+    /**
+     * Creates A builder for building a {@link CanalJsonEnhancedDeserializationSchema}.
+     */
     public static Builder builder(
             DataType physicalDataType,
             List<ReadableMetadata> requestedMetadata,
@@ -141,60 +168,49 @@ public final class CanalJsonEnhancedDeserializationSchema implements Deserializa
         return new Builder(physicalDataType, requestedMetadata, producedTypeInfo);
     }
 
-    /** A builder for creating a {@link CanalJsonEnhancedDeserializationSchema}. */
-    @Internal
-    public static final class Builder {
-        private final DataType physicalDataType;
-        private final List<ReadableMetadata> requestedMetadata;
-        private final TypeInformation<RowData> producedTypeInfo;
-        private String database = null;
-        private String table = null;
-        private boolean ignoreParseErrors = false;
-        private TimestampFormat timestampFormat = TimestampFormat.SQL;
-
-        private Builder(
-                DataType physicalDataType,
-                List<ReadableMetadata> requestedMetadata,
-                TypeInformation<RowData> producedTypeInfo) {
-            this.physicalDataType = physicalDataType;
-            this.requestedMetadata = requestedMetadata;
-            this.producedTypeInfo = producedTypeInfo;
-        }
-
-        public Builder setDatabase(String database) {
-            this.database = database;
-            return this;
-        }
+    private static RowType createJsonRowType(
+            DataType physicalDataType, List<ReadableMetadata> readableMetadata) {
+        // Canal JSON contains other information, e.g. "ts", "sql", but we don't need them
"ts", "sql", but we don't need them + DataType root = + DataTypes.ROW( + DataTypes.FIELD("data", DataTypes.ARRAY(physicalDataType)), + DataTypes.FIELD("old", DataTypes.ARRAY(physicalDataType)), + ReadableMetadata.TYPE.requiredJsonField, + ReadableMetadata.DATABASE.requiredJsonField, + ReadableMetadata.TABLE.requiredJsonField); + // append fields that are required for reading metadata in the root + final List<DataTypes.Field> rootMetadataFields = + readableMetadata.stream() + .filter(m -> m != ReadableMetadata.DATABASE + && m != ReadableMetadata.TABLE + && m != ReadableMetadata.TYPE) + .map(m -> m.requiredJsonField) + .distinct() + .collect(Collectors.toList()); + return (RowType) DataTypeUtils.appendRowFields(root, rootMetadataFields).getLogicalType(); + } - public Builder setTable(String table) { - this.table = table; - return this; - } + // ------------------------------------------------------------------------------------------ - public Builder setIgnoreParseErrors(boolean ignoreParseErrors) { - this.ignoreParseErrors = ignoreParseErrors; - return this; - } + private static MetadataConverter[] createMetadataConverters( + RowType jsonRowType, List<ReadableMetadata> requestedMetadata) { + return requestedMetadata.stream() + .map(m -> convert(jsonRowType, m)) + .toArray(MetadataConverter[]::new); + } - public Builder setTimestampFormat(TimestampFormat timestampFormat) { - this.timestampFormat = timestampFormat; - return this; - } + private static MetadataConverter convert(RowType jsonRowType, ReadableMetadata metadata) { + final int pos = jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName()); + return new MetadataConverter() { + private static final long serialVersionUID = 1L; - public CanalJsonEnhancedDeserializationSchema build() { - return new CanalJsonEnhancedDeserializationSchema( - physicalDataType, - requestedMetadata, - producedTypeInfo, - database, - table, - ignoreParseErrors, - timestampFormat); - } + @Override + public Object convert(GenericRowData root, int unused) { + return metadata.converter.convert(root, pos); + } + }; } - // ------------------------------------------------------------------------------------------ - @Override public RowData deserialize(byte[] message) throws IOException { throw new RuntimeException( @@ -315,6 +331,8 @@ public final class CanalJsonEnhancedDeserializationSchema implements Deserializa return producedTypeInfo; } + // -------------------------------------------------------------------------------------------- + @Override public boolean equals(Object o) { if (this == o) { @@ -345,49 +363,6 @@ public final class CanalJsonEnhancedDeserializationSchema implements Deserializa fieldCount); } - // -------------------------------------------------------------------------------------------- - - private static RowType createJsonRowType( - DataType physicalDataType, List<ReadableMetadata> readableMetadata) { - // Canal JSON contains other information, e.g. 
"ts", "sql", but we don't need them - DataType root = - DataTypes.ROW( - DataTypes.FIELD("data", DataTypes.ARRAY(physicalDataType)), - DataTypes.FIELD("old", DataTypes.ARRAY(physicalDataType)), - DataTypes.FIELD("type", DataTypes.STRING()), - ReadableMetadata.DATABASE.requiredJsonField, - ReadableMetadata.TABLE.requiredJsonField); - // append fields that are required for reading metadata in the root - final List<DataTypes.Field> rootMetadataFields = - readableMetadata.stream() - .filter(m -> m != ReadableMetadata.DATABASE && m != ReadableMetadata.TABLE) - .map(m -> m.requiredJsonField) - .distinct() - .collect(Collectors.toList()); - return (RowType) DataTypeUtils.appendRowFields(root, rootMetadataFields).getLogicalType(); - } - - private static MetadataConverter[] createMetadataConverters( - RowType jsonRowType, List<ReadableMetadata> requestedMetadata) { - return requestedMetadata.stream() - .map(m -> convert(jsonRowType, m)) - .toArray(MetadataConverter[]::new); - } - - private static MetadataConverter convert(RowType jsonRowType, ReadableMetadata metadata) { - final int pos = jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName()); - return new MetadataConverter() { - private static final long serialVersionUID = 1L; - - @Override - public Object convert(GenericRowData root, int unused) { - return metadata.converter.convert(root, pos); - } - }; - } - - // -------------------------------------------------------------------------------------------- - /** * Converter that extracts a metadata field from the row that comes out of the JSON schema and * converts it to the desired data type. @@ -401,4 +376,61 @@ public final class CanalJsonEnhancedDeserializationSchema implements Deserializa Object convert(GenericRowData row, int pos); } + + // -------------------------------------------------------------------------------------------- + + /** + * A builder for creating a {@link CanalJsonEnhancedDeserializationSchema}. 
+     */
+    @Internal
+    public static final class Builder {
+
+        private final DataType physicalDataType;
+        private final List<ReadableMetadata> requestedMetadata;
+        private final TypeInformation<RowData> producedTypeInfo;
+        private String database = null;
+        private String table = null;
+        private boolean ignoreParseErrors = false;
+        private TimestampFormat timestampFormat = TimestampFormat.SQL;
+
+        private Builder(
+                DataType physicalDataType,
+                List<ReadableMetadata> requestedMetadata,
+                TypeInformation<RowData> producedTypeInfo) {
+            this.physicalDataType = physicalDataType;
+            this.requestedMetadata = requestedMetadata;
+            this.producedTypeInfo = producedTypeInfo;
+        }
+
+        public Builder setDatabase(String database) {
+            this.database = database;
+            return this;
+        }
+
+        public Builder setTable(String table) {
+            this.table = table;
+            return this;
+        }
+
+        public Builder setIgnoreParseErrors(boolean ignoreParseErrors) {
+            this.ignoreParseErrors = ignoreParseErrors;
+            return this;
+        }
+
+        public Builder setTimestampFormat(TimestampFormat timestampFormat) {
+            this.timestampFormat = timestampFormat;
+            return this;
+        }
+
+        public CanalJsonEnhancedDeserializationSchema build() {
+            return new CanalJsonEnhancedDeserializationSchema(
+                    physicalDataType,
+                    requestedMetadata,
+                    producedTypeInfo,
+                    database,
+                    table,
+                    ignoreParseErrors,
+                    timestampFormat);
+        }
+    }
 }
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java
index 2595fe477..a285a292e 100644
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java
@@ -206,6 +206,25 @@ public class CanalJsonEnhancedEncodingFormat implements EncodingFormat<Serializa
                             }
                     }),
             // additional metadata
+            TYPE(
+                    "type",
+                    DataTypes.STRING().nullable(),
+                    DataTypes.FIELD("type", DataTypes.STRING()),
+                    new MetadataConverter() {
+                        private static final long serialVersionUID = 1L;
+
+                        @Override
+                        public Object convert(RowData row, int pos) {
+                            if (row.isNullAt(pos)) {
+                                return null;
+                            }
+                            return row.getString(pos);
+                        }
+                    }),
+            /**
+             * It is deprecated, please use {@link this#TYPE} instead
+             */
+            @Deprecated
             OP_TYPE(
                     "op-type",
                     DataTypes.STRING().nullable(),
diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java
index 2e8c65abd..ca3c80e1e 100644
--- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java
+++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java
@@ -55,18 +55,17 @@ public class CanalJsonEnhancedSerializationSchema implements SerializationSchema
 
     private static final StringData OP_INSERT = StringData.fromString("INSERT");
     private static final StringData OP_DELETE = StringData.fromString("DELETE");
-
-    private transient GenericRowData reuse;
-
-    /** The serializer to serialize Canal JSON data. */
+    /**
+     * The serializer to serialize Canal JSON data.
+     */
     private final JsonRowDataSerializationSchema jsonSerializer;
-
     private final RowData.FieldGetter[] physicalFieldGetter;
-
     private final RowData.FieldGetter[] wirteableMetadataFieldGetter;
-
-    /** row schema that json serializer can parse output row to json format */
+    /**
+     * row schema that json serializer can parse output row to json format
+     */
     private final RowType jsonRowType;
+    private transient GenericRowData reuse;
 
     /**
      * Constructor of CanalJsonEnhancedSerializationSchema.
@@ -105,6 +104,23 @@ public class CanalJsonEnhancedSerializationSchema implements SerializationSchema
                 encodeDecimalAsPlainNumber);
     }
 
+    private static RowType createJsonRowType(DataType physicalDataType, List<WriteableMetadata> writeableMetadata) {
+        // Canal JSON contains other information, e.g. "database", "ts"
+        // but we don't need them
+        // and we don't need "old" , because can not support UPDATE_BEFORE,UPDATE_AFTER
+        DataType root =
+                DataTypes.ROW(
+                        DataTypes.FIELD("data", DataTypes.ARRAY(physicalDataType)),
+                        WriteableMetadata.TYPE.requiredJsonField);
+        // append fields that are required for reading metadata in the root
+        final List<DataTypes.Field> metadataFields =
+                writeableMetadata.stream().filter(m -> m != WriteableMetadata.TYPE)
+                        .map(m -> m.requiredJsonField)
+                        .distinct()
+                        .collect(Collectors.toList());
+        return (RowType) DataTypeUtils.appendRowFields(root, metadataFields).getLogicalType();
+    }
+
     @Override
     public void open(InitializationContext context) {
         reuse = new GenericRowData(2 + wirteableMetadataFieldGetter.length);
@@ -118,7 +134,7 @@ public class CanalJsonEnhancedSerializationSchema implements SerializationSchema
         IntStream.range(0, physicalFieldGetter.length)
                 .forEach(targetField -> physicalData.setField(targetField,
                         physicalFieldGetter[targetField].getFieldOrNull(row)));
-        ArrayData arrayData = new GenericArrayData(new RowData[] {physicalData});
+        ArrayData arrayData = new GenericArrayData(new RowData[]{physicalData});
         reuse.setField(0, arrayData);
 
         // mete data injection
@@ -165,23 +181,6 @@ public class CanalJsonEnhancedSerializationSchema implements SerializationSchema
         }
     }
 
-    private static RowType createJsonRowType(DataType physicalDataType, List<WriteableMetadata> writeableMetadata) {
-        // Canal JSON contains other information, e.g. "database", "ts"
-        // but we don't need them
-        // and we don't need "old" , because can not support UPDATE_BEFORE,UPDATE_AFTER
-        DataType root =
-                DataTypes.ROW(
-                        DataTypes.FIELD("data", DataTypes.ARRAY(physicalDataType)),
-                        DataTypes.FIELD("type", DataTypes.STRING()));
-        // append fields that are required for reading metadata in the root
-        final List<DataTypes.Field> metadataFields =
-                writeableMetadata.stream()
-                        .map(m -> m.requiredJsonField)
-                        .distinct()
-                        .collect(Collectors.toList());
-        return (RowType) DataTypeUtils.appendRowFields(root, metadataFields).getLogicalType();
-    }
-
     // --------------------------------------------------------------------------------------------
 
     /**
@@ -189,6 +188,7 @@ public class CanalJsonEnhancedSerializationSchema implements SerializationSchema
      * Finally all metadata field will splice into a GenericRowData, then json Serializer serialize it into json string.
      */
     interface MetadataConverter extends Serializable {
+
         Object convert(RowData inputRow, int pos);
     }
 }
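
Editor's note (not part of the commit): the sketch below is a minimal, hedged illustration of how a caller might request the renamed metadata through the builder this patch reorganizes. Only ReadableMetadata.TYPE, the builder(...) signature, and the setter/build methods come from the diff above; the example class, the physical columns (id, name), the way the produced TypeInformation is approximated, and the assumption that ReadableMetadata and the builder are visible from calling code are all illustrative assumptions.

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedDecodingFormat.ReadableMetadata;
import org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedDeserializationSchema;

public class CanalJsonTypeMetadataExample {

    public static void main(String[] args) {
        // Illustrative physical schema; any row type works here.
        DataType physicalDataType = DataTypes.ROW(
                DataTypes.FIELD("id", DataTypes.BIGINT()),
                DataTypes.FIELD("name", DataTypes.STRING()));

        // Request the new TYPE metadata (surfaced to SQL as 'value.type') plus database and table.
        List<ReadableMetadata> requestedMetadata = Arrays.asList(
                ReadableMetadata.TYPE, ReadableMetadata.DATABASE, ReadableMetadata.TABLE);

        // A real table source derives this from the full produced schema (physical + metadata);
        // the physical row type alone is used here only to keep the sketch short.
        TypeInformation<RowData> producedTypeInfo =
                InternalTypeInfo.of(physicalDataType.getLogicalType());

        CanalJsonEnhancedDeserializationSchema schema = CanalJsonEnhancedDeserializationSchema
                .builder(physicalDataType, requestedMetadata, producedTypeInfo)
                .setIgnoreParseErrors(true)
                .build();

        // 'schema' can now back a Kafka scan source; the operation type is exposed under
        // the metadata key "type" ("value.type" in SQL) instead of the deprecated "op-type".
    }
}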