This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 5ab962c50 [INLONG-7957][Sort] Fix Oracle connector output two data with the same UPDATE operation (#7961) 5ab962c50 is described below commit 5ab962c5005f4c4955bdcbf3ec9ef7567fda2587 Author: emhui <111486498+e-m...@users.noreply.github.com> AuthorDate: Sun May 7 08:30:42 2023 +0800 [INLONG-7957][Sort] Fix Oracle connector output two data with the same UPDATE operation (#7961) --- .../cdc/oracle/table/OracleReadableMetaData.java | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java index 40621163c..515b7b08c 100644 --- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java +++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleReadableMetaData.java @@ -384,7 +384,7 @@ public enum OracleReadableMetaData { .sql("").es(opTs).isDdl(false).pkNames(getPkNames(tableSchema)) .oracleType(getOracleType(tableSchema)) .table(tableName).ts(ts) - .type(getOpType(record)).sqlType(getSqlType(tableSchema)).build(); + .type(getCanalOpType(data)).sqlType(getSqlType(tableSchema)).build(); try { ObjectMapper objectMapper = new ObjectMapper(); return StringData.fromString(objectMapper.writeValueAsString(canalJson)); @@ -425,6 +425,24 @@ public enum OracleReadableMetaData { return opType; } + public static String getCanalOpType(GenericRowData record) { + String opType; + switch (record.getRowKind()) { + case DELETE: + case UPDATE_BEFORE: + opType = OP_DELETE; + break; + case INSERT: + case UPDATE_AFTER: + opType = OP_INSERT; + break; + default: + throw new IllegalStateException("the record only have states in DELETE, " + + "UPDATE_BEFORE, INSERT and UPDATE_AFTER"); + } + return opType; + } + private static List<String> getPkNames(@Nullable TableChanges.TableChange tableSchema) { if (tableSchema == null) { return null;