This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new bb45659e1 [INLONG-7958][Sort] Fix MongoDB's schema becomes unordered after extracting the row data (#7960)
bb45659e1 is described below

commit bb45659e1b0d300d8d306f6cf47ab849aa8f906e
Author: emhui <111486498+e-m...@users.noreply.github.com>
AuthorDate: Sun May 7 08:31:22 2023 +0800

    [INLONG-7958][Sort] Fix MongoDB's schema becomes unordered after extracting the row data (#7960)
---
 .../sort/cdc/mongodb/debezium/DebeziumJson.java    |  49 ++++++
 .../MongoDBConnectorDeserializationSchema.java     |   5 +-
 .../source/reader/MongoDBRecordEmitter.java        |   3 +-
 .../source/reader/fetch/MongoDBScanFetchTask.java  |   3 +-
 .../reader/fetch/MongoDBStreamFetchTask.java       |   3 +-
 .../cdc/mongodb/source/utils/MetaDataUtils.java    | 177 +++++++++++++++++++++
 .../sort/cdc/mongodb/source/utils/MongoUtils.java  |   3 +-
 .../cdc/mongodb/table/MongoDBReadableMetadata.java | 152 +-----------------
 8 files changed, 241 insertions(+), 154 deletions(-)

diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/DebeziumJson.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/DebeziumJson.java
new file mode 100644
index 000000000..f64fc5e2a
--- /dev/null
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/DebeziumJson.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mongodb.debezium;
+
+import io.debezium.relational.history.TableChanges;
+import java.util.List;
+import java.util.Map;
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+public class DebeziumJson { // Lombok-generated bean + builder; MetaDataUtils.getDebeziumData builds one and serializes it to JSON
+
+    private Map<String, String> before; // not set by the builder call in MetaDataUtils.getDebeziumData
+    private Map<String, Object> after; // set from field 0 of the row data in MetaDataUtils.getDebeziumData
+    private Source source; // database/collection descriptor, see Source below
+    private TableChanges.TableChange tableChange; // table schema handed in by the caller; callers declare it @Nullable
+    private long tsMs; // set from the envelope TIMESTAMP field of the source record
+    private String op; // "c" or "d", as produced by MetaDataUtils.getDebeziumOpType
+
+    @Builder
+    @Data
+    public static class Source {
+
+        private String name; // MetaDataUtils.getDebeziumData sets this to "mongodb_cdc_source"
+        private String db; // namespace database field of the record
+        private String table; // namespace collection field of the record
+        private List<String> pkNames; // MetaDataUtils.getDebeziumData sets a single-element list containing "_id"
+        private Map<String, Integer> sqlType; // name -> int code computed by MetaDataUtils.getSqlType
+        private Map<String, String> mysqlType; // despite the name, filled with the type map from field 1 of the MongoDB row data
+    }
+
+}
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
index 98e5cf4e9..b8a0a86cc 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.cdc.mongodb.debezium.table;
 import com.mongodb.client.model.changestream.OperationType;
 import com.mongodb.internal.HexUtils;
 import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
