This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 489529262 [INLONG-6548][Sort] Optimize metadata field naming for 
format of canal-json (#6549)
489529262 is described below

commit 4895292623c22a1d268bc8ce25366143bcd18005
Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com>
AuthorDate: Wed Nov 16 11:08:21 2022 +0800

    [INLONG-6548][Sort] Optimize metadata field naming for format of canal-json 
(#6549)
---
 .../protocol/node/extract/KafkaExtractNode.java    |   2 +-
 .../sort/protocol/node/load/KafkaLoadNode.java     |   8 +-
 .../node/extract/KafkaExtractNodeTest.java         |   3 +-
 .../sort/protocol/node/load/KafkaLoadNodeTest.java |   2 +-
 .../canal/CanalJsonEnhancedDecodingFormat.java     |  19 ++
 .../CanalJsonEnhancedDeserializationSchema.java    | 260 ++++++++++++---------
 .../canal/CanalJsonEnhancedEncodingFormat.java     |  19 ++
 .../CanalJsonEnhancedSerializationSchema.java      |  52 ++---
 8 files changed, 217 insertions(+), 148 deletions(-)

diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index 02da2bef3..fb6693548 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -215,7 +215,7 @@ public class KafkaExtractNode extends ExtractNode 
implements InlongMetric, Metad
                 metadataKey = "value.event-timestamp";
                 break;
             case OP_TYPE:
-                metadataKey = "value.op-type";
+                metadataKey = "value.type";
                 break;
             case IS_DDL:
                 metadataKey = "value.is-ddl";
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
index be326b478..26fa424ca 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
@@ -224,7 +224,7 @@ public class KafkaLoadNode extends LoadNode implements 
InlongMetric, Metadata, S
                 metadataKey = "value.event-timestamp";
                 break;
             case OP_TYPE:
-                metadataKey = "value.op-type";
+                metadataKey = "value.type";
                 break;
             case DATA:
             case DATA_CANAL:
@@ -257,8 +257,8 @@ public class KafkaLoadNode extends LoadNode implements 
InlongMetric, Metadata, S
     @Override
     public Set<MetaField> supportedMetaFields() {
         return EnumSet.of(MetaField.PROCESS_TIME, MetaField.TABLE_NAME, 
MetaField.OP_TYPE,
-            MetaField.DATABASE_NAME, MetaField.SQL_TYPE, MetaField.PK_NAMES, 
MetaField.TS,
-            MetaField.OP_TS, MetaField.IS_DDL, MetaField.MYSQL_TYPE, 
MetaField.BATCH_ID,
-            MetaField.UPDATE_BEFORE, MetaField.DATA_CANAL, MetaField.DATA);
+                MetaField.DATABASE_NAME, MetaField.SQL_TYPE, 
MetaField.PK_NAMES, MetaField.TS,
+                MetaField.OP_TS, MetaField.IS_DDL, MetaField.MYSQL_TYPE, 
MetaField.BATCH_ID,
+                MetaField.UPDATE_BEFORE, MetaField.DATA_CANAL, MetaField.DATA);
     }
 }
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
index 661d6f547..1d41c96fd 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
@@ -34,7 +34,6 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -86,7 +85,7 @@ public class KafkaExtractNodeTest extends 
SerializeBaseTest<KafkaExtractNode> {
         formatMap.put(MetaField.PROCESS_TIME, "AS PROCTIME()");
         formatMap.put(MetaField.TABLE_NAME, "STRING METADATA FROM 
'value.table'");
         formatMap.put(MetaField.DATABASE_NAME, "STRING METADATA FROM 
'value.database'");
-        formatMap.put(MetaField.OP_TYPE, "STRING METADATA FROM 
'value.op-type'");
+        formatMap.put(MetaField.OP_TYPE, "STRING METADATA FROM 'value.type'");
         formatMap.put(MetaField.OP_TS, "TIMESTAMP_LTZ(3) METADATA FROM 
'value.event-timestamp'");
         formatMap.put(MetaField.IS_DDL, "BOOLEAN METADATA FROM 
'value.is-ddl'");
         formatMap.put(MetaField.TS, "TIMESTAMP_LTZ(3) METADATA FROM 
'value.ingestion-timestamp'");
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java
index a9f495c28..da58b1ac9 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNodeTest.java
@@ -54,7 +54,7 @@ public class KafkaLoadNodeTest extends 
SerializeBaseTest<KafkaLoadNode> {
         formatMap.put(MetaField.DATA, "STRING METADATA FROM 
'value.data_canal'");
         formatMap.put(MetaField.TABLE_NAME, "STRING METADATA FROM 
'value.table'");
         formatMap.put(MetaField.DATABASE_NAME, "STRING METADATA FROM 
'value.database'");
-        formatMap.put(MetaField.OP_TYPE, "STRING METADATA FROM 
'value.op-type'");
+        formatMap.put(MetaField.OP_TYPE, "STRING METADATA FROM 'value.type'");
         formatMap.put(MetaField.OP_TS, "TIMESTAMP_LTZ(3) METADATA FROM 
'value.event-timestamp'");
         formatMap.put(MetaField.IS_DDL, "BOOLEAN METADATA FROM 
'value.is-ddl'");
         formatMap.put(MetaField.TS, "TIMESTAMP_LTZ(3) METADATA FROM 
'value.ingestion-timestamp'");
diff --git 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java
 
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java
index dfc40e548..d79734aed 100644
--- 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java
+++ 
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDecodingFormat.java
@@ -225,6 +225,10 @@ public class CanalJsonEnhancedDecodingFormat implements 
DecodingFormat<Deseriali
                     }
                 }),
         // additional metadata
+        /**
+         * It is deprecated, please use {@link #TYPE} instead
+         */
+        @Deprecated
         OP_TYPE(
                 "op-type",
                 DataTypes.STRING().nullable(),
@@ -232,6 +236,21 @@ public class CanalJsonEnhancedDecodingFormat implements 
DecodingFormat<Deseriali
                 new MetadataConverter() {
                     private static final long serialVersionUID = 1L;
 
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
+                        return row.getString(pos);
+                    }
+                }),
+        TYPE(
+                "type",
+                DataTypes.STRING().nullable(),
+                DataTypes.FIELD("type", DataTypes.STRING()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
                     @Override
                     public Object convert(GenericRowData row, int pos) {
                         if (row.isNullAt(pos)) {
diff --git 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDeserializationSchema.java
index f7f3dc933..db12a13fb 100644
--- 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDeserializationSchema.java
+++ 
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedDeserializationSchema.java
@@ -18,15 +18,6 @@
 
 package org.apache.inlong.sort.formats.json.canal;
 
-import static java.lang.String.format;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Objects;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import javax.annotation.Nullable;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -44,6 +35,15 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 import 
org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedDecodingFormat.ReadableMetadata;
 
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import static java.lang.String.format;
+
 /**
  * Deserialization schema from Canal JSON to Flink Table/SQL internal data 
structure {@link
  * RowData}. The deserialization schema knows Canal's schema definition and 
can extract the database
@@ -56,6 +56,7 @@ import 
org.apache.inlong.sort.formats.json.canal.CanalJsonEnhancedDecodingFormat
 * @see <a href="https://github.com/alibaba/canal">Alibaba Canal</a>
  */
 public final class CanalJsonEnhancedDeserializationSchema implements 
DeserializationSchema<RowData> {
+
     private static final long serialVersionUID = 1L;
 
     private static final String FIELD_OLD = "old";
@@ -64,37 +65,61 @@ public final class CanalJsonEnhancedDeserializationSchema 
implements Deserializa
     private static final String OP_DELETE = "DELETE";
     private static final String OP_CREATE = "CREATE";
 
-    /** The deserializer to deserialize Canal JSON data. */
+    /**
+     * The deserializer to deserialize Canal JSON data.
+     */
     private final JsonRowDataDeserializationSchema jsonDeserializer;
 
-    /** Flag that indicates that an additional projection is required for 
metadata. */
+    /**
+     * Flag that indicates that an additional projection is required for 
metadata.
+     */
     private final boolean hasMetadata;
 
-    /** Metadata to be extracted for every record. */
+    /**
+     * Metadata to be extracted for every record.
+     */
     private final MetadataConverter[] metadataConverters;
 
-    /** {@link TypeInformation} of the produced {@link RowData} (physical + 
meta data). */
+    /**
+     * {@link TypeInformation} of the produced {@link RowData} (physical + 
meta data).
+     */
     private final TypeInformation<RowData> producedTypeInfo;
 
-    /** Only read changelogs from the specific database. */
-    private final @Nullable String database;
+    /**
+     * Only read changelogs from the specific database.
+     */
+    private final @Nullable
+    String database;
 
-    /** Only read changelogs from the specific table. */
-    private final @Nullable String table;
+    /**
+     * Only read changelogs from the specific table.
+     */
+    private final @Nullable
+    String table;
 
-    /** Flag indicating whether to ignore invalid fields/rows (default: throw 
an exception). */
+    /**
+     * Flag indicating whether to ignore invalid fields/rows (default: throw 
an exception).
+     */
     private final boolean ignoreParseErrors;
 
-    /** Names of fields. */
+    /**
+     * Names of fields.
+     */
     private final List<String> fieldNames;
 
-    /** Number of fields. */
+    /**
+     * Number of fields.
+     */
     private final int fieldCount;
 
-    /** Pattern of the specific database. */
+    /**
+     * Pattern of the specific database.
+     */
     private final Pattern databasePattern;
 
-    /** Pattern of the specific table. */
+    /**
+     * Pattern of the specific table.
+     */
     private final Pattern tablePattern;
 
     private CanalJsonEnhancedDeserializationSchema(
@@ -133,7 +158,9 @@ public final class CanalJsonEnhancedDeserializationSchema 
implements Deserializa
     // Builder
     // 
------------------------------------------------------------------------------------------
 
-    /** Creates A builder for building a {@link 
CanalJsonEnhancedDeserializationSchema}. */
+    /**
+     * Creates A builder for building a {@link 
CanalJsonEnhancedDeserializationSchema}.
+     */
     public static Builder builder(
             DataType physicalDataType,
             List<ReadableMetadata> requestedMetadata,
@@ -141,60 +168,49 @@ public final class CanalJsonEnhancedDeserializationSchema 
implements Deserializa
         return new Builder(physicalDataType, requestedMetadata, 
producedTypeInfo);
     }
 
-    /** A builder for creating a {@link 
CanalJsonEnhancedDeserializationSchema}. */
-    @Internal
-    public static final class Builder {
-        private final DataType physicalDataType;
-        private final List<ReadableMetadata> requestedMetadata;
-        private final TypeInformation<RowData> producedTypeInfo;
-        private String database = null;
-        private String table = null;
-        private boolean ignoreParseErrors = false;
-        private TimestampFormat timestampFormat = TimestampFormat.SQL;
-
-        private Builder(
-                DataType physicalDataType,
-                List<ReadableMetadata> requestedMetadata,
-                TypeInformation<RowData> producedTypeInfo) {
-            this.physicalDataType = physicalDataType;
-            this.requestedMetadata = requestedMetadata;
-            this.producedTypeInfo = producedTypeInfo;
-        }
-
-        public Builder setDatabase(String database) {
-            this.database = database;
-            return this;
-        }
+    private static RowType createJsonRowType(
+            DataType physicalDataType, List<ReadableMetadata> 
readableMetadata) {
+        // Canal JSON contains other information, e.g. "ts", "sql", but we 
don't need them
+        DataType root =
+                DataTypes.ROW(
+                        DataTypes.FIELD("data", 
DataTypes.ARRAY(physicalDataType)),
+                        DataTypes.FIELD("old", 
DataTypes.ARRAY(physicalDataType)),
+                        ReadableMetadata.TYPE.requiredJsonField,
+                        ReadableMetadata.DATABASE.requiredJsonField,
+                        ReadableMetadata.TABLE.requiredJsonField);
+        // append fields that are required for reading metadata in the root
+        final List<DataTypes.Field> rootMetadataFields =
+                readableMetadata.stream()
+                        .filter(m -> m != ReadableMetadata.DATABASE
+                                && m != ReadableMetadata.TABLE
+                                && m != ReadableMetadata.TYPE)
+                        .map(m -> m.requiredJsonField)
+                        .distinct()
+                        .collect(Collectors.toList());
+        return (RowType) DataTypeUtils.appendRowFields(root, 
rootMetadataFields).getLogicalType();
+    }
 
-        public Builder setTable(String table) {
-            this.table = table;
-            return this;
-        }
+    // 
------------------------------------------------------------------------------------------
 
-        public Builder setIgnoreParseErrors(boolean ignoreParseErrors) {
-            this.ignoreParseErrors = ignoreParseErrors;
-            return this;
-        }
+    private static MetadataConverter[] createMetadataConverters(
+            RowType jsonRowType, List<ReadableMetadata> requestedMetadata) {
+        return requestedMetadata.stream()
+                .map(m -> convert(jsonRowType, m))
+                .toArray(MetadataConverter[]::new);
+    }
 
-        public Builder setTimestampFormat(TimestampFormat timestampFormat) {
-            this.timestampFormat = timestampFormat;
-            return this;
-        }
+    private static MetadataConverter convert(RowType jsonRowType, 
ReadableMetadata metadata) {
+        final int pos = 
jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName());
+        return new MetadataConverter() {
+            private static final long serialVersionUID = 1L;
 
-        public CanalJsonEnhancedDeserializationSchema build() {
-            return new CanalJsonEnhancedDeserializationSchema(
-                    physicalDataType,
-                    requestedMetadata,
-                    producedTypeInfo,
-                    database,
-                    table,
-                    ignoreParseErrors,
-                    timestampFormat);
-        }
+            @Override
+            public Object convert(GenericRowData root, int unused) {
+                return metadata.converter.convert(root, pos);
+            }
+        };
     }
 
-    // 
------------------------------------------------------------------------------------------
-
     @Override
     public RowData deserialize(byte[] message) throws IOException {
         throw new RuntimeException(
@@ -315,6 +331,8 @@ public final class CanalJsonEnhancedDeserializationSchema 
implements Deserializa
         return producedTypeInfo;
     }
 
+    // 
--------------------------------------------------------------------------------------------
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -345,49 +363,6 @@ public final class CanalJsonEnhancedDeserializationSchema 
implements Deserializa
                 fieldCount);
     }
 
-    // 
--------------------------------------------------------------------------------------------
-
-    private static RowType createJsonRowType(
-            DataType physicalDataType, List<ReadableMetadata> 
readableMetadata) {
-        // Canal JSON contains other information, e.g. "ts", "sql", but we 
don't need them
-        DataType root =
-                DataTypes.ROW(
-                        DataTypes.FIELD("data", 
DataTypes.ARRAY(physicalDataType)),
-                        DataTypes.FIELD("old", 
DataTypes.ARRAY(physicalDataType)),
-                        DataTypes.FIELD("type", DataTypes.STRING()),
-                        ReadableMetadata.DATABASE.requiredJsonField,
-                        ReadableMetadata.TABLE.requiredJsonField);
-        // append fields that are required for reading metadata in the root
-        final List<DataTypes.Field> rootMetadataFields =
-                readableMetadata.stream()
-                        .filter(m -> m != ReadableMetadata.DATABASE && m != 
ReadableMetadata.TABLE)
-                        .map(m -> m.requiredJsonField)
-                        .distinct()
-                        .collect(Collectors.toList());
-        return (RowType) DataTypeUtils.appendRowFields(root, 
rootMetadataFields).getLogicalType();
-    }
-
-    private static MetadataConverter[] createMetadataConverters(
-            RowType jsonRowType, List<ReadableMetadata> requestedMetadata) {
-        return requestedMetadata.stream()
-                .map(m -> convert(jsonRowType, m))
-                .toArray(MetadataConverter[]::new);
-    }
-
-    private static MetadataConverter convert(RowType jsonRowType, 
ReadableMetadata metadata) {
-        final int pos = 
jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName());
-        return new MetadataConverter() {
-            private static final long serialVersionUID = 1L;
-
-            @Override
-            public Object convert(GenericRowData root, int unused) {
-                return metadata.converter.convert(root, pos);
-            }
-        };
-    }
-
-    // 
--------------------------------------------------------------------------------------------
-
     /**
      * Converter that extracts a metadata field from the row that comes out of 
the JSON schema and
      * converts it to the desired data type.
@@ -401,4 +376,61 @@ public final class CanalJsonEnhancedDeserializationSchema 
implements Deserializa
 
         Object convert(GenericRowData row, int pos);
     }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * A builder for creating a {@link CanalJsonEnhancedDeserializationSchema}.
+     */
+    @Internal
+    public static final class Builder {
+
+        private final DataType physicalDataType;
+        private final List<ReadableMetadata> requestedMetadata;
+        private final TypeInformation<RowData> producedTypeInfo;
+        private String database = null;
+        private String table = null;
+        private boolean ignoreParseErrors = false;
+        private TimestampFormat timestampFormat = TimestampFormat.SQL;
+
+        private Builder(
+                DataType physicalDataType,
+                List<ReadableMetadata> requestedMetadata,
+                TypeInformation<RowData> producedTypeInfo) {
+            this.physicalDataType = physicalDataType;
+            this.requestedMetadata = requestedMetadata;
+            this.producedTypeInfo = producedTypeInfo;
+        }
+
+        public Builder setDatabase(String database) {
+            this.database = database;
+            return this;
+        }
+
+        public Builder setTable(String table) {
+            this.table = table;
+            return this;
+        }
+
+        public Builder setIgnoreParseErrors(boolean ignoreParseErrors) {
+            this.ignoreParseErrors = ignoreParseErrors;
+            return this;
+        }
+
+        public Builder setTimestampFormat(TimestampFormat timestampFormat) {
+            this.timestampFormat = timestampFormat;
+            return this;
+        }
+
+        public CanalJsonEnhancedDeserializationSchema build() {
+            return new CanalJsonEnhancedDeserializationSchema(
+                    physicalDataType,
+                    requestedMetadata,
+                    producedTypeInfo,
+                    database,
+                    table,
+                    ignoreParseErrors,
+                    timestampFormat);
+        }
+    }
 }
diff --git 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java
 
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java
index 2595fe477..a285a292e 100644
--- 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java
+++ 
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedEncodingFormat.java
@@ -206,6 +206,25 @@ public class CanalJsonEnhancedEncodingFormat implements 
EncodingFormat<Serializa
                     }
                 }),
         // additional metadata
+        TYPE(
+                "type",
+                DataTypes.STRING().nullable(),
+                DataTypes.FIELD("type", DataTypes.STRING()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(RowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
+                        return row.getString(pos);
+                    }
+                }),
+        /**
+         * It is deprecated, please use {@link #TYPE} instead
+         */
+        @Deprecated
         OP_TYPE(
                 "op-type",
                 DataTypes.STRING().nullable(),
diff --git 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java
 
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java
index 2e8c65abd..ca3c80e1e 100644
--- 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java
+++ 
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java
@@ -55,18 +55,17 @@ public class CanalJsonEnhancedSerializationSchema 
implements SerializationSchema
 
     private static final StringData OP_INSERT = 
StringData.fromString("INSERT");
     private static final StringData OP_DELETE = 
StringData.fromString("DELETE");
-
-    private transient GenericRowData reuse;
-
-    /** The serializer to serialize Canal JSON data. */
+    /**
+     * The serializer to serialize Canal JSON data.
+     */
     private final JsonRowDataSerializationSchema jsonSerializer;
-
     private final RowData.FieldGetter[] physicalFieldGetter;
-
     private final RowData.FieldGetter[] wirteableMetadataFieldGetter;
-
-    /** row schema that json serializer can parse output row to json format */
+    /**
+     * row schema that json serializer can parse output row to json format
+     */
     private final RowType jsonRowType;
+    private transient GenericRowData reuse;
 
     /**
      * Constructor of CanalJsonEnhancedSerializationSchema.
@@ -105,6 +104,23 @@ public class CanalJsonEnhancedSerializationSchema 
implements SerializationSchema
                         encodeDecimalAsPlainNumber);
     }
 
+    private static RowType createJsonRowType(DataType physicalDataType, 
List<WriteableMetadata> writeableMetadata) {
+        // Canal JSON contains other information, e.g. "database", "ts"
+        // but we don't need them
+        // and we don't need "old" , because can not support 
UPDATE_BEFORE,UPDATE_AFTER
+        DataType root =
+                DataTypes.ROW(
+                        DataTypes.FIELD("data", 
DataTypes.ARRAY(physicalDataType)),
+                        WriteableMetadata.TYPE.requiredJsonField);
+        // append fields that are required for reading metadata in the root
+        final List<DataTypes.Field> metadataFields =
+                writeableMetadata.stream().filter(m -> m != 
WriteableMetadata.TYPE)
+                        .map(m -> m.requiredJsonField)
+                        .distinct()
+                        .collect(Collectors.toList());
+        return (RowType) DataTypeUtils.appendRowFields(root, 
metadataFields).getLogicalType();
+    }
+
     @Override
     public void open(InitializationContext context) {
         reuse = new GenericRowData(2 + wirteableMetadataFieldGetter.length);
@@ -118,7 +134,7 @@ public class CanalJsonEnhancedSerializationSchema 
implements SerializationSchema
             IntStream.range(0, physicalFieldGetter.length)
                     .forEach(targetField ->
                             physicalData.setField(targetField, 
physicalFieldGetter[targetField].getFieldOrNull(row)));
-            ArrayData arrayData = new GenericArrayData(new RowData[] 
{physicalData});
+            ArrayData arrayData = new GenericArrayData(new 
RowData[]{physicalData});
             reuse.setField(0, arrayData);
 
             // meta data injection
@@ -165,23 +181,6 @@ public class CanalJsonEnhancedSerializationSchema 
implements SerializationSchema
         }
     }
 
-    private static RowType createJsonRowType(DataType physicalDataType, 
List<WriteableMetadata> writeableMetadata) {
-        // Canal JSON contains other information, e.g. "database", "ts"
-        // but we don't need them
-        // and we don't need "old" , because can not support 
UPDATE_BEFORE,UPDATE_AFTER
-        DataType root =
-                DataTypes.ROW(
-                                DataTypes.FIELD("data", 
DataTypes.ARRAY(physicalDataType)),
-                                DataTypes.FIELD("type", DataTypes.STRING()));
-        // append fields that are required for reading metadata in the root
-        final List<DataTypes.Field> metadataFields =
-                writeableMetadata.stream()
-                        .map(m -> m.requiredJsonField)
-                        .distinct()
-                        .collect(Collectors.toList());
-        return (RowType) DataTypeUtils.appendRowFields(root, 
metadataFields).getLogicalType();
-    }
-
     // 
--------------------------------------------------------------------------------------------
 
     /**
@@ -189,6 +188,7 @@ public class CanalJsonEnhancedSerializationSchema 
implements SerializationSchema
      * Finally all metadata field will splice into a GenericRowData, then json 
Serializer serialize it into json string.
      */
     interface MetadataConverter extends Serializable {
+
         Object convert(RowData inputRow, int pos);
     }
 }

Reply via email to