This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch schemachange-1.14
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/schemachange-1.14 by this push:
     new b407905  [Feature] Support Light Schema change for flink1.14 (#64)
b407905 is described below

commit b40790537b0fe89d2c4c0aac75ff7476c0345ee2
Author: wudi <676366...@qq.com>
AuthorDate: Wed Sep 28 08:59:55 2022 +0800

    [Feature] Support Light Schema change for flink1.14 (#64)
    
     Support Light Schema change for flink1.14
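    
    A rough sketch of how the new pieces fit together (mirroring the
    CDCSchemaChangeExample added in this commit; the FE address, table
    identifier, and credentials below are placeholders):
    
        Properties props = new Properties();
        props.setProperty("format", "json");
        props.setProperty("read_json_by_line", "true");
    
        DorisOptions dorisOptions = DorisOptions.builder()
                .setFenodes("127.0.0.1:8030")      // placeholder FE address
                .setTableIdentifier("test.t1")     // placeholder target table
                .setUsername("root")
                .setPassword("")
                .build();
    
        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
        executionBuilder.setLabelPrefix("label-doris" + UUID.randomUUID())
                .setStreamLoadProp(props)
                .setDeletable(true);               // deletes become __DORIS_DELETE_SIGN__=1 rows
    
        DorisSink.Builder<String> builder = DorisSink.builder();
        builder.setDorisReadOptions(DorisReadOptions.builder().build())
                .setDorisExecutionOptions(executionBuilder.build())
                .setDorisOptions(dorisOptions)
                // Debezium events without an "op" field are treated as DDL and
                // trigger a light schema change on the Doris FE; row events are
                // flattened to JSON with the delete sign attached.
                .setSerializer(JsonDebeziumSchemaSerializer.builder()
                        .setDorisOptions(dorisOptions).build());
    
    See CDCSchemaChangeExample in the diff below for the complete runnable job.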
---
 flink-doris-connector/pom.xml                      |  45 ++--
 .../doris/flink/cfg/DorisExecutionOptions.java     |   2 +-
 .../org/apache/doris/flink/rest/RestService.java   |   2 +-
 .../apache/doris/flink/sink/HttpGetWithEntity.java |  36 +++
 .../apache/doris/flink/sink/HttpPutBuilder.java    |  12 +-
 .../doris/flink/sink/committer/DorisCommitter.java |   1 +
 .../doris/flink/sink/writer/DorisStreamLoad.java   |   3 +
 .../doris/flink/sink/writer/DorisWriter.java       |   7 +-
 .../sink/writer/JsonDebeziumSchemaSerializer.java  | 264 +++++++++++++++++++++
 .../flink/table/DorisDynamicTableFactory.java      |   2 +-
 .../apache/doris/flink/CDCSchemaChangeExample.java |  87 +++++++
 .../org/apache/doris/flink/DorisSinkExample.java   |  12 +-
 .../apache/doris/flink/DorisSinkSQLExample.java    |   7 +-
 .../apache/doris/flink/DorisSourceSinkExample.java |  40 ++--
 .../writer/TestJsonDebeziumSchemaSerializer.java   | 151 ++++++++++++
 .../doris/flink/source/DorisSourceExampleTest.java |   1 -
 .../flink/source/reader/DorisSourceReaderTest.java |   2 +
 .../doris/flink/utils/DateToStringConverter.java   | 147 ++++++++++++
 18 files changed, 759 insertions(+), 62 deletions(-)

diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 6206b4a..bde8d37 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -62,9 +62,9 @@ under the License.
         </mailingList>
     </mailingLists>
     <properties>
-        <scala.version>${env.scala.version}</scala.version>
-        <flink.version>${env.flink.version}</flink.version>
-        <flink.minor.version>${env.flink.minor.version}</flink.minor.version>
+        <scala.version>2.12</scala.version>
+        <flink.version>1.14.4</flink.version>
+        <flink.minor.version>1.14</flink.minor.version>
         <libthrift.version>0.13.0</libthrift.version>
         <arrow.version>5.0.0</arrow.version>
         <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
@@ -251,33 +251,20 @@ under the License.
             <version>2.13.3</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-web</artifactId>
-            <version>${log4j2.version}</version>
-        </dependency>
-        <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-api -->
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-api</artifactId>
-            <version>${log4j2.version}</version>
-        </dependency>
-        <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core -->
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-core</artifactId>
-            <version>${log4j2.version}</version>
-        </dependency>
-        <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-slf4j-impl -->
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-slf4j-impl</artifactId>
-            <version>${log4j2.version}</version>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.25</version>
         </dependency>
-        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
-            <version>1.7.9</version>
+            <version>1.7.25</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>1.2.17</version>
         </dependency>
         <!--Test-->
         <dependency>
@@ -310,6 +297,12 @@ under the License.
             </exclusions>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-connector-mysql-cdc</artifactId>
+            <version>2.2.1</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 2daf5e1..102a7ee 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -73,7 +73,7 @@ public class DorisExecutionOptions implements Serializable {
     public static DorisExecutionOptions defaults() {
         Properties properties = new Properties();
         properties.setProperty("format", "json");
-        properties.setProperty("strip_outer_array", "true");
+        properties.setProperty("read_json_by_line", "true");
         return new Builder().setStreamLoadProp(properties).build();
     }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index 734bfdb..5732dc8 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -442,7 +442,7 @@ public class RestService implements Serializable {
     public static boolean isUniqueKeyType(DorisOptions options, DorisReadOptions readOptions, Logger logger)
             throws DorisRuntimeException {
         try {
-            return "UNIQUE_KEYS_TYPE".equals(getSchema(options, readOptions, logger).getKeysType());
+            return UNIQUE_KEYS_TYPE.equals(getSchema(options, readOptions, logger).getKeysType());
         } catch (Exception e) {
             throw new DorisRuntimeException(e);
         }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpGetWithEntity.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpGetWithEntity.java
new file mode 100644
index 0000000..28b4a65
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpGetWithEntity.java
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink;
+
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+
+import java.net.URI;
+
+public class HttpGetWithEntity extends HttpEntityEnclosingRequestBase {
+    private final static String METHOD_NAME = "GET";
+
+    @Override
+    public String getMethod() {
+        return METHOD_NAME;
+    }
+
+    public HttpGetWithEntity(final String uri) {
+        super();
+        setURI(URI.create(uri));
+    }
+}
\ No newline at end of file
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
index 6288645..b59f2c1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
@@ -17,13 +17,14 @@
 
 package org.apache.doris.flink.sink;
 
-import org.apache.flink.util.Preconditions;
-
 import org.apache.commons.codec.binary.Base64;
+import org.apache.doris.flink.sink.writer.LoadConstants;
+import org.apache.flink.util.Preconditions;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHeaders;
 import org.apache.http.client.methods.HttpPut;
 import org.apache.http.entity.StringEntity;
+
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
@@ -50,6 +51,13 @@ public class HttpPutBuilder {
         return this;
     }
 
+    public HttpPutBuilder addHiddenColumns(boolean add) {
+        if(add){
+            header.put("hidden_columns", LoadConstants.DORIS_DELETE_SIGN);
+        }
+        return this;
+    }
+
     public HttpPutBuilder enable2PC() {
         header.put("two_phase_commit", "true");
         return this;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
index dfcc0bc..d62e8b4 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
@@ -90,6 +90,7 @@ public class DorisCommitter implements Committer<DorisCommittable> {
             try {
                 response = httpClient.execute(putBuilder.build());
             } catch (IOException e) {
+                LOG.error("commit transaction failed: ", e);
                 hostPort = RestService.getBackend(dorisOptions, dorisReadOptions, LOG);
                 continue;
             }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index dfb9cb7..bd29d34 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -75,6 +75,7 @@ public class DorisStreamLoad implements Serializable {
     private final String db;
     private final String table;
     private final boolean enable2PC;
+    private final boolean enableDelete;
     private final Properties streamLoadProp;
     private final RecordStream recordStream;
     private Future<CloseableHttpResponse> pendingLoadFuture;
@@ -98,6 +99,7 @@ public class DorisStreamLoad implements Serializable {
         this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db);
         this.enable2PC = executionOptions.enabled2PC();
         this.streamLoadProp = executionOptions.getStreamLoadProp();
+        this.enableDelete = executionOptions.getDeletable();
         this.httpClient = httpClient;
         this.executorService = new ThreadPoolExecutor(1, 1,
                 0L, TimeUnit.MILLISECONDS,
@@ -233,6 +235,7 @@ public class DorisStreamLoad implements Serializable {
             putBuilder.setUrl(loadUrlStr)
                     .baseAuth(user, passwd)
                     .addCommonHeader()
+                    .addHiddenColumns(enableDelete)
                     .setLabel(label)
                     .setEntity(entity)
                     .addProperties(streamLoadProp);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 86ed9ed..aa11df5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -45,6 +45,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -123,7 +124,11 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr
     @Override
     public void write(IN in, Context context) throws IOException {
         checkLoadException();
-        dorisStreamLoad.writeRecord(serializer.serialize(in));
+        byte[] serialize = serializer.serialize(in);
+        if(Objects.isNull(serialize)){
+            return;
+        }
+        dorisStreamLoad.writeRecord(serialize);
     }
 
     @Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
new file mode 100644
index 0000000..c3fe987
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
@@ -0,0 +1,264 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.sink.HttpGetWithEntity;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN;
+
+public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<String> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaSerializer.class);
+    private static final String CHECK_SCHEMA_CHANGE_API = "http://%s/api/enable_light_schema_change/%s/%s";
+    private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s";
+    private static final String OP_READ = "r"; // snapshot read
+    private static final String OP_CREATE = "c"; // insert
+    private static final String OP_UPDATE = "u"; // update
+    private static final String OP_DELETE = "d"; // delete
+
+    private static final String addDropDDLRegex = "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+COLUMN\\s+([^\\s]+).*";
+    private final Pattern addDropDDLPattern;
+    private DorisOptions dorisOptions;
+    private ObjectMapper objectMapper = new ObjectMapper();
+
+    public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern pattern) {
+        this.dorisOptions = dorisOptions;
+        this.addDropDDLPattern = pattern == null ? Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE) : pattern;
+    }
+
+    @Override
+    public byte[] serialize(String record) throws IOException {
+        LOG.debug("received debezium json data {} :", record);
+        JsonNode recordRoot = objectMapper.readTree(record);
+        String op = extractJsonNode(recordRoot, "op");
+        if (Objects.isNull(op)) {
+            //schema change ddl
+            schemaChange(recordRoot);
+            return null;
+        }
+        Map<String, String> valueMap;
+        if (OP_READ.equals(op) || OP_CREATE.equals(op)) {
+            valueMap = extractAfterRow(recordRoot);
+            addDeleteSign(valueMap,false);
+        } else if (OP_UPDATE.equals(op)) {
+            valueMap = extractAfterRow(recordRoot);
+            addDeleteSign(valueMap,false);
+        } else if (OP_DELETE.equals(op)) {
+            valueMap = extractBeforeRow(recordRoot);
+            addDeleteSign(valueMap,true);
+        } else {
+            LOG.error("parse record fail, unknown op {} in {}",op,record);
+            return null;
+        }
+        return objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8);
+    }
+
+    @VisibleForTesting
+    public boolean schemaChange(JsonNode recordRoot) {
+        boolean status = false;
+        try{
+            boolean doSchemaChange = checkSchemaChange(recordRoot);
+            status = doSchemaChange && execSchemaChange(recordRoot);
+            LOG.info("schema change status:{}", status);
+        }catch (Exception ex){
+            LOG.warn("schema change error :", ex);
+        }
+        return status;
+    }
+
+    private void addDeleteSign(Map<String, String> valueMap, boolean delete) {
+        if(delete){
+            valueMap.put(DORIS_DELETE_SIGN, "1");
+        }else{
+            valueMap.put(DORIS_DELETE_SIGN, "0");
+        }
+    }
+
+    private boolean checkSchemaChange(JsonNode record) throws IOException {
+        String database = extractDatabase(record);
+        String table = extractTable(record);
+        String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API, dorisOptions.getFenodes(), database, table);
+        Map<String,Object> param = buildRequestParam(record);
+        if(param.size() != 2){
+            return false;
+        }
+        HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
+        httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
+        httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
+        boolean success = handleResponse(httpGet);
+        if (!success) {
+            LOG.warn("schema change can not do table {}.{}",database,table);
+        }
+        return success;
+    }
+
+    /**
+     * Build param
+     * {
+     * "isDropColumn": true,
+     * "columnName" : "column"
+     * }
+     */
+    private Map<String, Object> buildRequestParam(JsonNode record) throws JsonProcessingException {
+        Map<String,Object> params = new HashMap<>();
+        String ddl = extractDDL(record);
+        if(ddl == null){
+            return params;
+        }
+        Matcher matcher = addDropDDLPattern.matcher(ddl);
+        if(matcher.find()){
+            String op = matcher.group(1);
+            String col = matcher.group(2);
+            params.put("isDropColumn", op.equalsIgnoreCase("DROP"));
+            params.put("columnName", col);
+        }
+        return params;
+    }
+
+    private boolean execSchemaChange(JsonNode record) throws IOException {
+        String extractDDL = extractDDL(record);
+        Map<String, String> param = new HashMap<>();
+        param.put("stmt", extractDDL);
+        String database = extractDatabase(record);
+        String requestUrl = String.format(SCHEMA_CHANGE_API, dorisOptions.getFenodes(), database);
+        HttpPost httpPost = new HttpPost(requestUrl);
+        httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
+        httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");
+        httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
+        boolean success = handleResponse(httpPost);
+        return success;
+    }
+
+    private String extractDatabase(JsonNode record) {
+        return extractJsonNode(record.get("source"), "db");
+    }
+
+    private String extractTable(JsonNode record) {
+        return extractJsonNode(record.get("source"), "table");
+    }
+
+    private boolean handleResponse(HttpUriRequest request) throws IOException {
+        try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
+            CloseableHttpResponse response = httpclient.execute(request);
+            final int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode == 200 && response.getEntity() != null) {
+                String loadResult = EntityUtils.toString(response.getEntity());
+                Map<String, Object> responseMap = objectMapper.readValue(loadResult, Map.class);
+                String code = responseMap.getOrDefault("code", "-1").toString();
+                if (code.equals("0")) {
+                    return true;
+                } else {
+                    LOG.error("schema change response:{}", loadResult);
+                }
+            }
+        }catch(Exception e){
+            LOG.error("http request error,", e);
+        }
+        return false;
+    }
+
+    private String extractJsonNode(JsonNode record, String key) {
+        return record != null && record.get(key) != null ? record.get(key).asText() : null;
+    }
+
+    private Map<String, String> extractBeforeRow(JsonNode record) {
+        return extractRow(record.get("before"));
+    }
+
+    private Map<String, String> extractAfterRow(JsonNode record) {
+        return extractRow(record.get("after"));
+    }
+
+    private Map<String, String> extractRow(JsonNode recordRow) {
+        Map<String, String> recordMap = objectMapper.convertValue(recordRow, new TypeReference<Map<String, String>>() {
+        });
+        return recordMap != null ? recordMap : new HashMap<>();
+    }
+
+    @VisibleForTesting
+    public String extractDDL(JsonNode record) throws JsonProcessingException {
+        String historyRecord = extractJsonNode(record, "historyRecord");
+        if (Objects.isNull(historyRecord)) {
+            return null;
+        }
+        String ddl = extractJsonNode(objectMapper.readTree(historyRecord), "ddl");
+        if (!Objects.isNull(ddl)) {
+            //filter add/drop operation
+            if (addDropDDLPattern.matcher(ddl).matches()) {
+                return ddl;
+            }
+        }
+        LOG.info("parse ddl:{}", ddl);
+        return null;
+    }
+
+    private String authHeader() {
+        return "Basic " + new 
String(Base64.encodeBase64((dorisOptions.getUsername() + ":" + 
dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8)));
+    }
+
+    public static JsonDebeziumSchemaSerializer.Builder builder() {
+        return new JsonDebeziumSchemaSerializer.Builder();
+    }
+
+    /**
+     * Builder for JsonDebeziumSchemaSerializer.
+     */
+    public static class Builder {
+        private DorisOptions dorisOptions;
+        private Pattern addDropDDLPattern;
+
+        public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) {
+            this.dorisOptions = dorisOptions;
+            return this;
+        }
+
+        public JsonDebeziumSchemaSerializer.Builder setPattern(Pattern addDropDDLPattern) {
+            this.addDropDDLPattern = addDropDDLPattern;
+            return this;
+        }
+
+        public JsonDebeziumSchemaSerializer build() {
+            return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern);
+        }
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index be00cff..fb44359 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -147,7 +147,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
     private static final ConfigOption<String> SINK_LABEL_PREFIX = ConfigOptions
             .key("sink.label-prefix")
             .stringType()
-            .noDefaultValue()
+            .defaultValue("")
             .withDescription("the unique label prefix.");
     private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
             .key("sink.batch.interval")
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
new file mode 100644
index 0000000..bdc0584
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink;
+
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.DorisSink;
+import org.apache.doris.flink.utils.DateToStringConverter;
+import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+public class CDCSchemaChangeExample {
+
+    public static void main(String[] args) throws Exception {
+
+        Map<String, Object> customConverterConfigs = new HashMap<>();
+        customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
+        JsonDebeziumDeserializationSchema schema =
+                new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+
+        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
+                .hostname("127.0.0.1")
+                .port(3306)
+                .databaseList("test") // set captured database
+                .tableList("test.t1") // set captured table
+                .username("root")
+                .password("123456")
+                .debeziumProperties(DateToStringConverter.DEFAULT_PROPS)
+                .deserializer(schema)
+                .serverTimeZone("Asia/Shanghai")
+                .includeSchemaChanges(true) // converts SourceRecord to JSON String
+                .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        // enable checkpoint
+        env.enableCheckpointing(10000);
+//
+        Properties props = new Properties();
+        props.setProperty("format", "json");
+        props.setProperty("read_json_by_line", "true");
+        DorisOptions dorisOptions = DorisOptions.builder()
+                .setFenodes("127.0.0.1:8030")
+                .setTableIdentifier("test.t1")
+                .setUsername("root")
+                .setPassword("").build();
+//
+        DorisExecutionOptions.Builder  executionBuilder = DorisExecutionOptions.builder();
+        executionBuilder.setLabelPrefix("label-doris" + UUID.randomUUID())
+                .setStreamLoadProp(props).setDeletable(true);
+
+        DorisSink.Builder<String> builder = DorisSink.builder();
+        builder.setDorisReadOptions(DorisReadOptions.builder().build())
+                .setDorisExecutionOptions(executionBuilder.build())
+                .setDorisOptions(dorisOptions)
+                .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());
+
+        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")//.print();
+                .sinkTo(builder.build());
+
+        env.execute("Print MySQL Snapshot + Binlog");
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExample.java
index 9c459b7..daac8bb 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExample.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.UUID;
 
 
 public class DorisSinkExample {
@@ -43,6 +44,7 @@ public class DorisSinkExample {
         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         env.enableCheckpointing(10000);
+        env.setParallelism(1);
         env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000)));
         DorisSink.Builder<String> builder = DorisSink.builder();
@@ -61,12 +63,12 @@ public class DorisSinkExample {
         properties.setProperty("line_delimiter", "\n");
         properties.setProperty("format", "csv");
         DorisOptions.Builder dorisBuilder = DorisOptions.builder();
-        dorisBuilder.setFenodes("127.0.0.1:8040")
-                .setTableIdentifier("db.table")
-                .setUsername("test")
-                .setPassword("test");
+        dorisBuilder.setFenodes("127.0.0.1:8131")
+                .setTableIdentifier("test.tbl")
+                .setUsername("root")
+                .setPassword("");
         DorisExecutionOptions.Builder  executionBuilder = DorisExecutionOptions.builder();
-        executionBuilder.setLabelPrefix("label-doris")
+        executionBuilder.setLabelPrefix("label-doris" + UUID.randomUUID())
                 .setStreamLoadProp(properties)
                 .setBufferSize(8*1024)
                 .setBufferCount(3);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkSQLExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkSQLExample.java
index 87da571..91ce32b 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkSQLExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkSQLExample.java
@@ -47,15 +47,16 @@ public class DorisSinkSQLExample {
                         ") " +
                         "WITH (\n" +
                         "  'connector' = 'doris',\n" +
-                        "  'fenodes' = 'FE_IP:8030',\n" +
-                        "  'table.identifier' = 'db.table',\n" +
+                        "  'fenodes' = '127.0.0.1:8030',\n" +
+                        "  'table.identifier' = 'test.tbl',\n" +
                         "  'username' = 'root',\n" +
                         "  'password' = '',\n" +
                         "  'sink.properties.format' = 'json',\n" +
                         "  'sink.buffer-count' = '4',\n" +
                         "  'sink.buffer-size' = '4086'," +
                         "  'sink.label-prefix' = 'doris_label',\n" +
-                        "  'sink.properties.read_json_by_line' = 'true'\n" +
+                        "  'sink.properties.read_json_by_line' = 'true',\n" +
+                        "  'sink.properties.function_column.sequence_col' = 
'age'\n" +
                         ")");
         tEnv.executeSql("INSERT INTO doris_test_sink select name,age from 
doris_test");
     }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
index 60524c8..1478885 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
@@ -17,48 +17,46 @@
 
 package org.apache.doris.flink;
 
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
 public class DorisSourceSinkExample {
 
     public static void main(String[] args) {
-        EnvironmentSettings settings = EnvironmentSettings.newInstance()
-                .useBlinkPlanner()
-                .inStreamingMode()
-                .build();
-        TableEnvironment tEnv = TableEnvironment.create(settings);
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
         tEnv.executeSql(
                 "CREATE TABLE doris_test (" +
+                        "id INT," +
                         "name STRING," +
-                        "age INT," +
-                        "price DECIMAL(5,2)," +
-                        "sale DOUBLE" +
+                        "PRIMARY KEY (id) NOT ENFORCED" +
                         ") " +
                         "WITH (\n" +
-                        "  'connector' = 'doris',\n" +
-                        "  'fenodes' = 'FE_IP:8030',\n" +
-                        "  'table.identifier' = 'db.table',\n" +
+                        "  'connector' = 'mysql-cdc',\n" +
+                        "  'hostname' = '127.0.0.1',\n" +
+                        "  'port' = '3306',\n" +
                         "  'username' = 'root',\n" +
-                        "  'password' = ''" +
+                        "  'password' = '123456'," +
+                        "  'database-name' = 'test', " +
+                        "  'table-name' = 'test'" +
                         ")");
         tEnv.executeSql(
                 "CREATE TABLE doris_test_sink (" +
-                        "name STRING," +
-                        "age INT," +
-                        "price DECIMAL(5,2)," +
-                        "sale DOUBLE" +
+                        "id INT," +
+                        "name STRING" +
                         ") " +
                         "WITH (\n" +
                         "  'connector' = 'doris',\n" +
-                        "  'fenodes' = 'FE_IP:8030',\n" +
-                        "  'table.identifier' = 'db.table',\n" +
+                        "  'fenodes' = '127.0.0.1:8030',\n" +
+                        "  'table.identifier' = 'test.tbk',\n" +
                         "  'username' = 'root',\n" +
                         "  'password' = '',\n" +
                         "  'sink.properties.format' = 'csv',\n" +
                         "  'sink.label-prefix' = 'doris_csv_table'\n" +
                         ")");
 
-        tEnv.executeSql("INSERT INTO doris_test_sink select name,age,price,sale from doris_test");
+        tEnv.executeSql("INSERT INTO doris_test_sink select id,name from doris_test");
     }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
new file mode 100644
index 0000000..0d35ae3
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.exception.DorisException;
+import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.rest.models.Field;
+import org.apache.doris.flink.rest.models.Schema;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * test for JsonDebeziumSchemaSerializer.
+ */
+public class TestJsonDebeziumSchemaSerializer {
+    private static final Logger LOG = LoggerFactory.getLogger(TestJsonDebeziumSchemaSerializer.class);
+    static DorisOptions dorisOptions;
+    static JsonDebeziumSchemaSerializer serializer;
+    static ObjectMapper objectMapper = new ObjectMapper();
+
+    @BeforeClass
+    public static void setUp() {
+        dorisOptions = DorisOptions.builder().setFenodes("127.0.0.1:8030")
+                .setTableIdentifier("test.t1")
+                .setUsername("root")
+                .setPassword("").build();
+        serializer = JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build();
+    }
+
+    @Test
+    public void testSerializeInsert() throws IOException {
+        //insert into t1 VALUES(1,"doris",'2022-01-01','2022-01-01 10:01:02','2022-01-01 10:01:03');
+        byte[] serializedValue = serializer.serialize("{\"before\":null,\"after\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663923840000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":11834,\"row\":0,\"thread\":null,\ [...]
+        Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>(){});
+        Assert.assertEquals("1", valueMap.get("id"));
+        Assert.assertEquals("doris", valueMap.get("name"));
+        Assert.assertEquals("2022-01-01", valueMap.get("dt"));
+        Assert.assertEquals("2022-01-01 10:01:02", valueMap.get("dtime"));
+        Assert.assertEquals("2022-01-01 10:01:03", valueMap.get("ts"));
+        Assert.assertEquals("0", valueMap.get("__DORIS_DELETE_SIGN__"));
+        Assert.assertEquals(6, valueMap.size());
+
+    }
+
+    @Test
+    public void testSerializeUpdate() throws IOException {
+        //update t1 set name='doris-update' WHERE id =1;
+        byte[] serializedValue = serializer.serialize("{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":n [...]
+        Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>(){});
+        Assert.assertEquals("1", valueMap.get("id"));
+        Assert.assertEquals("doris-update", valueMap.get("name"));
+        Assert.assertEquals("2022-01-01", valueMap.get("dt"));
+        Assert.assertEquals("2022-01-01 10:01:02", valueMap.get("dtime"));
+        Assert.assertEquals("2022-01-01 10:01:03", valueMap.get("ts"));
+        Assert.assertEquals("0", valueMap.get("__DORIS_DELETE_SIGN__"));
+        Assert.assertEquals(6, valueMap.size());
+    }
+
+    @Test
+    public void testSerializeDelete() throws IOException {
+        byte[] serializedValue = serializer.serialize("{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\" [...]
+        Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>(){});
+        Assert.assertEquals("1", valueMap.get("id"));
+        Assert.assertEquals("doris-update", valueMap.get("name"));
+        Assert.assertEquals("2022-01-01", valueMap.get("dt"));
+        Assert.assertEquals("2022-01-01 10:01:02", valueMap.get("dtime"));
+        Assert.assertEquals("2022-01-01 10:01:03", valueMap.get("ts"));
+        Assert.assertEquals("1", valueMap.get("__DORIS_DELETE_SIGN__"));
+        Assert.assertEquals(6, valueMap.size());
+    }
+
+    @Test
+    public void testExtractDDL() throws IOException {
+        String srcDDL = "alter table t1 add \n column  c_1 varchar(200)";
+        String record = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\ [...]
+        JsonNode recordRoot = objectMapper.readTree(record);
+        String ddl = serializer.extractDDL(recordRoot);
+        Assert.assertEquals(srcDDL, ddl);
+    }
+
+    @Ignore
+    @Test
+    public void testSerializeAddColumn() throws IOException, DorisException {
+        // alter table t1 add  column  c_1 varchar(200)
+        String record = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\ [...]
+        JsonNode recordRoot = objectMapper.readTree(record);
+        boolean flag = serializer.schemaChange(recordRoot);
+        Assert.assertEquals(true, flag);
+
+        Field targetField = getField("c_1");
+        Assert.assertNotNull(targetField);
+        Assert.assertEquals("c_1", targetField.getName());
+        Assert.assertEquals("VARCHAR", targetField.getType());
+    }
+
+    @Ignore
+    @Test
+    public void testSerializeDropColumn() throws IOException, DorisException {
+        //alter table  t1 drop  column  c_1;
+        String ddl = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663925897321,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13298,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13298,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\" [...]
+        JsonNode recordRoot = objectMapper.readTree(ddl);
+        boolean flag = serializer.schemaChange(recordRoot);
+        Assert.assertEquals(true, flag);
+
+        Field targetField = getField("c_1");
+        Assert.assertNull(targetField);
+    }
+
+    private static Field getField(String column) throws DorisException{
+        //get table schema
+        Schema schema = RestService.getSchema(dorisOptions, DorisReadOptions.builder().build(), LOG);
+        List<Field> properties = schema.getProperties();
+        Field targetField = null;
+        for(Field field : properties){
+            if(column.equals(field.getName())){
+                targetField = field;
+                break;
+            }
+        }
+        return targetField;
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java
index d85e70d..c8033d6 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java
@@ -40,7 +40,6 @@ public class DorisSourceExampleTest {
                 .setDeserializer(new SimpleListDeserializationSchema())
                 .build();
 
-
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
         env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris Source")
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
index a44b96d..4ab44bf 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.source.reader;
 import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
 import org.apache.doris.flink.sink.OptionUtils;
 import org.apache.doris.flink.source.split.DorisSourceSplit;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -29,6 +30,7 @@ import static org.junit.Assert.assertEquals;
 /**
  * Unit tests for the {@link DorisSourceReader}.
  */
+@Ignore
 public class DorisSourceReaderTest {
 
     private static DorisSourceReader createReader(TestingReaderContext context) {
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java
new file mode 100644
index 0000000..9d73f53
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.utils;
+
+import io.debezium.spi.converter.CustomConverter;
+import io.debezium.spi.converter.RelationalColumn;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+import java.time.DateTimeException;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+public class DateToStringConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
+    private static final Logger log = LoggerFactory.getLogger(DateToStringConverter.class);
+    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
+    private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
+    private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
+    private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
+    private ZoneId timestampZoneId = ZoneId.systemDefault();
+
+    public static Properties DEFAULT_PROPS = new Properties();
+
+    static {
+        DEFAULT_PROPS.setProperty("converters", "date");
+        DEFAULT_PROPS.setProperty("date.type", 
"org.apache.doris.flink.utils.DateToStringConverter");
+        DEFAULT_PROPS.setProperty("date.format.date", "yyyy-MM-dd");
+        DEFAULT_PROPS.setProperty("date.format.datetime", "yyyy-MM-dd 
HH:mm:ss");
+        DEFAULT_PROPS.setProperty("date.format.timestamp", "yyyy-MM-dd 
HH:mm:ss");
+        DEFAULT_PROPS.setProperty("date.format.timestamp.zone", "UTC");
+    }
+
+    @Override
+    public void configure(Properties props) {
+        readProps(props, "format.date", p -> dateFormatter = 
DateTimeFormatter.ofPattern(p));
+        readProps(props, "format.time", p -> timeFormatter = 
DateTimeFormatter.ofPattern(p));
+        readProps(props, "format.datetime", p -> datetimeFormatter = 
DateTimeFormatter.ofPattern(p));
+        readProps(props, "format.timestamp", p -> timestampFormatter = 
DateTimeFormatter.ofPattern(p));
+        readProps(props, "format.timestamp.zone", z -> timestampZoneId = 
ZoneId.of(z));
+    }
+
+    private void readProps(Properties properties, String settingKey, Consumer<String> callback) {
+        String settingValue = (String) properties.get(settingKey);
+        if (settingValue == null || settingValue.length() == 0) {
+            return;
+        }
+        try {
+            callback.accept(settingValue.trim());
+        } catch (IllegalArgumentException | DateTimeException e) {
+            log.error("setting {} is illegal:{}", settingKey, settingValue);
+            throw e;
+        }
+    }
+
+    @Override
+    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
+        String sqlType = column.typeName().toUpperCase();
+        SchemaBuilder schemaBuilder = null;
+        Converter converter = null;
+        if ("DATE".equals(sqlType)) {
+            schemaBuilder = SchemaBuilder.string().optional();
+            converter = this::convertDate;
+        }
+        if ("TIME".equals(sqlType)) {
+            schemaBuilder = SchemaBuilder.string().optional();
+            converter = this::convertTime;
+        }
+        if ("DATETIME".equals(sqlType)) {
+            schemaBuilder = SchemaBuilder.string().optional();
+            converter = this::convertDateTime;
+        }
+        if ("TIMESTAMP".equals(sqlType)) {
+            schemaBuilder = SchemaBuilder.string().optional();
+            converter = this::convertTimestamp;
+        }
+        if (schemaBuilder != null) {
+            registration.register(schemaBuilder, converter);
+        }
+    }
+
+    private String convertDate(Object input) {
+        if (input instanceof LocalDate) {
+            return dateFormatter.format((LocalDate) input);
+        } else if (input instanceof Integer) {
+            LocalDate date = LocalDate.ofEpochDay((Integer) input);
+            return dateFormatter.format(date);
+        }
+        return null;
+    }
+
+    private String convertTime(Object input) {
+        if (input instanceof Duration) {
+            Duration duration = (Duration) input;
+            long seconds = duration.getSeconds();
+            int nano = duration.getNano();
+            LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);
+            return timeFormatter.format(time);
+        }
+        return null;
+    }
+
+    private String convertDateTime(Object input) {
+        if (input instanceof LocalDateTime) {
+            return datetimeFormatter.format((LocalDateTime) input);
+        } else if (input instanceof Timestamp) {
+            return datetimeFormatter.format(((Timestamp) input).toLocalDateTime());
+        }
+        return null;
+    }
+
+    private String convertTimestamp(Object input) {
+        if (input instanceof ZonedDateTime) {
+            // MySQL TIMESTAMP values are stored in UTC, so the ZonedDateTime here is in UTC
+            ZonedDateTime zonedDateTime = (ZonedDateTime) input;
+            LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
+            return timestampFormatter.format(localDateTime);
+        } else if (input instanceof Timestamp) {
+            return timestampFormatter.format(((Timestamp) input).toLocalDateTime());
+        }
+        return null;
+    }
+
+}

