This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 222ce60  [Improve](schemaChange)schema change type adapts to other 
connectors (#205)
222ce60 is described below

commit 222ce60833ae114f54497e2404dfee8d93993357
Author: DongLiang-0 <46414265+donglian...@users.noreply.github.com>
AuthorDate: Wed Oct 11 10:05:03 2023 +0800

    [Improve](schemaChange)schema change type adapts to other connectors (#205)
---
 .../sink/writer/JsonDebeziumSchemaSerializer.java  | 156 +++++++++++++--------
 .../flink/sink/writer/SchemaChangeHelper.java      |   2 +
 .../doris/flink/tools/cdc/SourceConnector.java     |  36 +++++
 .../writer/TestJsonDebeziumSchemaSerializer.java   | 138 ++++++++++++++----
 4 files changed, 250 insertions(+), 82 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
index b2d88c6..fd3c92a 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.NullNode;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.collections.CollectionUtils;
+
 import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
@@ -33,7 +34,12 @@ import 
org.apache.doris.flink.exception.IllegalArgumentException;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.sink.HttpGetWithEntity;
 import org.apache.doris.flink.sink.writer.SchemaChangeHelper.DDLSchema;
+import org.apache.doris.flink.tools.cdc.SourceConnector;
 import org.apache.doris.flink.tools.cdc.mysql.MysqlType;
+import org.apache.doris.flink.tools.cdc.oracle.OracleType;
+import org.apache.doris.flink.tools.cdc.postgres.PostgresType;
+import org.apache.doris.flink.tools.cdc.sqlserver.SqlServerType;
+
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.StringUtils;
 import org.apache.http.HttpHeaders;
@@ -73,14 +79,15 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
     private static final String OP_UPDATE = "u"; // update
     private static final String OP_DELETE = "d"; // delete
 
-    public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s"; 
//alter table tbl add cloumn aca int
-    private static final String addDropDDLRegex = 
"ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
+    public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s"; 
// alter table tbl add cloumn aca int
+    private static final String addDropDDLRegex
+            = 
"ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
     private final Pattern addDropDDLPattern;
     private DorisOptions dorisOptions;
     private ObjectMapper objectMapper = new ObjectMapper();
     private String database;
     private String table;
-    //table name of the cdc upstream, format is db.tbl
+    // table name of the cdc upstream, format is db.tbl
     private String sourceTableName;
     private boolean firstLoad;
     private boolean firstSchemaChange;
@@ -88,11 +95,12 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
     private final boolean newSchemaChange;
     private String lineDelimiter = LINE_DELIMITER_DEFAULT;
     private boolean ignoreUpdateBefore = true;
+    private SourceConnector sourceConnector;
 
     public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
-                                        Pattern pattern,
-                                        String sourceTableName,
-                                        boolean newSchemaChange) {
+            Pattern pattern,
+            String sourceTableName,
+            boolean newSchemaChange) {
         this.dorisOptions = dorisOptions;
         this.addDropDDLPattern = pattern == null ? 
Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE) : pattern;
         String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
@@ -109,13 +117,14 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
     }
 
     public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions,
-                                        Pattern pattern,
-                                        String sourceTableName,
-                                        boolean newSchemaChange,
-                                        DorisExecutionOptions 
executionOptions){
+            Pattern pattern,
+            String sourceTableName,
+            boolean newSchemaChange,
+            DorisExecutionOptions executionOptions) {
         this(dorisOptions, pattern, sourceTableName, newSchemaChange);
-        if(executionOptions != null){
-            this.lineDelimiter = 
executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY, 
LINE_DELIMITER_DEFAULT);
+        if (executionOptions != null) {
+            this.lineDelimiter = executionOptions.getStreamLoadProp()
+                    .getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT);
             this.ignoreUpdateBefore = executionOptions.getIgnoreUpdateBefore();
         }
     }
