This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d3d7a36d4 [INLONG-7397][Sort] Fix MySql connector output two data with 
the same UPDATE operation (#7398)
d3d7a36d4 is described below

commit d3d7a36d4cf85a6d51eea858a902c576b1eb9d34
Author: Schnapps <zpen...@connect.ust.hk>
AuthorDate: Wed Feb 22 18:53:58 2023 +0800

    [INLONG-7397][Sort] Fix MySql connector output two data with the same 
UPDATE operation (#7398)
---
 .../cdc/mysql/table/MySqlReadableMetadata.java     | 47 ++++++++++++++++------
 1 file changed, 35 insertions(+), 12 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
index 23b80f2df..c5f339911 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
@@ -172,7 +172,7 @@ public enum MySqlReadableMetadata {
                             .mysqlType(getMysqlType(tableSchema))
                             .build();
                     DebeziumJson debeziumJson = 
DebeziumJson.builder().after(field).source(source)
-                            
.tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getDebeziumOpType(record))
+                            
.tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getDebeziumOpType(data))
                             .tableChange(tableSchema).build();
 
                     try {
@@ -247,7 +247,7 @@ public enum MySqlReadableMetadata {
 
                 @Override
                 public Object read(SourceRecord record) {
-                    return StringData.fromString(getCanalOpType(record));
+                    return StringData.fromString(getOpType(record));
                 }
             }),
 
@@ -453,7 +453,7 @@ public enum MySqlReadableMetadata {
                 .data(dataList).database(databaseName)
                 .sql("").es(opTs).isDdl(false).pkNames(getPkNames(tableSchema))
                 .mysqlType(getMysqlType(tableSchema)).table(tableName).ts(ts)
-                
.type(getCanalOpType(record)).sqlType(getSqlType(tableSchema)).build();
+                
.type(getCanalOpType(rowData)).sqlType(getSqlType(tableSchema)).build();
 
         try {
             return 
StringData.fromString(OBJECT_MAPPER.writeValueAsString(canalJson));
@@ -473,7 +473,7 @@ public enum MySqlReadableMetadata {
         this.converter = converter;
     }
 
-    private static String getCanalOpType(SourceRecord record) {
+    private static String getOpType(SourceRecord record) {
         String opType;
         final Envelope.Operation op = Envelope.operationFor(record);
         if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
@@ -486,15 +486,38 @@ public enum MySqlReadableMetadata {
         return opType;
     }
 
-    private static String getDebeziumOpType(SourceRecord record) {
+    private static String getCanalOpType(GenericRowData record) {
         String opType;
-        final Envelope.Operation op = Envelope.operationFor(record);
-        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
-            opType = "c";
-        } else if (op == Envelope.Operation.DELETE) {
-            opType = "d";
-        } else {
-            opType = "u";
+        switch (record.getRowKind()) {
+            case DELETE:
+            case UPDATE_BEFORE:
+                opType = "DELETE";
+                break;
+            case INSERT:
+            case UPDATE_AFTER:
+                opType = "INSERT";
+                break;
+            default:
+                throw new IllegalStateException("the record only have states 
in DELETE, "
+                        + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
+        }
+        return opType;
+    }
+
+    private static String getDebeziumOpType(GenericRowData record) {
+        String opType;
+        switch (record.getRowKind()) {
+            case DELETE:
+            case UPDATE_BEFORE:
+                opType = "d";
+                break;
+            case INSERT:
+            case UPDATE_AFTER:
+                opType = "c";
+                break;
+            default:
+                throw new IllegalStateException("the record only have states 
in DELETE, "
+                        + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
         }
         return opType;
     }

Reply via email to