This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new d3d7a36d4 [INLONG-7397][Sort] Fix MySql connector output two data with the same UPDATE operation (#7398) d3d7a36d4 is described below commit d3d7a36d4cf85a6d51eea858a902c576b1eb9d34 Author: Schnapps <zpen...@connect.ust.hk> AuthorDate: Wed Feb 22 18:53:58 2023 +0800 [INLONG-7397][Sort] Fix MySql connector output two data with the same UPDATE operation (#7398) --- .../cdc/mysql/table/MySqlReadableMetadata.java | 47 ++++++++++++++++------ 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java index 23b80f2df..c5f339911 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java @@ -172,7 +172,7 @@ public enum MySqlReadableMetadata { .mysqlType(getMysqlType(tableSchema)) .build(); DebeziumJson debeziumJson = DebeziumJson.builder().after(field).source(source) - .tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getDebeziumOpType(record)) + .tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getDebeziumOpType(data)) .tableChange(tableSchema).build(); try { @@ -247,7 +247,7 @@ public enum MySqlReadableMetadata { @Override public Object read(SourceRecord record) { - return StringData.fromString(getCanalOpType(record)); + return StringData.fromString(getOpType(record)); } }), @@ -453,7 +453,7 @@ public enum MySqlReadableMetadata { .data(dataList).database(databaseName) .sql("").es(opTs).isDdl(false).pkNames(getPkNames(tableSchema)) .mysqlType(getMysqlType(tableSchema)).table(tableName).ts(ts) - .type(getCanalOpType(record)).sqlType(getSqlType(tableSchema)).build(); + .type(getCanalOpType(rowData)).sqlType(getSqlType(tableSchema)).build(); try { return StringData.fromString(OBJECT_MAPPER.writeValueAsString(canalJson)); @@ -473,7 +473,7 @@ public enum MySqlReadableMetadata { this.converter = converter; } - private static String getCanalOpType(SourceRecord record) { + private static String getOpType(SourceRecord record) { String opType; final Envelope.Operation op = Envelope.operationFor(record); if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) { @@ -486,15 +486,38 @@ public enum MySqlReadableMetadata { return opType; } - private static String getDebeziumOpType(SourceRecord record) { + private static String getCanalOpType(GenericRowData record) { String opType; - final Envelope.Operation op = Envelope.operationFor(record); - if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) { - opType = "c"; - } else if (op == Envelope.Operation.DELETE) { - opType = "d"; - } else { - opType = "u"; + switch (record.getRowKind()) { + case DELETE: + case UPDATE_BEFORE: + opType = "DELETE"; + break; + case INSERT: + case UPDATE_AFTER: + opType = "INSERT"; + break; + default: + throw new IllegalStateException("the record only have states in DELETE, " + + "UPDATE_BEFORE, INSERT and UPDATE_AFTER"); + } + return opType; + } + + private static String getDebeziumOpType(GenericRowData record) { + String opType; + switch (record.getRowKind()) { + case DELETE: + case UPDATE_BEFORE: + opType = "d"; + break; + case INSERT: + case UPDATE_AFTER: + opType = "c"; + break; + default: + throw new IllegalStateException("the record only have states in DELETE, " + + "UPDATE_BEFORE, INSERT and UPDATE_AFTER"); } return opType; }