This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ab962c50 [INLONG-7957][Sort] Fix Oracle connector output two data with the same UPDATE operation (#7961)
5ab962c50 is described below

commit 5ab962c5005f4c4955bdcbf3ec9ef7567fda2587
Author: emhui <111486498+e-m...@users.noreply.github.com>
AuthorDate: Sun May 7 08:30:42 2023 +0800

    [INLONG-7957][Sort] Fix Oracle connector output two data with the same UPDATE operation (#7961)
---
 .../cdc/oracle/table/OracleReadableMetaData.java     | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java
index 40621163c..515b7b08c 100644
--- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java
+++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java
@@ -384,7 +384,7 @@ public enum OracleReadableMetaData {
                 .sql("").es(opTs).isDdl(false).pkNames(getPkNames(tableSchema))
                 .oracleType(getOracleType(tableSchema))
                 .table(tableName).ts(ts)
-                .type(getOpType(record)).sqlType(getSqlType(tableSchema)).build();
+                .type(getCanalOpType(data)).sqlType(getSqlType(tableSchema)).build();
         try {
             ObjectMapper objectMapper = new ObjectMapper();
             return StringData.fromString(objectMapper.writeValueAsString(canalJson));
@@ -425,6 +425,24 @@ public enum OracleReadableMetaData {
         return opType;
     }
 
+    public static String getCanalOpType(GenericRowData record) {
+        String opType;
+        switch (record.getRowKind()) {
+            case DELETE:
+            case UPDATE_BEFORE:
+                opType = OP_DELETE;
+                break;
+            case INSERT:
+            case UPDATE_AFTER:
+                opType = OP_INSERT;
+                break;
+            default:
+                throw new IllegalStateException("the record only have states in DELETE, "
+                        + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
+        }
+        return opType;
+    }
+
     private static List<String> getPkNames(@Nullable TableChanges.TableChange tableSchema) {
         if (tableSchema == null) {
             return null;

Reply via email to