+import java.util.LinkedHashMap;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericArrayData;
@@ -809,8 +810,8 @@ public class MongoDBConnectorDeserializationSchema
                 }
                 return row;
             } else {
-                Map<String, Object> data = new HashMap<>();
-                Map<String, String> dataType = new HashMap<>();
+                Map<String, Object> data = new LinkedHashMap<>();
+                Map<String, String> dataType = new LinkedHashMap<>();
                 document.forEach((key, value) -> {
                     try {
                         LogicalType logicalType = 
RecordUtils.convertLogicType(value);
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/MongoDBRecordEmitter.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/MongoDBRecordEmitter.java
index b9e17a624..5f647031e 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/MongoDBRecordEmitter.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/MongoDBRecordEmitter.java
@@ -55,8 +55,7 @@ import org.slf4j.LoggerFactory;
  */
 public final class MongoDBRecordEmitter<T> extends 
IncrementalSourceRecordEmitter<T> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(
-            
com.ververica.cdc.connectors.mongodb.source.reader.MongoDBRecordEmitter.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(MongoDBRecordEmitter.class);
 
     public MongoDBRecordEmitter(
             DebeziumDeserializationSchema<T> deserializationSchema,
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBScanFetchTask.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBScanFetchTask.java
index 88f4c2c2d..963461ea9 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBScanFetchTask.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBScanFetchTask.java
@@ -65,8 +65,7 @@ import org.slf4j.LoggerFactory;
  */
 public class MongoDBScanFetchTask implements FetchTask<SourceSplitBase> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(
-            
com.ververica.cdc.connectors.mongodb.source.reader.fetch.MongoDBScanFetchTask.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(MongoDBScanFetchTask.class);
 
     private final SnapshotSplit snapshotSplit;
     private volatile boolean taskRunning = false;
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java
index 69fa89772..420132b87 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java
@@ -82,8 +82,7 @@ import org.slf4j.LoggerFactory;
  */
 public class MongoDBStreamFetchTask implements FetchTask<SourceSplitBase> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(
-            
com.ververica.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(MongoDBStreamFetchTask.class);
 
     private final StreamSplit streamSplit;
     private volatile boolean taskRunning = false;
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/utils/MetaDataUtils.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/utils/MetaDataUtils.java
new file mode 100644
index 000000000..28cc14d58
--- /dev/null
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/utils/MetaDataUtils.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mongodb.source.utils;
+
+import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.data.Envelope;
+import io.debezium.data.Envelope.FieldName;
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.inlong.sort.cdc.mongodb.debezium.DebeziumJson;
+import org.apache.inlong.sort.cdc.mongodb.debezium.DebeziumJson.Source;
+import org.apache.inlong.sort.cdc.mongodb.debezium.utils.RecordUtils;
+import org.apache.inlong.sort.formats.json.canal.CanalJson;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+public class MetaDataUtils { // static helpers that build metadata values (namespace fields, canal-json, debezium-json) from a SourceRecord
+
+    private static final String MONGODB_DEFAULT_PRIMARY_KEY = "_id"; // reported as the only pkNames entry in both JSON formats
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // shared serializer for both JSON formats
+
+    /**
+     * get the namespace field named by metaDataKey (for example the database or collection name) from record
+     */
+    public static String getMetaData(SourceRecord record, String metaDataKey) {
+        Struct value = (Struct) record.value();
+        Struct to = value.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
+        return to.getString(metaDataKey); // plain String, not Flink StringData; callers that need StringData must wrap it
+    }
+
+    /**
+     * get sql type from row data: maps each name in the field-1 type map through RecordUtils.getSqlType; null for null row data
+     */
+    public static Map<String, Integer> getSqlType(@Nullable RowData rowData) {
+        if (rowData == null) {
+            return null;
+        }
+        GenericRowData data = (GenericRowData) rowData; // assumes the row is a GenericRowData - TODO confirm against the deserializer
+        Map<String, String> mongoDbType = (Map<String, String>) 
+data.getField(1);
+        Map<String, Integer> sqlType = new LinkedHashMap<>(); // LinkedHashMap keeps the iteration order of mongoDbType
+        mongoDbType.forEach((name, value) -> sqlType.put(name, 
+RecordUtils.getSqlType(value)));
+        return sqlType;
+    }
+
+    private static String getDebeziumOpType(RowData rowData) { // maps the row kind to "c" or "d"; no update code is produced
+        String opType;
+        switch (rowData.getRowKind()) {
+            case DELETE:
+            case UPDATE_BEFORE: // grouped with DELETE, so it is reported as "d"
+                opType = "d";
+                break;
+            case INSERT:
+            case UPDATE_AFTER: // grouped with INSERT, so it is reported as "c"
+                opType = "c";
+                break;
+            default:
+                throw new IllegalStateException("the record only have states 
+in DELETE, "
+                        + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
+        }
+        return opType;
+    }
+
+    private static String getCanalOpType(RowData rowData) { // maps the row kind to "INSERT" or "DELETE"; no "UPDATE" is produced
+        String opType;
+        switch (rowData.getRowKind()) {
+            case DELETE:
+            case UPDATE_BEFORE: // grouped with DELETE
+                opType = "DELETE";
+                break;
+            case INSERT:
+            case UPDATE_AFTER: // grouped with INSERT
+                opType = "INSERT";
+                break;
+            default:
+                throw new IllegalStateException("the record only have states 
+in DELETE, "
+                        + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
+        }
+        return opType;
+    }
+
+    public static StringData getCanalData(SourceRecord record, RowData rowData,
+            TableChange tableSchema) { // tableSchema is not read anywhere in this method
+        // construct canal json
+        Struct messageStruct = (Struct) record.value();
+        Struct sourceStruct = 
+messageStruct.getStruct(Envelope.FieldName.SOURCE);
+        GenericRowData data = (GenericRowData) rowData; // assumes field 0 is the value map and field 1 the type map - TODO confirm
+        Map<String, Object> field = (Map<String, Object>) data.getField(0);
+        Map<String, String> mongoDbType = (Map<String, String>) 
+data.getField(1);
+
+        String database = getMetaData(record, 
+MongoDBEnvelope.NAMESPACE_DATABASE_FIELD);
+        String table = getMetaData(record, 
+MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD);
+        Long opTs = (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY); // source-struct timestamp, emitted as "es"
+        long ts = (Long) messageStruct.get(FieldName.TIMESTAMP); // envelope timestamp, emitted as "ts"; unboxing throws NullPointerException if the field is null
+
+        List<Map<String, Object>> dataList = new ArrayList<>(); // the data list always carries exactly one row
+        dataList.add(field);
+        CanalJson canalJson = CanalJson.builder()
+                .data(dataList)
+                .database(database)
+                .sql("")
+                .es(opTs)
+                .isDdl(false)
+                
+.pkNames(Collections.singletonList(MONGODB_DEFAULT_PRIMARY_KEY))
+                .mysqlType(mongoDbType) // the MongoDB type map is placed in the mysqlType slot
+                .table(table)
+                .ts(ts)
+                .type(getCanalOpType(rowData))
+                .sqlType(getSqlType(data))
+                .build();
+        try {
+            return 
+StringData.fromString(OBJECT_MAPPER.writeValueAsString(canalJson));
+        } catch (Exception e) { // any serialization failure is rethrown unchecked with the cause attached
+            throw new IllegalStateException("exception occurs when get meta 
+data", e);
+        }
+    }
+
+    public static StringData getDebeziumData(SourceRecord record, TableChange 
+tableSchema,
+            RowData rowData) {
+        // construct debezium json
+        Struct messageStruct = (Struct) record.value();
+        GenericRowData data = (GenericRowData) rowData; // assumes field 0 is the value map and field 1 the type map - TODO confirm
+        Map<String, Object> field = (Map<String, Object>) data.getField(0);
+        Map<String, String> mongoDbType = (Map<String, String>) 
+data.getField(1);
+
+        String database = getMetaData(record, 
+MongoDBEnvelope.NAMESPACE_DATABASE_FIELD);
+        String table = getMetaData(record, 
+MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD);
+        long ts = (Long) messageStruct.get(FieldName.TIMESTAMP); // envelope timestamp, emitted as tsMs; unboxing throws NullPointerException if the field is null
+        String debeziumOp = getDebeziumOpType(rowData);
+
+        Source source = Source.builder()
+                .db(database)
+                .table(table)
+                .name("mongodb_cdc_source")
+                .mysqlType(mongoDbType) // the MongoDB type map is placed in the mysqlType slot
+                .sqlType(getSqlType(rowData))
+                
+.pkNames(Collections.singletonList(MONGODB_DEFAULT_PRIMARY_KEY))
+                .build();
+        DebeziumJson debeziumJson = DebeziumJson.builder() // "before" is left unset; only "after" is filled
+                .source(source)
+                .after(field)
+                .tsMs(ts)
+                .op(debeziumOp)
+                .tableChange(tableSchema)
+                .build();
+        try {
+            return 
+StringData.fromString(OBJECT_MAPPER.writeValueAsString(debeziumJson));
+        } catch (Exception e) { // any serialization failure is rethrown unchecked with the cause attached
+            throw new IllegalStateException("exception occurs when get meta 
+data", e);
+        }
+    }
+
+}
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/utils/MongoUtils.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/utils/MongoUtils.java
index 874cc0a59..979493809 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/utils/MongoUtils.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/utils/MongoUtils.java
@@ -70,8 +70,7 @@ import org.slf4j.LoggerFactory;
  */
 public class MongoUtils {
 
-    private static final Logger LOG = LoggerFactory.getLogger(
-            
com.ververica.cdc.connectors.mongodb.source.utils.MongoUtils.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(MongoUtils.class);
 
     public static final BsonDouble COMMAND_SUCCEED_FLAG = new BsonDouble(1.0d);
 
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBReadableMetadata.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBReadableMetadata.java
index 19db352eb..0fa28fe37 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBReadableMetadata.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBReadableMetadata.java
@@ -17,31 +17,22 @@
 
 package org.apache.inlong.sort.cdc.mongodb.table;
 
+import static 
org.apache.inlong.sort.cdc.mongodb.source.utils.MetaDataUtils.getCanalData;
+import static 
org.apache.inlong.sort.cdc.mongodb.source.utils.MetaDataUtils.getDebeziumData;
+import static 
org.apache.inlong.sort.cdc.mongodb.source.utils.MetaDataUtils.getMetaData;
+
 import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
 import io.debezium.connector.AbstractSourceInfo;
 import io.debezium.data.Envelope;
-import io.debezium.relational.Table;
 import io.debezium.relational.history.TableChanges;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
 import javax.annotation.Nullable;
 
-import org.apache.commons.lang3.StringUtils;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
 import org.apache.inlong.sort.cdc.mongodb.debezium.table.MetadataConverter;
-import org.apache.inlong.sort.cdc.mongodb.debezium.utils.RecordUtils;
-import org.apache.inlong.sort.formats.json.canal.CanalJson;
-import org.apache.inlong.sort.formats.json.debezium.DebeziumJson;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 
@@ -57,10 +48,7 @@ public enum MongoDBReadableMetadata {
 
                 @Override
                 public Object read(SourceRecord record) {
-                    Struct value = (Struct) record.value();
-                    Struct to = 
value.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
-                    return StringData.fromString(
-                            
to.getString(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD));
+                    return getMetaData(record, 
MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD);
                 }
             }),
 
@@ -73,10 +61,7 @@ public enum MongoDBReadableMetadata {
 
                 @Override
                 public Object read(SourceRecord record) {
-                    Struct value = (Struct) record.value();
-                    Struct to = 
value.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
-                    return StringData.fromString(
-                            
to.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD));
+                    return getMetaData(record, 
MongoDBEnvelope.NAMESPACE_DATABASE_FIELD);
                 }
             }),
 
@@ -111,40 +96,8 @@ public enum MongoDBReadableMetadata {
                 @Override
                 public Object read(SourceRecord record,
                         @Nullable TableChanges.TableChange tableSchema, 
RowData rowData) {
-
                     // construct debezium json
-                    Struct messageStruct = (Struct) record.value();
-                    Struct to = 
messageStruct.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
-                    Struct sourceStruct = 
messageStruct.getStruct(Envelope.FieldName.SOURCE);
-                    GenericRowData data = (GenericRowData) rowData;
-                    Map<String, Object> field = (Map<String, Object>) 
data.getField(0);
-                    Map<String, String> mysqlType = (Map<String, String>) 
data.getField(1);
-                    Map<String, Integer> sqlType = new HashMap<>();
-                    mysqlType.forEach((name, value) -> sqlType.put(name, 
RecordUtils.getSqlType(value)));
-                    String debeziumOp = getDebeziumOpType(rowData);
-                    if (StringUtils.isBlank(debeziumOp)) {
-                        return null;
-                    }
-                    DebeziumJson.Source source = DebeziumJson.Source.builder()
-                            
.db(to.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD))
-                            
.table(to.getString(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD))
-                            .name("mongo_binlog_source")
-                            .mysqlType(mysqlType)
-                            .sqlType(sqlType)
-                            .pkNames(null)
-                            .build();
-                    DebeziumJson debeziumJson = DebeziumJson.builder()
-                            .source(source)
-                            .after(field)
-                            .tsMs((Long) 
sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY))
-                            .op(debeziumOp)
-                            .tableChange(tableSchema)
-                            .build();
-                    try {
-                        return 
StringData.fromString(OBJECT_MAPPER.writeValueAsString(debeziumJson));
-                    } catch (Exception e) {
-                        throw new IllegalStateException("exception occurs when 
get meta data", e);
-                    }
+                    return getDebeziumData(record, tableSchema, rowData);
                 }
             }),
 
@@ -164,45 +117,13 @@ public enum MongoDBReadableMetadata {
                 public Object read(SourceRecord record,
                         @Nullable TableChanges.TableChange tableSchema, 
RowData rowData) {
                     // construct canal json
-                    Struct messageStruct = (Struct) record.value();
-                    Struct to = 
messageStruct.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
-                    Struct sourceStruct = 
messageStruct.getStruct(Envelope.FieldName.SOURCE);
-                    String canalOp = getCanalOpType(rowData);
-                    if (StringUtils.isBlank(canalOp)) {
-                        return null;
-                    }
-                    GenericRowData data = (GenericRowData) rowData;
-                    Map<String, Object> field = (Map<String, Object>) 
data.getField(0);
-                    Map<String, String> mysqlType = (Map<String, String>) 
data.getField(1);
-                    Map<String, Integer> sqlType = new HashMap<>();
-                    mysqlType.forEach((name, value) -> sqlType.put(name, 
RecordUtils.getSqlType(value)));
-                    List<Map<String, Object>> dataList = new ArrayList<>();
-                    dataList.add(field);
-                    CanalJson canalJson = CanalJson.builder()
-                            .data(dataList)
-                            
.database(to.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD))
-                            .sql("")
-                            .es((Long) 
sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY))
-                            .isDdl(false)
-                            .pkNames(null)
-                            .mysqlType(getMysqlType(tableSchema))
-                            
.table(to.getString(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD))
-                            .ts((Long) 
sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY))
-                            .type(canalOp)
-                            .sqlType(sqlType)
-                            .build();
-                    try {
-                        return 
StringData.fromString(OBJECT_MAPPER.writeValueAsString(canalJson));
-                    } catch (Exception e) {
-                        throw new IllegalStateException("exception occurs when 
get meta data", e);
-                    }
+                    return getCanalData(record, rowData, tableSchema);
                 }
             });
 
     private final String key;
     private final DataType dataType;
     private final MetadataConverter converter;
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     MongoDBReadableMetadata(String key, DataType dataType, MetadataConverter 
converter) {
         this.key = key;
@@ -210,63 +131,6 @@ public enum MongoDBReadableMetadata {
         this.converter = converter;
     }
 
-    public static Map<String, String> getMysqlType(@Nullable 
TableChanges.TableChange tableSchema) {
-        if (tableSchema == null) {
-            return null;
-        }
-        Map<String, String> mysqlType = new LinkedHashMap<>();
-        final Table table = tableSchema.getTable();
-        table.columns()
-                .forEach(
-                        column -> {
-                            mysqlType.put(
-                                    column.name(),
-                                    String.format(
-                                            "%s(%d)",
-                                            column.typeName(),
-                                            column.length()));
-                        });
-        return mysqlType;
-    }
-
-    private static String getDebeziumOpType(RowData rowData) {
-        String opType = null;
-        switch (rowData.getRowKind()) {
-            case INSERT:
-                opType = "c";
-                break;
-            case DELETE:
-                opType = "d";
-                break;
-            case UPDATE_AFTER:
-            case UPDATE_BEFORE:
-                opType = "u";
-                break;
-            default:
-                return null;
-        }
-        return opType;
-    }
-
-    private static String getCanalOpType(RowData rowData) {
-        String opType = null;
-        switch (rowData.getRowKind()) {
-            case INSERT:
-                opType = "INSERT";
-                break;
-            case DELETE:
-                opType = "DELETE";
-                break;
-            case UPDATE_AFTER:
-            case UPDATE_BEFORE:
-                opType = "UPDATE";
-                break;
-            default:
-                return null;
-        }
-        return opType;
-    }
-
     public String getKey() {
         return key;
     }

Reply via email to