This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new bb45659e1  [INLONG-7958][Sort] Fix MongoDB's schema becomes unordered after extracting the row data (#7960)
bb45659e1 is described below

commit bb45659e1b0d300d8d306f6cf47ab849aa8f906e
Author: emhui <111486498+e-m...@users.noreply.github.com>
AuthorDate: Sun May 7 08:31:22 2023 +0800

    [INLONG-7958][Sort] Fix MongoDB's schema becomes unordered after extracting the row data (#7960)
---
 .../sort/cdc/mongodb/debezium/DebeziumJson.java    |  49 ++++++
 .../MongoDBConnectorDeserializationSchema.java     |   5 +-
 .../source/reader/MongoDBRecordEmitter.java        |   3 +-
 .../source/reader/fetch/MongoDBScanFetchTask.java  |   3 +-
 .../reader/fetch/MongoDBStreamFetchTask.java       |   3 +-
 .../cdc/mongodb/source/utils/MetaDataUtils.java    | 177 +++++++++++++++++++++
 .../sort/cdc/mongodb/source/utils/MongoUtils.java  |   3 +-
 .../cdc/mongodb/table/MongoDBReadableMetadata.java | 152 +-----------------
 8 files changed, 241 insertions(+), 154 deletions(-)

diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/DebeziumJson.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/DebeziumJson.java
new file mode 100644
index 000000000..f64fc5e2a
--- /dev/null
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/DebeziumJson.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mongodb.debezium;
+
+import io.debezium.relational.history.TableChanges;
+import java.util.List;
+import java.util.Map;
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+public class DebeziumJson {
+
+    private Map<String, String> before;
+    private Map<String, Object> after;
+    private Source source;
+    private TableChanges.TableChange tableChange;
+    private long tsMs;
+    private String op;
+
+    @Builder
+    @Data
+    public static class Source {
+
+        private String name;
+        private String db;
+        private String table;
+        private List<String> pkNames;
+        private Map<String, Integer> sqlType;
+        private Map<String, String> mysqlType;
+    }
+
+}
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
index 98e5cf4e9..b8a0a86cc 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.cdc.mongodb.debezium.table;
 import com.mongodb.client.model.changestream.OperationType;
 import com.mongodb.internal.HexUtils;
 import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
+import java.util.LinkedHashMap;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericArrayData;
@@ -809,8 +810,8 @@ public class MongoDBConnectorDeserializationSchema
             }
             return row;
         } else {
-            Map<String, Object> data = new HashMap<>();
-            Map<String, String> dataType = new HashMap<>();
+            Map<String, Object> data = new LinkedHashMap<>();
+            Map<String, String> dataType = new LinkedHashMap<>();
             document.forEach((key, value) -> {
                 try {
                     LogicalType logicalType = RecordUtils.convertLogicType(value);
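The hunk above is the heart of the change: java.util.HashMap iterates in hash order, so the extracted document fields came back shuffled, while java.util.LinkedHashMap iterates in insertion order and keeps the schema in the order the BSON document delivered it. A minimal, self-contained sketch of the difference (the field names are illustrative, not taken from the patch):

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class FieldOrderDemo {

    public static void main(String[] args) {
        // Simulate a MongoDB document whose fields arrive in a fixed order.
        String[] fields = {"_id", "name", "age", "address", "createdAt"};

        Map<String, Object> hashed = new HashMap<>();
        Map<String, Object> linked = new LinkedHashMap<>();
        for (String field : fields) {
            hashed.put(field, "v");
            linked.put(field, "v");
        }

        // HashMap iterates in hash-bucket order, so the schema appears shuffled.
        System.out.println(hashed.keySet());
        // LinkedHashMap iterates in insertion order, matching the document schema.
        System.out.println(linked.keySet()); // [_id, name, age, address, createdAt]
    }
}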
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/MongoDBRecordEmitter.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/MongoDBRecordEmitter.java
index b9e17a624..5f647031e 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/MongoDBRecordEmitter.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/MongoDBRecordEmitter.java
@@ -55,8 +55,7 @@ import org.slf4j.LoggerFactory;
  */
 public final class MongoDBRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(
-            com.ververica.cdc.connectors.mongodb.source.reader.MongoDBRecordEmitter.class);
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDBRecordEmitter.class);
 
     public MongoDBRecordEmitter(
             DebeziumDeserializationSchema<T> deserializationSchema,
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBScanFetchTask.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBScanFetchTask.java
index 88f4c2c2d..963461ea9 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBScanFetchTask.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBScanFetchTask.java
@@ -65,8 +65,7 @@ import org.slf4j.LoggerFactory;
  */
 public class MongoDBScanFetchTask implements FetchTask<SourceSplitBase> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(
-            com.ververica.cdc.connectors.mongodb.source.reader.fetch.MongoDBScanFetchTask.class);
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDBScanFetchTask.class);
 
     private final SnapshotSplit snapshotSplit;
     private volatile boolean taskRunning = false;
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java
index 69fa89772..420132b87 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java
@@ -82,8 +82,7 @@ import org.slf4j.LoggerFactory;
  */
 public class MongoDBStreamFetchTask implements FetchTask<SourceSplitBase> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(
-            com.ververica.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask.class);
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDBStreamFetchTask.class);
 
     private final StreamSplit streamSplit;
     private volatile boolean taskRunning = false;
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/utils/MetaDataUtils.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/utils/MetaDataUtils.java
new file mode 100644
index 000000000..28cc14d58
--- /dev/null
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/utils/MetaDataUtils.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.mongodb.source.utils;
+
+import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.data.Envelope;
+import io.debezium.data.Envelope.FieldName;
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.inlong.sort.cdc.mongodb.debezium.DebeziumJson;
+import org.apache.inlong.sort.cdc.mongodb.debezium.DebeziumJson.Source;
+import org.apache.inlong.sort.cdc.mongodb.debezium.utils.RecordUtils;
+import org.apache.inlong.sort.formats.json.canal.CanalJson;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+public class MetaDataUtils {
+
+    private static final String MONGODB_DEFAULT_PRIMARY_KEY = "_id";
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    /**
+     * get collection name from record
+     */
+    public static String getMetaData(SourceRecord record, String metaDataKey) {
+        Struct value = (Struct) record.value();
+        Struct to = value.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
+        return to.getString(metaDataKey);
+    }
+
+    /**
+     * get sql type from row data, represents the jdbc data type
+     */
+    public static Map<String, Integer> getSqlType(@Nullable RowData rowData) {
+        if (rowData == null) {
+            return null;
+        }
+        GenericRowData data = (GenericRowData) rowData;
+        Map<String, String> mongoDbType = (Map<String, String>) data.getField(1);
+        Map<String, Integer> sqlType = new LinkedHashMap<>();
+        mongoDbType.forEach((name, value) -> sqlType.put(name, RecordUtils.getSqlType(value)));
+        return sqlType;
+    }
+
+    private static String getDebeziumOpType(RowData rowData) {
+        String opType;
+        switch (rowData.getRowKind()) {
+            case DELETE:
+            case UPDATE_BEFORE:
+                opType = "d";
+                break;
+            case INSERT:
+            case UPDATE_AFTER:
+                opType = "c";
+                break;
+            default:
+                throw new IllegalStateException("the record only have states in DELETE, "
+                        + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
+        }
+        return opType;
+    }
+
+    private static String getCanalOpType(RowData rowData) {
+        String opType;
+        switch (rowData.getRowKind()) {
+            case DELETE:
+            case UPDATE_BEFORE:
+                opType = "DELETE";
+                break;
+            case INSERT:
+            case UPDATE_AFTER:
+                opType = "INSERT";
+                break;
+            default:
+                throw new IllegalStateException("the record only have states in DELETE, "
+                        + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
+        }
+        return opType;
+    }
+
+    public static StringData getCanalData(SourceRecord record, RowData rowData,
+            TableChange tableSchema) {
+        // construct canal json
+        Struct messageStruct = (Struct) record.value();
+        Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+        GenericRowData data = (GenericRowData) rowData;
+        Map<String, Object> field = (Map<String, Object>) data.getField(0);
+        Map<String, String> mongoDbType = (Map<String, String>) data.getField(1);
+
+        String database = getMetaData(record, MongoDBEnvelope.NAMESPACE_DATABASE_FIELD);
+        String table = getMetaData(record, MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD);
+        Long opTs = (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY);
+        long ts = (Long) messageStruct.get(FieldName.TIMESTAMP);
+
+        List<Map<String, Object>> dataList = new ArrayList<>();
+        dataList.add(field);
+        CanalJson canalJson = CanalJson.builder()
+                .data(dataList)
+                .database(database)
+                .sql("")
+                .es(opTs)
+                .isDdl(false)
+                .pkNames(Collections.singletonList(MONGODB_DEFAULT_PRIMARY_KEY))
+                .mysqlType(mongoDbType)
+                .table(table)
+                .ts(ts)
+                .type(getCanalOpType(rowData))
+                .sqlType(getSqlType(data))
+                .build();
+        try {
+            return StringData.fromString(OBJECT_MAPPER.writeValueAsString(canalJson));
+        } catch (Exception e) {
+            throw new IllegalStateException("exception occurs when get meta data", e);
+        }
+    }
+
+    public static StringData getDebeziumData(SourceRecord record, TableChange tableSchema,
+            RowData rowData) {
+        // construct debezium json
+        Struct messageStruct = (Struct) record.value();
+        GenericRowData data = (GenericRowData) rowData;
+        Map<String, Object> field = (Map<String, Object>) data.getField(0);
+        Map<String, String> mongoDbType = (Map<String, String>) data.getField(1);
+
+        String database = getMetaData(record, MongoDBEnvelope.NAMESPACE_DATABASE_FIELD);
+        String table = getMetaData(record, MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD);
+        long ts = (Long) messageStruct.get(FieldName.TIMESTAMP);
+        String debeziumOp = getDebeziumOpType(rowData);
+
+        Source source = Source.builder()
+                .db(database)
+                .table(table)
+                .name("mongodb_cdc_source")
+                .mysqlType(mongoDbType)
+                .sqlType(getSqlType(rowData))
+                .pkNames(Collections.singletonList(MONGODB_DEFAULT_PRIMARY_KEY))
+                .build();
+        DebeziumJson debeziumJson = DebeziumJson.builder()
+                .source(source)
+                .after(field)
+                .tsMs(ts)
+                .op(debeziumOp)
+                .tableChange(tableSchema)
+                .build();
+        try {
+            return StringData.fromString(OBJECT_MAPPER.writeValueAsString(debeziumJson));
+        } catch (Exception e) {
+            throw new IllegalStateException("exception occurs when get meta data", e);
+        }
+    }
+
+}
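The new MetaDataUtils.getSqlType above follows the same rule as the deserialization fix: it re-inserts entries into a LinkedHashMap, so the JDBC type map comes out in the same field order as the extracted row data. A self-contained sketch of that mapping step (toSqlType below is a hypothetical stand-in for RecordUtils.getSqlType, and the field names are illustrative):

import java.sql.Types;
import java.util.LinkedHashMap;
import java.util.Map;

public class SqlTypeDemo {

    // Illustrative stand-in for RecordUtils.getSqlType(String).
    static int toSqlType(String mongoType) {
        switch (mongoType) {
            case "STRING":
                return Types.VARCHAR;
            case "INT32":
                return Types.INTEGER;
            case "INT64":
                return Types.BIGINT;
            default:
                return Types.OTHER;
        }
    }

    public static void main(String[] args) {
        // Field-name -> MongoDB type map, in schema order, as data.getField(1) would hold it.
        Map<String, String> mongoDbType = new LinkedHashMap<>();
        mongoDbType.put("_id", "STRING");
        mongoDbType.put("name", "STRING");
        mongoDbType.put("age", "INT32");

        // Same pattern as getSqlType: re-insert into a LinkedHashMap to keep the order.
        Map<String, Integer> sqlType = new LinkedHashMap<>();
        mongoDbType.forEach((name, value) -> sqlType.put(name, toSqlType(value)));
        System.out.println(sqlType); // {_id=12, name=12, age=4}
    }
}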
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/utils/MongoUtils.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/utils/MongoUtils.java
index 874cc0a59..979493809 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/utils/MongoUtils.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/source/utils/MongoUtils.java
@@ -70,8 +70,7 @@ import org.slf4j.LoggerFactory;
  */
 public class MongoUtils {
 
-    private static final Logger LOG = LoggerFactory.getLogger(
-            com.ververica.cdc.connectors.mongodb.source.utils.MongoUtils.class);
+    private static final Logger LOG = LoggerFactory.getLogger(MongoUtils.class);
 
     public static final BsonDouble COMMAND_SUCCEED_FLAG = new BsonDouble(1.0d);
 
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBReadableMetadata.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBReadableMetadata.java
index 19db352eb..0fa28fe37 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBReadableMetadata.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBReadableMetadata.java
@@ -17,31 +17,22 @@
 
 package org.apache.inlong.sort.cdc.mongodb.table;
 
+import static org.apache.inlong.sort.cdc.mongodb.source.utils.MetaDataUtils.getCanalData;
+import static org.apache.inlong.sort.cdc.mongodb.source.utils.MetaDataUtils.getDebeziumData;
+import static org.apache.inlong.sort.cdc.mongodb.source.utils.MetaDataUtils.getMetaData;
+
 import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
 import io.debezium.connector.AbstractSourceInfo;
 import io.debezium.data.Envelope;
-import io.debezium.relational.Table;
 import io.debezium.relational.history.TableChanges;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
 import javax.annotation.Nullable;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
 import org.apache.inlong.sort.cdc.mongodb.debezium.table.MetadataConverter;
-import org.apache.inlong.sort.cdc.mongodb.debezium.utils.RecordUtils;
-import org.apache.inlong.sort.formats.json.canal.CanalJson;
-import org.apache.inlong.sort.formats.json.debezium.DebeziumJson;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 
@@ -57,10 +48,7 @@ public enum MongoDBReadableMetadata {
 
                 @Override
                 public Object read(SourceRecord record) {
-                    Struct value = (Struct) record.value();
-                    Struct to = value.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
-                    return StringData.fromString(
-                            to.getString(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD));
+                    return getMetaData(record, MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD);
                 }
             }),
 
@@ -73,10 +61,7 @@ public enum MongoDBReadableMetadata {
 
                 @Override
                 public Object read(SourceRecord record) {
-                    Struct value = (Struct) record.value();
-                    Struct to = value.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
-                    return StringData.fromString(
-                            to.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD));
+                    return getMetaData(record, MongoDBEnvelope.NAMESPACE_DATABASE_FIELD);
                 }
             }),
 
@@ -111,40 +96,8 @@ public enum MongoDBReadableMetadata {
                 @Override
                 public Object read(SourceRecord record,
                         @Nullable TableChanges.TableChange tableSchema, RowData rowData) {
-                    // construct debezium json
-                    Struct messageStruct = (Struct) record.value();
-                    Struct to = messageStruct.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
-                    Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
-                    GenericRowData data = (GenericRowData) rowData;
-                    Map<String, Object> field = (Map<String, Object>) data.getField(0);
-                    Map<String, String> mysqlType = (Map<String, String>) data.getField(1);
-                    Map<String, Integer> sqlType = new HashMap<>();
-                    mysqlType.forEach((name, value) -> sqlType.put(name, RecordUtils.getSqlType(value)));
-                    String debeziumOp = getDebeziumOpType(rowData);
-                    if (StringUtils.isBlank(debeziumOp)) {
-                        return null;
-                    }
-                    DebeziumJson.Source source = DebeziumJson.Source.builder()
-                            .db(to.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD))
-                            .table(to.getString(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD))
-                            .name("mongo_binlog_source")
-                            .mysqlType(mysqlType)
-                            .sqlType(sqlType)
-                            .pkNames(null)
-                            .build();
-                    DebeziumJson debeziumJson = DebeziumJson.builder()
-                            .source(source)
-                            .after(field)
-                            .tsMs((Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY))
-                            .op(debeziumOp)
-                            .tableChange(tableSchema)
-                            .build();
-                    try {
-                        return StringData.fromString(OBJECT_MAPPER.writeValueAsString(debeziumJson));
-                    } catch (Exception e) {
-                        throw new IllegalStateException("exception occurs when get meta data", e);
-                    }
+                    return getDebeziumData(record, tableSchema, rowData);
                 }
             }),
 
@@ -164,45 +117,13 @@ public enum MongoDBReadableMetadata {
                 public Object read(SourceRecord record,
                         @Nullable TableChanges.TableChange tableSchema, RowData rowData) {
                     // construct canal json
-                    Struct messageStruct = (Struct) record.value();
-                    Struct to = messageStruct.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
-                    Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
-                    String canalOp = getCanalOpType(rowData);
-                    if (StringUtils.isBlank(canalOp)) {
-                        return null;
-                    }
-                    GenericRowData data = (GenericRowData) rowData;
-                    Map<String, Object> field = (Map<String, Object>) data.getField(0);
-                    Map<String, String> mysqlType = (Map<String, String>) data.getField(1);
-                    Map<String, Integer> sqlType = new HashMap<>();
-                    mysqlType.forEach((name, value) -> sqlType.put(name, RecordUtils.getSqlType(value)));
-                    List<Map<String, Object>> dataList = new ArrayList<>();
-                    dataList.add(field);
-                    CanalJson canalJson = CanalJson.builder()
-                            .data(dataList)
-                            .database(to.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD))
-                            .sql("")
-                            .es((Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY))
-                            .isDdl(false)
-                            .pkNames(null)
-                            .mysqlType(getMysqlType(tableSchema))
-                            .table(to.getString(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD))
-                            .ts((Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY))
-                            .type(canalOp)
-                            .sqlType(sqlType)
-                            .build();
-                    try {
-                        return StringData.fromString(OBJECT_MAPPER.writeValueAsString(canalJson));
-                    } catch (Exception e) {
-                        throw new IllegalStateException("exception occurs when get meta data", e);
-                    }
+                    return getCanalData(record, rowData, tableSchema);
                 }
             });
 
     private final String key;
     private final DataType dataType;
     private final MetadataConverter converter;
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     MongoDBReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
         this.key = key;
@@ -210,63 +131,6 @@ public enum MongoDBReadableMetadata {
         this.converter = converter;
     }
 
-    public static Map<String, String> getMysqlType(@Nullable TableChanges.TableChange tableSchema) {
-        if (tableSchema == null) {
-            return null;
-        }
-        Map<String, String> mysqlType = new LinkedHashMap<>();
-        final Table table = tableSchema.getTable();
-        table.columns()
-                .forEach(
-                        column -> {
-                            mysqlType.put(
-                                    column.name(),
-                                    String.format(
-                                            "%s(%d)",
-                                            column.typeName(),
-                                            column.length()));
-                        });
-        return mysqlType;
-    }
-
-    private static String getDebeziumOpType(RowData rowData) {
-        String opType = null;
-        switch (rowData.getRowKind()) {
-            case INSERT:
-                opType = "c";
-                break;
-            case DELETE:
-                opType = "d";
-                break;
-            case UPDATE_AFTER:
-            case UPDATE_BEFORE:
-                opType = "u";
-                break;
-            default:
-                return null;
-        }
-        return opType;
-    }
-
-    private static String getCanalOpType(RowData rowData) {
-        String opType = null;
-        switch (rowData.getRowKind()) {
-            case INSERT:
-                opType = "INSERT";
-                break;
-            case DELETE:
-                opType = "DELETE";
-                break;
-            case UPDATE_AFTER:
-            case UPDATE_BEFORE:
-                opType = "UPDATE";
-                break;
-            default:
-                return null;
-        }
-        return opType;
-    }
-
     public String getKey() {
         return key;
     }
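Downstream, Jackson writes map entries in iteration order, which is why the LinkedHashMap-backed data serializes to JSON with fields in schema order once the maps above preserve it. A quick check of that end-to-end behavior (this sketch uses plain Jackson rather than the flink-shaded ObjectMapper the connector imports, and the values are illustrative):

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.LinkedHashMap;
import java.util.Map;

public class JsonOrderCheck {

    public static void main(String[] args) throws Exception {
        // Stand-in for the "after" map built by the deserialization schema.
        Map<String, Object> after = new LinkedHashMap<>();
        after.put("_id", "6457c0f1");
        after.put("name", "inlong");
        after.put("age", 3);

        // Jackson serializes map entries in iteration order, so insertion order survives.
        String json = new ObjectMapper().writeValueAsString(after);
        System.out.println(json); // {"_id":"6457c0f1","name":"inlong","age":3}
    }
}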