This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 1fbf04550 [INLONG-6370][Sort] The op type in debezium format should be u or update (#6408) 1fbf04550 is described below commit 1fbf0455046f8d066337893225862245b5c1adfe Author: Schnapps <zpen...@connect.ust.hk> AuthorDate: Sun Nov 6 08:41:25 2022 +0800 [INLONG-6370][Sort] The op type in debezium format should be u or update (#6408) --- .../sort/cdc/mysql/table/MySqlReadableMetadata.java | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java index fcfb636ac..bff2cc284 100644 --- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java +++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java @@ -166,7 +166,7 @@ public enum MySqlReadableMetadata { .mysqlType(getMysqlType(tableSchema)) .build(); DebeziumJson debeziumJson = DebeziumJson.builder().after(field).source(source) - .tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getOpType(record)) + .tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getDebeziumOpType(record)) .tableChange(tableSchema).build(); try { @@ -237,7 +237,7 @@ public enum MySqlReadableMetadata { @Override public Object read(SourceRecord record) { - return StringData.fromString(getOpType(record)); + return StringData.fromString(getCanalOpType(record)); } }), @@ -435,7 +435,7 @@ public enum MySqlReadableMetadata { .data(dataList).database(databaseName) .sql("").es(opTs).isDdl(false).pkNames(getPkNames(tableSchema)) .mysqlType(getMysqlType(tableSchema)).table(tableName).ts(ts) - .type(getOpType(record)).sqlType(getSqlType(tableSchema)).build(); + .type(getCanalOpType(record)).sqlType(getSqlType(tableSchema)).build(); try { return StringData.fromString(OBJECT_MAPPER.writeValueAsString(canalJson)); @@ -455,7 +455,7 @@ public enum MySqlReadableMetadata { this.converter = converter; } - private static String getOpType(SourceRecord record) { + private static String getCanalOpType(SourceRecord record) { String opType; final Envelope.Operation op = Envelope.operationFor(record); if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) { @@ -468,6 +468,19 @@ public enum MySqlReadableMetadata { return opType; } + private static String getDebeziumOpType(SourceRecord record) { + String opType; + final Envelope.Operation op = Envelope.operationFor(record); + if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) { + opType = "c"; + } else if (op == Envelope.Operation.DELETE) { + opType = "d"; + } else { + opType = "u"; + } + return opType; + } + private static List<String> getPkNames(@Nullable TableChanges.TableChange tableSchema) { if (tableSchema == null) { return null;