This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 4038842 [Improve](mysqlSync)Add the configuration of whether to synchronize the default value (#152) 4038842 is described below commit 403884239c9e7de58deef82bc26599d07f68e0d2 Author: DongLiang-0 <46414265+donglian...@users.noreply.github.com> AuthorDate: Fri Jul 21 15:07:51 2023 +0800 [Improve](mysqlSync)Add the configuration of whether to synchronize the default value (#152) --- .../DorisJsonDebeziumDeserializationSchema.java | 185 +++++++++++++++++++++ .../sink/writer/JsonDebeziumSchemaSerializer.java | 2 +- .../org/apache/doris/flink/tools/cdc/CdcTools.java | 3 +- .../apache/doris/flink/tools/cdc/DatabaseSync.java | 5 +- .../flink/tools/cdc/mysql/MysqlDatabaseSync.java | 16 +- .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java | 3 +- .../tools/cdc/CdcOraclelSyncDatabaseCase.java | 3 +- 7 files changed, 207 insertions(+), 10 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisJsonDebeziumDeserializationSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisJsonDebeziumDeserializationSchema.java new file mode 100644 index 0000000..9c54ade --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisJsonDebeziumDeserializationSchema.java @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.deserialization; + +import org.apache.doris.flink.exception.DorisException; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema; +import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field; +import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema; +import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct; +import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord; +import com.ververica.cdc.debezium.DebeziumDeserializationSchema; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.util.Collector; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Map; + +/** + * Currently only used for MySQL synchronization when default values are ignored (ignore-default-value). 
+ */ +public class DorisJsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> { + + private static final JsonNodeFactory JSON_NODE_FACTORY = JsonNodeFactory.withExactBigDecimals(true); + private final ObjectMapper objectMapper; + + public DorisJsonDebeziumDeserializationSchema() { + objectMapper = new ObjectMapper(); + } + + @Override + public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception { + Schema schema = sourceRecord.valueSchema(); + Object value = sourceRecord.value(); + JsonNode jsonValue = convertToJson(schema, value); + byte[] bytes = objectMapper.writeValueAsString(jsonValue).getBytes(StandardCharsets.UTF_8); + collector.collect(new String(bytes)); + } + + private JsonNode convertToJson(Schema schema, Object value) throws DorisException { + if (value == null) { + if (schema == null) // Any schema is valid and we don't have a default, so treat this as an optional schema + { + return null; + } + if (schema.isOptional()) { + return JSON_NODE_FACTORY.nullNode(); + } + throw new DorisException( + "Conversion error: null value for field that is required and has no default value"); + } + + try { + final Schema.Type schemaType; + if (schema == null) { + schemaType = ConnectSchema.schemaType(value.getClass()); + if (schemaType == null) { + throw new DorisException( + "Java class " + value.getClass() + " does not have corresponding schema type."); + } + } else { + schemaType = schema.type(); + } + switch (schemaType) { + case INT8: + return JSON_NODE_FACTORY.numberNode((Byte) value); + case INT16: + return JSON_NODE_FACTORY.numberNode((Short) value); + case INT32: + return JSON_NODE_FACTORY.numberNode((Integer) value); + case INT64: + return JSON_NODE_FACTORY.numberNode((Long) value); + case FLOAT32: + return JSON_NODE_FACTORY.numberNode((Float) value); + case FLOAT64: + return JSON_NODE_FACTORY.numberNode((Double) value); + case BOOLEAN: + return JSON_NODE_FACTORY.booleanNode((Boolean) value); 
+ case STRING: + CharSequence charSeq = (CharSequence) value; + return JSON_NODE_FACTORY.textNode(charSeq.toString()); + case BYTES: + if (value instanceof byte[]) { + return JSON_NODE_FACTORY.binaryNode((byte[]) value); + } else if (value instanceof ByteBuffer) { + return JSON_NODE_FACTORY.binaryNode(((ByteBuffer) value).array()); + } else if (value instanceof BigDecimal) { + return JSON_NODE_FACTORY.numberNode((BigDecimal) value); + } else { + throw new DorisException("Invalid type for bytes type: " + value.getClass()); + } + case ARRAY: { + Collection<?> collection = (Collection<?>) value; + ArrayNode list = JSON_NODE_FACTORY.arrayNode(); + for (Object elem : collection) { + Schema valueSchema = schema == null ? null : schema.valueSchema(); + JsonNode fieldValue = convertToJson(valueSchema, elem); + list.add(fieldValue); + } + return list; + } + case MAP: { + Map<?, ?> map = (Map<?, ?>) value; + // If true, using string keys and JSON object; if false, using non-string keys and Array-encoding + boolean objectMode; + if (schema == null) { + objectMode = true; + for (Map.Entry<?, ?> entry : map.entrySet()) { + if (!(entry.getKey() instanceof String)) { + objectMode = false; + break; + } + } + } else { + objectMode = schema.keySchema().type() == Schema.Type.STRING; + } + ObjectNode obj = null; + ArrayNode list = null; + if (objectMode) { + obj = JSON_NODE_FACTORY.objectNode(); + } else { + list = JSON_NODE_FACTORY.arrayNode(); + } + for (Map.Entry<?, ?> entry : map.entrySet()) { + Schema keySchema = schema == null ? null : schema.keySchema(); + Schema valueSchema = schema == null ? null : schema.valueSchema(); + JsonNode mapKey = convertToJson(keySchema, entry.getKey()); + JsonNode mapValue = convertToJson(valueSchema, entry.getValue()); + + if (objectMode) { + obj.set(mapKey.asText(), mapValue); + } else { + list.add(JSON_NODE_FACTORY.arrayNode().add(mapKey).add(mapValue)); + } + } + return objectMode ? 
obj : list; + } + case STRUCT: { + Struct struct = (Struct) value; + if (!struct.schema().equals(schema)) { + throw new DorisException("Mismatching schema."); + } + ObjectNode obj = JSON_NODE_FACTORY.objectNode(); + for (Field field : schema.fields()) { + obj.set(field.name(), convertToJson(field.schema(), struct.getWithoutDefault(field.name()))); + } + return obj; + } + } + throw new DorisException("Couldn't convert " + value + " to JSON."); + } catch (ClassCastException e) { + String schemaTypeStr = (schema != null) ? schema.type().toString() : "unknown schema"; + throw new DorisException("Invalid type for " + schemaTypeStr + ": " + value.getClass()); + } + } + + @Override + public TypeInformation<String> getProducedType() { + return BasicTypeInfo.STRING_TYPE_INFO; + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java index c7295e2..3329b23 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java @@ -327,4 +327,4 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin } -} +} \ No newline at end of file diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java index 85790d9..4a44be9 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java @@ -78,13 +78,14 @@ public class CdcTools { String includingTables = params.get("including-tables"); String excludingTables = params.get("excluding-tables"); boolean createTableOnly = 
params.has("create-table-only"); + boolean ignoreDefaultValue = params.has("ignore-default-value"); Map<String, String> sinkMap = getConfigMap(params, "sink-conf"); Map<String, String> tableMap = getConfigMap(params, "table-conf"); Configuration sinkConfig = Configuration.fromMap(sinkMap); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables, sinkConfig, tableMap, createTableOnly); + databaseSync.create(env, database, config, tablePrefix, tableSuffix, includingTables, excludingTables, ignoreDefaultValue, sinkConfig, tableMap, createTableOnly); databaseSync.build(); if(StringUtils.isNullOrWhitespaceOnly(jobName)){ jobName = String.format("%s-Doris Sync Database: %s", type, config.getString("database-name","db")); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 799eff1..82424c1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -57,6 +57,7 @@ public abstract class DatabaseSync { protected Pattern excludingPattern; protected Map<String, String> tableConfig; protected Configuration sinkConfig; + protected boolean ignoreDefaultValue; public StreamExecutionEnvironment env; private boolean createTableOnly = false; @@ -68,13 +69,15 @@ public abstract class DatabaseSync { public void create(StreamExecutionEnvironment env, String database, Configuration config, String tablePrefix, String tableSuffix, String includingTables, - String excludingTables, Configuration sinkConfig, Map<String, String> tableConfig, boolean createTableOnly) { + String excludingTables, boolean ignoreDefaultValue, Configuration sinkConfig, + Map<String, String> tableConfig, 
boolean createTableOnly) { this.env = env; this.config = config; this.database = database; this.converter = new TableNameConverter(tablePrefix, tableSuffix); this.includingPattern = includingTables == null ? null : Pattern.compile(includingTables); this.excludingPattern = excludingTables == null ? null : Pattern.compile(excludingTables); + this.ignoreDefaultValue = ignoreDefaultValue; this.sinkConfig = sinkConfig; this.tableConfig = tableConfig == null ? new HashMap<>() : tableConfig; //default enable light schema change diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java index 4d6d250..ac047e4 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java @@ -23,8 +23,11 @@ import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig; +import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import com.ververica.cdc.debezium.table.DebeziumOptions; + +import org.apache.doris.flink.deserialization.DorisJsonDebeziumDeserializationSchema; import org.apache.doris.flink.tools.cdc.DatabaseSync; import org.apache.doris.flink.tools.cdc.DateToStringConverter; import org.apache.doris.flink.tools.cdc.SourceSchema; @@ -178,11 +181,14 @@ public class MysqlDatabaseSync extends DatabaseSync { } sourceBuilder.jdbcProperties(jdbcProperties); sourceBuilder.debeziumProperties(debeziumProperties); - - Map<String, Object> customConverterConfigs = new HashMap<>(); - 
customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric"); - JsonDebeziumDeserializationSchema schema = - new JsonDebeziumDeserializationSchema(false, customConverterConfigs); + DebeziumDeserializationSchema<String> schema; + if (ignoreDefaultValue) { + schema = new DorisJsonDebeziumDeserializationSchema(); + } else { + Map<String, Object> customConverterConfigs = new HashMap<>(); + customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric"); + schema = new JsonDebeziumDeserializationSchema(false, customConverterConfigs); + } MySqlSource<String> mySqlSource = sourceBuilder.deserializer(schema).includeSchemaChanges(true).build(); DataStreamSource<String> streamSource = env.fromSource( diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java index c940477..4a109c2 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java @@ -64,8 +64,9 @@ public class CdcMysqlSyncDatabaseCase { String includingTables = "tbl1|tbl2|tbl3"; String excludingTables = ""; + boolean ignoreDefaultValue = false; DatabaseSync databaseSync = new MysqlDatabaseSync(); - databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,sinkConf,tableConfig, false); + databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false); databaseSync.build(); env.execute(String.format("MySQL-Doris Database Sync: %s", database)); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java 
index 23610ab..b3b4384 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java @@ -70,8 +70,9 @@ public class CdcOraclelSyncDatabaseCase { String includingTables = "test.*"; String excludingTables = ""; + boolean ignoreDefaultValue = false; DatabaseSync databaseSync = new OracleDatabaseSync(); - databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,sinkConf,tableConfig, false); + databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig, false); databaseSync.build(); env.execute(String.format("Oracle-Doris Database Sync: %s", database)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org