This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 4038842  [Improve](mysqlSync)Add the configuration of whether to 
synchronize the default value (#152)
4038842 is described below

commit 403884239c9e7de58deef82bc26599d07f68e0d2
Author: DongLiang-0 <46414265+donglian...@users.noreply.github.com>
AuthorDate: Fri Jul 21 15:07:51 2023 +0800

    [Improve](mysqlSync)Add the configuration of whether to synchronize the 
default value (#152)
---
 .../DorisJsonDebeziumDeserializationSchema.java    | 185 +++++++++++++++++++++
 .../sink/writer/JsonDebeziumSchemaSerializer.java  |   2 +-
 .../org/apache/doris/flink/tools/cdc/CdcTools.java |   3 +-
 .../apache/doris/flink/tools/cdc/DatabaseSync.java |   5 +-
 .../flink/tools/cdc/mysql/MysqlDatabaseSync.java   |  16 +-
 .../flink/tools/cdc/CdcMysqlSyncDatabaseCase.java  |   3 +-
 .../tools/cdc/CdcOraclelSyncDatabaseCase.java      |   3 +-
 7 files changed, 207 insertions(+), 10 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisJsonDebeziumDeserializationSchema.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisJsonDebeziumDeserializationSchema.java
new file mode 100644
index 0000000..9c54ade
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/DorisJsonDebeziumDeserializationSchema.java
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.deserialization;
+
+import org.apache.doris.flink.exception.DorisException;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import 
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema;
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
+import 
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
+import 
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
+import 
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Currently used only for MySQL synchronization when column default values are ignored.
+ */
+public class DorisJsonDebeziumDeserializationSchema implements 
DebeziumDeserializationSchema<String> {
+
+    /** Shared node factory, created with exact-BigDecimal mode enabled. */
+    private static final JsonNodeFactory JSON_NODE_FACTORY = JsonNodeFactory.withExactBigDecimals(true);
+
+    /** Mapper used to render the converted JSON tree as text. */
+    private final ObjectMapper objectMapper;
+
+    /** Creates a deserialization schema backed by a default-configured {@link ObjectMapper}. */
+    public DorisJsonDebeziumDeserializationSchema() {
+        this.objectMapper = new ObjectMapper();
+    }
+
+    /**
+     * Converts the value of a change record to a JSON string and emits it.
+     *
+     * @param sourceRecord record whose value schema and value are converted
+     * @param collector receives the resulting JSON text
+     * @throws Exception if the value cannot be converted or serialized
+     */
+    @Override
+    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
+        Schema schema = sourceRecord.valueSchema();
+        Object value = sourceRecord.value();
+        JsonNode jsonValue = convertToJson(schema, value);
+        byte[] bytes = objectMapper.writeValueAsString(jsonValue).getBytes(StandardCharsets.UTF_8);
+        // Decode with the same charset used for encoding. new String(bytes) falls back to the
+        // platform default charset and corrupts non-ASCII data where that default is not UTF-8.
+        collector.collect(new String(bytes, StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Recursively converts a Kafka Connect value into a Jackson {@link JsonNode}.
+     *
+     * <p>Struct fields are read with {@code getWithoutDefault}, so an unset field is not replaced
+     * by the default value declared in its schema.
+     *
+     * @param schema Connect schema describing {@code value}; may be {@code null}, in which case
+     *     the type is inferred from the value's Java class
+     * @param value value to convert; may be {@code null}
+     * @return the JSON representation; Java {@code null} only when both {@code schema} and
+     *     {@code value} are {@code null}
+     * @throws DorisException if a non-optional value is null, no schema type matches the value's
+     *     class, the value's class does not fit the schema type, or a struct's own schema
+     *     differs from {@code schema}
+     */
+    private JsonNode convertToJson(Schema schema, Object value) throws DorisException {
+        if (value == null) {
+            if (schema == null) {
+                // Any schema is valid and we don't have a default, so treat this as an optional schema
+                return null;
+            }
+            if (schema.isOptional()) {
+                return JSON_NODE_FACTORY.nullNode();
+            }
+            throw new DorisException(
+                    "Conversion error: null value for field that is required and has no default value");
+        }
+
+        try {
+            final Schema.Type schemaType;
+            if (schema == null) {
+                // No schema supplied: infer the Connect type from the runtime class.
+                schemaType = ConnectSchema.schemaType(value.getClass());
+                if (schemaType == null) {
+                    throw new DorisException(
+                            "Java class " + value.getClass() + " does not have corresponding schema type.");
+                }
+            } else {
+                schemaType = schema.type();
+            }
+            switch (schemaType) {
+                case INT8:
+                    return JSON_NODE_FACTORY.numberNode((Byte) value);
+                case INT16:
+                    return JSON_NODE_FACTORY.numberNode((Short) value);
+                case INT32:
+                    return JSON_NODE_FACTORY.numberNode((Integer) value);
+                case INT64:
+                    return JSON_NODE_FACTORY.numberNode((Long) value);
+                case FLOAT32:
+                    return JSON_NODE_FACTORY.numberNode((Float) value);
+                case FLOAT64:
+                    return JSON_NODE_FACTORY.numberNode((Double) value);
+                case BOOLEAN:
+                    return JSON_NODE_FACTORY.booleanNode((Boolean) value);
+                case STRING:
+                    CharSequence charSeq = (CharSequence) value;
+                    return JSON_NODE_FACTORY.textNode(charSeq.toString());
+                case BYTES:
+                    if (value instanceof byte[]) {
+                        return JSON_NODE_FACTORY.binaryNode((byte[]) value);
+                    } else if (value instanceof ByteBuffer) {
+                        // Copy only the readable window (position..limit). array() would expose the
+                        // whole backing array and throws for read-only or direct buffers.
+                        // duplicate() leaves the caller's position untouched.
+                        ByteBuffer readable = ((ByteBuffer) value).duplicate();
+                        byte[] content = new byte[readable.remaining()];
+                        readable.get(content);
+                        return JSON_NODE_FACTORY.binaryNode(content);
+                    } else if (value instanceof BigDecimal) {
+                        return JSON_NODE_FACTORY.numberNode((BigDecimal) value);
+                    } else {
+                        throw new DorisException("Invalid type for bytes type: " + value.getClass());
+                    }
+                case ARRAY: {
+                    Collection<?> collection = (Collection<?>) value;
+                    ArrayNode list = JSON_NODE_FACTORY.arrayNode();
+                    // The element schema is the same for every element; look it up once.
+                    Schema valueSchema = schema == null ? null : schema.valueSchema();
+                    for (Object elem : collection) {
+                        JsonNode fieldValue = convertToJson(valueSchema, elem);
+                        list.add(fieldValue);
+                    }
+                    return list;
+                }
+                case MAP: {
+                    Map<?, ?> map = (Map<?, ?>) value;
+                    // If true, using string keys and JSON object; if false, using non-string keys and Array-encoding
+                    boolean objectMode;
+                    if (schema == null) {
+                        objectMode = true;
+                        for (Map.Entry<?, ?> entry : map.entrySet()) {
+                            if (!(entry.getKey() instanceof String)) {
+                                objectMode = false;
+                                break;
+                            }
+                        }
+                    } else {
+                        objectMode = schema.keySchema().type() == Schema.Type.STRING;
+                    }
+                    ObjectNode obj = null;
+                    ArrayNode list = null;
+                    if (objectMode) {
+                        obj = JSON_NODE_FACTORY.objectNode();
+                    } else {
+                        list = JSON_NODE_FACTORY.arrayNode();
+                    }
+                    // Key and value schemas do not change between entries; look them up once.
+                    Schema keySchema = schema == null ? null : schema.keySchema();
+                    Schema valueSchema = schema == null ? null : schema.valueSchema();
+                    for (Map.Entry<?, ?> entry : map.entrySet()) {
+                        JsonNode mapKey = convertToJson(keySchema, entry.getKey());
+                        JsonNode mapValue = convertToJson(valueSchema, entry.getValue());
+
+                        if (objectMode) {
+                            obj.set(mapKey.asText(), mapValue);
+                        } else {
+                            // Non-string keys: encode each entry as a [key, value] pair.
+                            list.add(JSON_NODE_FACTORY.arrayNode().add(mapKey).add(mapValue));
+                        }
+                    }
+                    return objectMode ? obj : list;
+                }
+                case STRUCT: {
+                    Struct struct = (Struct) value;
+                    if (!struct.schema().equals(schema)) {
+                        throw new DorisException("Mismatching schema.");
+                    }
+                    ObjectNode obj = JSON_NODE_FACTORY.objectNode();
+                    for (Field field : schema.fields()) {
+                        // getWithoutDefault: do not substitute the schema default for unset fields.
+                        obj.set(field.name(), convertToJson(field.schema(), struct.getWithoutDefault(field.name())));
+                    }
+                    return obj;
+                }
+            }
+            throw new DorisException("Couldn't convert " + value + " to JSON.");
+        } catch (ClassCastException e) {
+            String schemaTypeStr = (schema != null) ? schema.type().toString() : "unknown schema";
+            throw new DorisException("Invalid type for " + schemaTypeStr + ": " + value.getClass());
+        }
+    }
+
+    /**
+     * Declares that this schema emits {@link String} records.
+     *
+     * @return the type information constant for {@code String}
+     */
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return BasicTypeInfo.STRING_TYPE_INFO;
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
index c7295e2..3329b23 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
@@ -327,4 +327,4 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
 
     }
 
-}
+}
\ No newline at end of file
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
index 85790d9..4a44be9 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/CdcTools.java
@@ -78,13 +78,14 @@ public class CdcTools {
         String includingTables = params.get("including-tables");
         String excludingTables = params.get("excluding-tables");
         boolean createTableOnly = params.has("create-table-only");
+        boolean ignoreDefaultValue = params.has("ignore-default-value");
 
         Map<String, String> sinkMap = getConfigMap(params, "sink-conf");
         Map<String, String> tableMap = getConfigMap(params, "table-conf");
         Configuration sinkConfig = Configuration.fromMap(sinkMap);
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        databaseSync.create(env, database, config, tablePrefix, tableSuffix, 
includingTables, excludingTables, sinkConfig, tableMap, createTableOnly);
+        databaseSync.create(env, database, config, tablePrefix, tableSuffix, 
includingTables, excludingTables, ignoreDefaultValue, sinkConfig, tableMap, 
createTableOnly);
         databaseSync.build();
         if(StringUtils.isNullOrWhitespaceOnly(jobName)){
             jobName = String.format("%s-Doris Sync Database: %s", type, 
config.getString("database-name","db"));
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index 799eff1..82424c1 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -57,6 +57,7 @@ public abstract class DatabaseSync {
     protected Pattern excludingPattern;
     protected Map<String, String> tableConfig;
     protected Configuration sinkConfig;
+    protected boolean ignoreDefaultValue;
     public StreamExecutionEnvironment env;
     private boolean createTableOnly = false;
 
@@ -68,13 +69,15 @@ public abstract class DatabaseSync {
 
     public void create(StreamExecutionEnvironment env, String database, 
Configuration config,
                        String tablePrefix, String tableSuffix, String 
includingTables,
-                       String excludingTables, Configuration sinkConfig, 
Map<String, String> tableConfig, boolean createTableOnly) {
+                       String excludingTables, boolean ignoreDefaultValue, 
Configuration sinkConfig,
+            Map<String, String> tableConfig, boolean createTableOnly) {
         this.env = env;
         this.config = config;
         this.database = database;
         this.converter = new TableNameConverter(tablePrefix, tableSuffix);
         this.includingPattern = includingTables == null ? null : 
Pattern.compile(includingTables);
         this.excludingPattern = excludingTables == null ? null : 
Pattern.compile(excludingTables);
+        this.ignoreDefaultValue = ignoreDefaultValue;
         this.sinkConfig = sinkConfig;
         this.tableConfig = tableConfig == null ? new HashMap<>() : tableConfig;
         //default enable light schema change
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
index 4d6d250..ac047e4 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mysql/MysqlDatabaseSync.java
@@ -23,8 +23,11 @@ import 
com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
 import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
 import com.ververica.cdc.connectors.mysql.table.StartupOptions;
 import 
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.table.DebeziumOptions;
+
+import 
org.apache.doris.flink.deserialization.DorisJsonDebeziumDeserializationSchema;
 import org.apache.doris.flink.tools.cdc.DatabaseSync;
 import org.apache.doris.flink.tools.cdc.DateToStringConverter;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
@@ -178,11 +181,14 @@ public class MysqlDatabaseSync extends DatabaseSync {
         }
         sourceBuilder.jdbcProperties(jdbcProperties);
         sourceBuilder.debeziumProperties(debeziumProperties);
-
-        Map<String, Object> customConverterConfigs = new HashMap<>();
-        customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, 
"numeric");
-        JsonDebeziumDeserializationSchema schema =
-                new JsonDebeziumDeserializationSchema(false, 
customConverterConfigs);
+        DebeziumDeserializationSchema<String> schema;
+        if (ignoreDefaultValue) {
+            schema = new DorisJsonDebeziumDeserializationSchema();
+        } else {
+            Map<String, Object> customConverterConfigs = new HashMap<>();
+            
customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, 
"numeric");
+            schema = new JsonDebeziumDeserializationSchema(false, 
customConverterConfigs);
+        }
         MySqlSource<String> mySqlSource = 
sourceBuilder.deserializer(schema).includeSchemaChanges(true).build();
 
         DataStreamSource<String> streamSource = env.fromSource(
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
index c940477..4a109c2 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcMysqlSyncDatabaseCase.java
@@ -64,8 +64,9 @@ public class CdcMysqlSyncDatabaseCase {
 
         String includingTables = "tbl1|tbl2|tbl3";
         String excludingTables = "";
+        boolean ignoreDefaultValue = false;
         DatabaseSync databaseSync = new MysqlDatabaseSync();
-        
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,sinkConf,tableConfig,
 false);
+        
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig,
 false);
         databaseSync.build();
         env.execute(String.format("MySQL-Doris Database Sync: %s", database));
 
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
index 23610ab..b3b4384 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcOraclelSyncDatabaseCase.java
@@ -70,8 +70,9 @@ public class CdcOraclelSyncDatabaseCase {
 
         String includingTables = "test.*";
         String excludingTables = "";
+        boolean ignoreDefaultValue = false;
         DatabaseSync databaseSync = new OracleDatabaseSync();
-        
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,sinkConf,tableConfig,
 false);
+        
databaseSync.create(env,database,config,tablePrefix,tableSuffix,includingTables,excludingTables,ignoreDefaultValue,sinkConf,tableConfig,
 false);
         databaseSync.build();
         env.execute(String.format("Oracle-Doris Database Sync: %s", database));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to