@@ -126,7 +135,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
         String op = extractJsonNode(recordRoot, "op");
         if (Objects.isNull(op)) {
-            //schema change ddl
+            // schema change ddl
             if (newSchemaChange) {
                 schemaChangeV2(recordRoot);
             } else {
@@ -160,20 +169,21 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
 
     /**
      * Change the update event into two
+     *
      * @param recordRoot
      * @return
      */
     private byte[] extractUpdate(JsonNode recordRoot) throws 
JsonProcessingException {
         StringBuilder updateRow = new StringBuilder();
-        if(!ignoreUpdateBefore){
-            //convert delete
+        if (!ignoreUpdateBefore) {
+            // convert delete
             Map<String, Object> beforeRow = extractBeforeRow(recordRoot);
             addDeleteSign(beforeRow, true);
             updateRow.append(objectMapper.writeValueAsString(beforeRow))
                     .append(this.lineDelimiter);
         }
 
-        //convert insert
+        // convert insert
         Map<String, Object> afterRow = extractAfterRow(recordRoot);
         addDeleteSign(afterRow, false);
         updateRow.append(objectMapper.writeValueAsString(afterRow));
@@ -207,14 +217,15 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
     }
 
     private boolean checkSchemaChange(DDLSchema ddlSchema) throws IOException, 
IllegalArgumentException {
-        String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API, 
RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);
-        Map<String,Object> param = buildRequestParam(ddlSchema);
+        String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API,
+                RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), 
database, table);
+        Map<String, Object> param = buildRequestParam(ddlSchema);
         HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
         httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
         httpGet.setEntity(new 
StringEntity(objectMapper.writeValueAsString(param)));
         boolean success = handleResponse(httpGet);
         if (!success) {
-            LOG.warn("schema change can not do table {}.{}",database,table);
+            LOG.warn("schema change can not do table {}.{}", database, table);
         }
         return success;
     }
@@ -224,18 +235,18 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         JsonNode historyRecord = extractHistoryRecord(record);
         JsonNode tableChanges = historyRecord.get("tableChanges");
         String ddl = extractJsonNode(historyRecord, "ddl");
-        if(Objects.isNull(tableChanges) || Objects.isNull(ddl)){
+        if (Objects.isNull(tableChanges) || Objects.isNull(ddl)) {
             return new ArrayList<>();
         }
         LOG.debug("received debezium ddl :{}", ddl);
         JsonNode tableChange = tableChanges.get(0);
-        Matcher matcher = addDropDDLPattern.matcher(ddl);
-        if (Objects.isNull(tableChange)|| 
!tableChange.get("type").asText().equals("ALTER") || !matcher.find()) {
+        if (Objects.isNull(tableChange) || 
!tableChange.get("type").asText().equals("ALTER")) {
             return null;
         }
 
         JsonNode columns = tableChange.get("table").get("columns");
         if (firstSchemaChange) {
+            sourceConnector = 
SourceConnector.valueOf(record.get("source").get("connector").asText().toUpperCase());
             fillOriginSchema(columns);
         }
         Map<String, FieldSchema> updateFiledSchema = new LinkedHashMap<>();
@@ -243,25 +254,30 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
             buildFieldSchema(updateFiledSchema, column);
         }
         SchemaChangeHelper.compareSchema(updateFiledSchema, 
originFieldSchemaMap);
+        // In order to avoid operations such as rename or change, which may 
lead to the accidental deletion of the doris column.
+        Matcher matcher = addDropDDLPattern.matcher(ddl);
+        if (!matcher.find()) {
+            return null;
+        }
         return 
SchemaChangeHelper.generateDDLSql(dorisOptions.getTableIdentifier());
     }
 
     @VisibleForTesting
     public boolean schemaChange(JsonNode recordRoot) {
         boolean status = false;
-        try{
-            if(!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && 
!checkTable(recordRoot)){
+        try {
+            if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && 
!checkTable(recordRoot)) {
                 return false;
             }
             String ddl = extractDDL(recordRoot);
-            if(StringUtils.isNullOrWhitespaceOnly(ddl)){
+            if (StringUtils.isNullOrWhitespaceOnly(ddl)) {
                 LOG.info("ddl can not do schema change:{}", recordRoot);
                 return false;
             }
             boolean doSchemaChange = checkSchemaChange(ddl);
             status = doSchemaChange && execSchemaChange(ddl);
             LOG.info("schema change status:{}", status);
-        }catch (Exception ex){
+        } catch (Exception ex) {
             LOG.warn("schema change error :", ex);
         }
         return status;
@@ -278,17 +294,18 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
     }
 
     private void addDeleteSign(Map<String, Object> valueMap, boolean delete) {
-        if(delete){
+        if (delete) {
             valueMap.put(DORIS_DELETE_SIGN, "1");
-        }else{
+        } else {
             valueMap.put(DORIS_DELETE_SIGN, "0");
         }
     }
 
     private boolean checkSchemaChange(String ddl) throws IOException, 
IllegalArgumentException {
-        String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API, 
RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);
-        Map<String,Object> param = buildRequestParam(ddl);
-        if(param.size() != 2){
+        String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API,
+                RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), 
database, table);
+        Map<String, Object> param = buildRequestParam(ddl);
+        if (param.size() != 2) {
             return false;
         }
         HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
@@ -296,7 +313,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         httpGet.setEntity(new 
StringEntity(objectMapper.writeValueAsString(param)));
         boolean success = handleResponse(httpGet);
         if (!success) {
-            LOG.warn("schema change can not do table {}.{}",database,table);
+            LOG.warn("schema change can not do table {}.{}", database, table);
         }
         return success;
     }
@@ -316,9 +333,9 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
      * }
      */
     protected Map<String, Object> buildRequestParam(String ddl) {
-        Map<String,Object> params = new HashMap<>();
+        Map<String, Object> params = new HashMap<>();
         Matcher matcher = addDropDDLPattern.matcher(ddl);
-        if(matcher.find()){
+        if (matcher.find()) {
             String op = matcher.group(1);
             String col = matcher.group(3);
             params.put("isDropColumn", op.equalsIgnoreCase("DROP"));
@@ -330,7 +347,8 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
     private boolean execSchemaChange(String ddl) throws IOException, 
IllegalArgumentException {
         Map<String, String> param = new HashMap<>();
         param.put("stmt", ddl);
-        String requestUrl = String.format(SCHEMA_CHANGE_API, 
RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database);
+        String requestUrl = String.format(SCHEMA_CHANGE_API,
+                RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), 
database);
         HttpPost httpPost = new HttpPost(requestUrl);
         httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
         httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");
@@ -340,10 +358,10 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
     }
 
     protected String extractDatabase(JsonNode record) {
-        if(record.get("source").has("schema")){
-            //compatible with schema
+        if (record.get("source").has("schema")) {
+            // compatible with schema
             return extractJsonNode(record.get("source"), "schema");
-        }else{
+        } else {
             return extractJsonNode(record.get("source"), "db");
         }
     }
@@ -366,7 +384,7 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
                     LOG.error("schema change response:{}", loadResult);
                 }
             }
-        }catch(Exception e){
+        } catch (Exception e) {
             LOG.error("http request error,", e);
         }
         return false;
@@ -392,10 +410,10 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
     }
 
     private JsonNode extractHistoryRecord(JsonNode record) throws 
JsonProcessingException {
-        if(record.has("historyRecord")){
+        if (record.has("historyRecord")) {
             return objectMapper.readTree(record.get("historyRecord").asText());
         }
-        //The ddl passed by some scenes will not be included in the 
historyRecord, such as DebeziumSourceFunction
+        // The ddl passed by some scenes will not be included in the 
historyRecord, such as DebeziumSourceFunction
         return record;
     }
 
@@ -404,9 +422,9 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         String ddl = extractJsonNode(historyRecord, "ddl");
         LOG.debug("received debezium ddl :{}", ddl);
         if (!Objects.isNull(ddl)) {
-            //filter add/drop operation
+            // filter add/drop operation
             Matcher matcher = addDropDDLPattern.matcher(ddl);
-            if(matcher.find()){
+            if (matcher.find()) {
                 String op = matcher.group(1);
                 String col = matcher.group(3);
                 String type = matcher.group(5);
@@ -420,7 +438,8 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
     }
 
     private String authHeader() {
-        return "Basic " + new 
String(Base64.encodeBase64((dorisOptions.getUsername() + ":" + 
dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8)));
+        return "Basic " + new String(Base64.encodeBase64(
+                (dorisOptions.getUsername() + ":" + 
dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8)));
     }
 
     @VisibleForTesting
@@ -429,16 +448,12 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
             for (JsonNode column : columns) {
                 String fieldName = column.get("name").asText();
                 if (originFieldSchemaMap.containsKey(fieldName)) {
-                    JsonNode length = column.get("length");
-                    JsonNode scale = column.get("scale");
-                    String type = 
MysqlType.toDorisType(column.get("typeName").asText(),
-                            length == null ? 0 : length.asInt(),
-                            scale == null ? 0 : scale.asInt());
+                    String dorisTypeName = buildDorisTypeName(column);
                     String defaultValue = 
handleDefaultValue(extractJsonNode(column, "defaultValueExpression"));
                     String comment = extractJsonNode(column, "comment");
                     FieldSchema fieldSchema = 
originFieldSchemaMap.get(fieldName);
                     fieldSchema.setName(fieldName);
-                    fieldSchema.setTypeString(type);
+                    fieldSchema.setTypeString(dorisTypeName);
                     fieldSchema.setComment(comment);
                     fieldSchema.setDefaultValue(defaultValue);
                 }
@@ -453,13 +468,36 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
 
     private void buildFieldSchema(Map<String, FieldSchema> filedSchemaMap, 
JsonNode column) {
         String fieldName = column.get("name").asText();
-        JsonNode length = column.get("length");
-        JsonNode scale = column.get("scale");
-        String type = MysqlType.toDorisType(column.get("typeName").asText(),
-                length == null ? 0 : length.asInt(), scale == null ? 0 : 
scale.asInt());
+        String dorisTypeName = buildDorisTypeName(column);
         String defaultValue = handleDefaultValue(extractJsonNode(column, 
"defaultValueExpression"));
         String comment = extractJsonNode(column, "comment");
-        filedSchemaMap.put(fieldName, new FieldSchema(fieldName, type, 
defaultValue, comment));
+        filedSchemaMap.put(fieldName, new FieldSchema(fieldName, 
dorisTypeName, defaultValue, comment));
+    }
+
+    @VisibleForTesting
+    public String buildDorisTypeName(JsonNode column) {
+        int length = column.get("length") == null ? 0 : 
column.get("length").asInt();
+        int scale = column.get("scale") == null ? 0 : 
column.get("scale").asInt();
+        String sourceTypeName = column.get("typeName").asText();
+        String dorisTypeName;
+        switch (sourceConnector) {
+            case MYSQL:
+                dorisTypeName = MysqlType.toDorisType(sourceTypeName, length, 
scale);
+                break;
+            case ORACLE:
+                dorisTypeName = OracleType.toDorisType(sourceTypeName, length, 
scale);
+                break;
+            case POSTGRES:
+                dorisTypeName = PostgresType.toDorisType(sourceTypeName, 
length, scale);
+                break;
+            case SQLSERVER:
+                dorisTypeName = SqlServerType.toDorisType(sourceTypeName, 
length, scale);
+                break;
+            default:
+                String errMsg = "Not support " + sourceTypeName + " schema 
change.";
+                throw new UnsupportedOperationException(errMsg);
+        }
+        return dorisTypeName;
     }
 
     private String handleDefaultValue(String defaultValue) {
@@ -493,6 +531,11 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         return originFieldSchemaMap;
     }
 
+    @VisibleForTesting
+    public void setSourceConnector(String sourceConnector) {
+        this.sourceConnector = 
SourceConnector.valueOf(sourceConnector.toUpperCase());
+    }
+
     public static JsonDebeziumSchemaSerializer.Builder builder() {
         return new JsonDebeziumSchemaSerializer.Builder();
     }
@@ -533,7 +576,8 @@ public class JsonDebeziumSchemaSerializer implements 
DorisRecordSerializer<Strin
         }
 
         public JsonDebeziumSchemaSerializer build() {
-            return new JsonDebeziumSchemaSerializer(dorisOptions, 
addDropDDLPattern, sourceTableName, newSchemaChange, executionOptions);
+            return new JsonDebeziumSchemaSerializer(dorisOptions, 
addDropDDLPattern, sourceTableName, newSchemaChange,
+                    executionOptions);
         }
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
index 302770c..dc8d83b 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/SchemaChangeHelper.java
@@ -37,6 +37,8 @@ public class SchemaChangeHelper {
 
     public static void compareSchema(Map<String, FieldSchema> 
updateFiledSchemaMap,
             Map<String, FieldSchema> originFieldSchemaMap) {
+        dropFieldSchemas.clear();
+        addFieldSchemas.clear();
         for (Entry<String, FieldSchema> updateFieldSchema : 
updateFiledSchemaMap.entrySet()) {
             String columName = updateFieldSchema.getKey();
             if (!originFieldSchemaMap.containsKey(columName)) {
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java
new file mode 100644
index 0000000..a404dea
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/SourceConnector.java
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.tools.cdc;
+
+public enum SourceConnector {
+
+    MYSQL("mysql"),
+    ORACLE("oracle"),
+    POSTGRES("postgres"),
+    SQLSERVER("sqlserver");
+
+    public final String connectorName;
+
+    SourceConnector(String connectorName) {
+        this.connectorName = connectorName;
+    }
+
+    public String getConnectorName() {
+        return connectorName;
+    }
+}
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
index 44816d0..c4bdbc0 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -67,9 +67,12 @@ public class TestJsonDebeziumSchemaSerializer {
 
     @Test
     public void testSerializeInsert() throws IOException {
-        //insert into t1 VALUES(1,"doris",'2022-01-01','2022-01-01 
10:01:02','2022-01-01 10:01:03');
-        byte[] serializedValue = 
serializer.serialize("{\"before\":null,\"after\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01
 10:01:02\",\"ts\":\"2022-01-01 
10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663923840000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":11834,\"row\":0,\"thread\":null,\
 [...]
-        Map<String, String> valueMap = objectMapper.readValue(new 
String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, 
String>>(){});
+        // insert into t1 VALUES(1,"doris",'2022-01-01','2022-01-01 
10:01:02','2022-01-01 10:01:03');
+        byte[] serializedValue = serializer.serialize(
+                
"{\"before\":null,\"after\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01
 10:01:02\",\"ts\":\"2022-01-01 
10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663923840000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":11834,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"c\",\"ts_ms\":
 [...]
+        Map<String, String> valueMap = objectMapper.readValue(new 
String(serializedValue, StandardCharsets.UTF_8),
+                new TypeReference<Map<String, String>>() {
+                });
         Assert.assertEquals("1", valueMap.get("id"));
         Assert.assertEquals("doris", valueMap.get("name"));
         Assert.assertEquals("2022-01-01", valueMap.get("dt"));
@@ -82,9 +85,12 @@ public class TestJsonDebeziumSchemaSerializer {
 
     @Test
     public void testSerializeUpdate() throws IOException {
-        //update t1 set name='doris-update' WHERE id =1;
-        byte[] serializedValue = 
serializer.serialize("{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01
 10:01:02\",\"ts\":\"2022-01-01 
10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01
 10:01:02\",\"ts\":\"2022-01-01 
10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":n
 [...]
-        Map<String, String> valueMap = objectMapper.readValue(new 
String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, 
String>>(){});
+        // update t1 set name='doris-update' WHERE id =1;
+        byte[] serializedValue = serializer.serialize(
+                
"{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01
 10:01:02\",\"ts\":\"2022-01-01 
10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01
 10:01:02\",\"ts\":\"2022-01-01 
10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\
 [...]
+        Map<String, String> valueMap = objectMapper.readValue(new 
String(serializedValue, StandardCharsets.UTF_8),
+                new TypeReference<Map<String, String>>() {
+                });
         Assert.assertEquals("1", valueMap.get("id"));
         Assert.assertEquals("doris-update", valueMap.get("name"));
         Assert.assertEquals("2022-01-01", valueMap.get("dt"));
@@ -97,12 +103,15 @@ public class TestJsonDebeziumSchemaSerializer {
     @Test
     public void testSerializeUpdateBefore() throws IOException {
         serializer = 
JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions)
-                
.setExecutionOptions(DorisExecutionOptions.builderDefaults().setIgnoreUpdateBefore(false).build()).build();
-        //update t1 set name='doris-update' WHERE id =1;
-        byte[] serializedValue = 
serializer.serialize("{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01
 10:01:02\",\"ts\":\"2022-01-01 
10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01
 10:01:02\",\"ts\":\"2022-01-01 
10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":n
 [...]
+                
.setExecutionOptions(DorisExecutionOptions.builderDefaults().setIgnoreUpdateBefore(false).build())
+                .build();
+        // update t1 set name='doris-update' WHERE id =1;
+        byte[] serializedValue = serializer.serialize(
+                
"{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01
 10:01:02\",\"ts\":\"2022-01-01 
10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01
 10:01:02\",\"ts\":\"2022-01-01 
10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\
 [...]
         String row = new String(serializedValue, StandardCharsets.UTF_8);
         String[] split = row.split("\n");
-        Map<String, String> valueMap = objectMapper.readValue(split[1], new 
TypeReference<Map<String, String>>(){});
+        Map<String, String> valueMap = objectMapper.readValue(split[1], new 
TypeReference<Map<String, String>>() {
+        });
         Assert.assertEquals("1", valueMap.get("id"));
         Assert.assertEquals("doris-update", valueMap.get("name"));
         Assert.assertEquals("2022-01-01", valueMap.get("dt"));
@@ -111,14 +120,18 @@ public class TestJsonDebeziumSchemaSerializer {
         Assert.assertEquals("0", valueMap.get("__DORIS_DELETE_SIGN__"));
         Assert.assertEquals(6, valueMap.size());
 
-        Map<String, String> beforeMap = objectMapper.readValue(split[0], new 
TypeReference<Map<String, String>>(){});
+        Map<String, String> beforeMap = objectMapper.readValue(split[0], new 
TypeReference<Map<String, String>>() {
+        });
         Assert.assertEquals("doris", beforeMap.get("name"));
     }
 
     @Test
     public void testSerializeDelete() throws IOException {
-        byte[] serializedValue = 
serializer.serialize("{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01
 10:01:02\",\"ts\":\"2022-01-01 
10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\"
 [...]
-        Map<String, String> valueMap = objectMapper.readValue(new 
String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, 
String>>(){});
+        byte[] serializedValue = serializer.serialize(
+                
"{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01
 10:01:02\",\"ts\":\"2022-01-01 
10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"d\",\"t
 [...]
+        Map<String, String> valueMap = objectMapper.readValue(new 
String(serializedValue, StandardCharsets.UTF_8),
+                new TypeReference<Map<String, String>>() {
+                });
         Assert.assertEquals("1", valueMap.get("id"));
         Assert.assertEquals("doris-update", valueMap.get("name"));
         Assert.assertEquals("2022-01-01", valueMap.get("dt"));
@@ -131,13 +144,15 @@ public class TestJsonDebeziumSchemaSerializer {
     @Test
     public void testExtractDDL() throws IOException {
         String srcDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(600)";
-        String record = 
"{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\
 [...]
+        String record
+                = 
"{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_
 [...]
         JsonNode recordRoot = objectMapper.readTree(record);
         String ddl = serializer.extractDDL(recordRoot);
         Assert.assertEquals(srcDDL, ddl);
 
         String targetDDL = "ALTER TABLE test.t1 add COLUMN c_1 varchar(65533)";
-        String record1 = 
"{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,
 [...]
+        String record1
+                = 
"{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_
 [...]
         JsonNode recordRoot1 = objectMapper.readTree(record1);
         String ddl1 = serializer.extractDDL(recordRoot1);
         Assert.assertEquals(targetDDL, ddl1);
@@ -156,9 +171,10 @@ public class TestJsonDebeziumSchemaSerializer {
         String sql7 = "ALTER TABLE test.t1 DROP COLUMN test_time";
         String sql8 = "ALTER TABLE test.t1 DROP COLUMN c1";
         String sql9 = "ALTER TABLE test.t1 DROP COLUMN cc";
-        List<String> srcSqlList = Arrays.asList(sql0, sql1, sql2, sql3, 
sql4,sql5,sql6,sql7,sql8,sql9);
+        List<String> srcSqlList = Arrays.asList(sql0, sql1, sql2, sql3, sql4, 
sql5, sql6, sql7, sql8, sql9);
 
-        String record = 
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691033764674,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23305,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23305,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\
 [...]
+        String record
+                = 
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691033764674,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23305,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23305,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_
 [...]
         JsonNode recordRoot = objectMapper.readTree(record);
         List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
         for (int i = 0; i < ddlSQLList.size(); i++) {
@@ -168,9 +184,55 @@ public class TestJsonDebeziumSchemaSerializer {
         }
     }
 
+    @Test
+    public void testExtractDDLListCreateTable() throws IOException {
+        String record
+                = 
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"oracle\",\"name\":\"oracle_logminer\",\"ts_ms\":1696945825065,\"snapshot\":\"true\",\"db\":\"HELOWIN\",\"sequence\":null,\"schema\":\"ADMIN\",\"table\":\"PERSONS\",\"txId\":null,\"scn\":\"1199617\",\"commit_scn\":null,\"lcr_position\":null,\"rs_id\":null,\"ssn\":0,\"redo_thread\":null},\"databaseName\":\"HELOWIN\",\"schemaName\":\"ADMIN\",\"ddl\":\"\\n
  CREATE TABLE \\\"ADMIN\\\".\\\"PERSONS\\\" \\n   (\\t\\\"ID\ [...]
+        JsonNode recordRoot = objectMapper.readTree(record);
+        List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
+        Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+    }
+
+    @Test
+    public void testExtractDDLListTruncateTable() throws IOException {
+        String record
+                = 
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696944601264,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink11\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":5719,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":5719,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,
 [...]
+        JsonNode recordRoot = objectMapper.readTree(record);
+        List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
+        Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+    }
+
+    @Test
+    public void testExtractDDLListDropTable() throws IOException {
+        String record
+                = 
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696944747956,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink11\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":5901,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":5901,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,
 [...]
+        JsonNode recordRoot = objectMapper.readTree(record);
+        List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
+        Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+    }
+
+    @Test
+    public void testExtractDDLListChangeColumn() throws IOException {
+        String record
+                = 
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696945030603,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":6521,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6521,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\
 [...]
+        JsonNode recordRoot = objectMapper.readTree(record);
+        List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
+        Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+    }
+
+    @Test
+    public void testExtractDDLListModifyColumn() throws IOException {
+        String record
+                = 
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1696945306941,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000043\",\"pos\":6738,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000043\\\",\\\"pos\\\":6738,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\
 [...]
+        JsonNode recordRoot = objectMapper.readTree(record);
+        List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
+        Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
+    }
+
     @Test
     public void testExtractDDLListRenameColumn() throws IOException {
-        String record = 
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691034519226,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23752,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23752,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\
 [...]
+        String record
+                = 
"{\"source\":{\"version\":\"1.9.7.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1691034519226,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000029\",\"pos\":23752,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000029\\\",\\\"pos\\\":23752,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_
 [...]
         JsonNode recordRoot = objectMapper.readTree(record);
         List<String> ddlSQLList = serializer.extractDDLList(recordRoot);
         Assert.assertTrue(CollectionUtils.isEmpty(ddlSQLList));
@@ -184,7 +246,9 @@ public class TestJsonDebeziumSchemaSerializer {
         srcFiledSchemaMap.put("test_time", new FieldSchema("test_time", 
"DATETIMEV2(0)", null, null));
         srcFiledSchemaMap.put("c1", new FieldSchema("c1", "INT", "'100'", 
null));
 
-        String columnsString = 
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"name\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":50,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"co
 [...]
+        serializer.setSourceConnector("mysql");
+        String columnsString
+                = 
"[{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":false,\"enumValues\":[]},{\"name\":\"name\",\"jdbcType\":12,\"typeName\":\"VARCHAR\",\"typeExpression\":\"VARCHAR\",\"charsetName\":\"utf8mb4\",\"length\":50,\"position\":2,\"optional\":true,\"autoIncremented\":false,\"generated\":false,\"comment\":null,
 [...]
         JsonNode columns = objectMapper.readTree(columnsString);
         serializer.fillOriginSchema(columns);
         Map<String, FieldSchema> originFieldSchemaMap = 
serializer.getOriginFieldSchemaMap();
@@ -202,11 +266,32 @@ public class TestJsonDebeziumSchemaSerializer {
         }
     }
 
+    @Test
+    public void testBuildMysql2DorisTypeName() throws IOException {
+        String columnInfo
+                = 
"{\"name\":\"id\",\"jdbcType\":4,\"typeName\":\"INT\",\"typeExpression\":\"INT\",\"charsetName\":null,\"position\":1,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null,\"hasDefaultValue\":true,\"defaultValueExpression\":\"10\",\"enumValues\":[]}";
+        serializer.setSourceConnector("mysql");
+        JsonNode columns = objectMapper.readTree(columnInfo);
+        String dorisTypeName = serializer.buildDorisTypeName(columns);
+        Assert.assertEquals(dorisTypeName, "INT");
+    }
+
+    @Test
+    public void testBuildOracle2DorisTypeName() throws IOException {
+        String columnInfo
+                = 
"{\"name\":\"NAME\",\"jdbcType\":12,\"nativeType\":null,\"typeName\":\"VARCHAR2\",\"typeExpression\":\"VARCHAR2\",\"charsetName\":null,\"length\":128,\"scale\":null,\"position\":2,\"optional\":false,\"autoIncremented\":false,\"generated\":false,\"comment\":null}";
+        serializer.setSourceConnector("oracle");
+        JsonNode columns = objectMapper.readTree(columnInfo);
+        String dorisTypeName = serializer.buildDorisTypeName(columns);
+        Assert.assertEquals(dorisTypeName, "VARCHAR(384)");
+    }
+
     @Ignore
     @Test
     public void testSerializeAddColumn() throws IOException, DorisException {
         // alter table t1 add  column  c_1 varchar(200)
-        String record = 
"{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\
 [...]
+        String record
+                = 
"{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_
 [...]
         JsonNode recordRoot = objectMapper.readTree(record);
         boolean flag = serializer.schemaChange(recordRoot);
         Assert.assertEquals(true, flag);
@@ -220,8 +305,9 @@ public class TestJsonDebeziumSchemaSerializer {
     @Ignore
     @Test
     public void testSerializeDropColumn() throws IOException, DorisException {
-        //alter table  t1 drop  column  c_1;
-        String ddl = 
"{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663925897321,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13298,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13298,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"
 [...]
+        // alter table  t1 drop  column  c_1;
+        String ddl
+                = 
"{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663925897321,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13298,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13298,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\"ts_
 [...]
         JsonNode recordRoot = objectMapper.readTree(ddl);
         boolean flag = serializer.schemaChange(recordRoot);
         Assert.assertEquals(true, flag);
@@ -230,13 +316,13 @@ public class TestJsonDebeziumSchemaSerializer {
         Assert.assertNull(targetField);
     }
 
-    private static Field getField(String column) throws DorisException{
-        //get table schema
+    private static Field getField(String column) throws DorisException {
+        // get table schema
         Schema schema = RestService.getSchema(dorisOptions, 
DorisReadOptions.builder().build(), LOG);
         List<Field> properties = schema.getProperties();
         Field targetField = null;
-        for(Field field : properties){
-            if(column.equals(field.getName())){
+        for (Field field : properties) {
+            if (column.equals(field.getName())) {
                 targetField = field;
                 break;
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to