This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 698e9cd243 [fix](demo) fix CDC failure to synchronize the MySQL datetime type, and add JsonDebeziumSchemaSerializer (#16971)
698e9cd243 is described below

commit 698e9cd243092b4b47a818fa0740998cfc6fef7f
Author: DongLiang-0 <46414265+donglian...@users.noreply.github.com>
AuthorDate: Thu Mar 2 14:14:58 2023 +0800

    [fix](demo) fix CDC failure to synchronize the MySQL datetime type, and add JsonDebeziumSchemaSerializer (#16971)
    
    * [fix](demo) fix CDC failure to synchronize the MySQL datetime type, and add JsonDebeziumSchemaSerializer
    * add licenses for DateToStringConverter
---
 samples/doris-demo/flink-demo-v1.1/pom.xml         |  23 ++--
 .../flink/converter/DateToStringConverter.java     | 146 +++++++++++++++++++++
 .../demo/flink/dbsync/DatabaseFullDelSync.java     |   1 +
 .../doris/demo/flink/dbsync/DatabaseFullSync.java  |  61 ++-------
 4 files changed, 170 insertions(+), 61 deletions(-)

diff --git a/samples/doris-demo/flink-demo-v1.1/pom.xml b/samples/doris-demo/flink-demo-v1.1/pom.xml
index c1e00ea318..87054383ec 100644
--- a/samples/doris-demo/flink-demo-v1.1/pom.xml
+++ b/samples/doris-demo/flink-demo-v1.1/pom.xml
@@ -29,7 +29,7 @@ under the License.
     <properties>
         <scala.version>2.12</scala.version>
         <java.version>1.8</java.version>
-        <flink.version>1.14.3</flink.version>
+        <flink.version>1.15.3</flink.version>
         <fastjson.version>1.2.62</fastjson.version>
         <hadoop.version>2.8.3</hadoop.version>
         <scope.mode>compile</scope.mode>
@@ -53,17 +53,17 @@ under the License.
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-clients_${scala.version}</artifactId>
+            <artifactId>flink-clients</artifactId>
             <version>${flink.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
+            <artifactId>flink-connector-jdbc</artifactId>
             <version>${flink.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
+            <artifactId>flink-connector-kafka</artifactId>
             <version>${flink.version}</version>
         </dependency>
         <dependency>
@@ -85,18 +85,13 @@ under the License.
         <!-- flink-doris-connector -->
         <dependency>
             <groupId>org.apache.doris</groupId>
-            <artifactId>flink-doris-connector-1.14_2.12</artifactId>
-            <version>1.1.0</version>
-        </dependency>
-        <dependency>
-            <groupId>mysql</groupId>
-            <artifactId>mysql-connector-java</artifactId>
-            <version>8.0.12</version>
+            <artifactId>flink-doris-connector-1.15</artifactId>
+            <version>1.2.1</version>
         </dependency>
         <dependency>
             <groupId>com.ververica</groupId>
-            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
-            <version>2.2.1</version>
+            <artifactId>flink-connector-mysql-cdc</artifactId>
+            <version>2.3.0</version>
             <exclusions>
                 <exclusion>
                     <artifactId>flink-shaded-guava</artifactId>
@@ -106,7 +101,7 @@ under the License.
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-runtime-web_${scala.version}</artifactId>
+            <artifactId>flink-runtime-web</artifactId>
             <version>${flink.version}</version>
         </dependency>
     </dependencies>
diff --git a/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/converter/DateToStringConverter.java b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/converter/DateToStringConverter.java
new file mode 100644
index 0000000000..fec014e539
--- /dev/null
+++ b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/converter/DateToStringConverter.java
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink.converter;
+
+import io.debezium.spi.converter.CustomConverter;
+import io.debezium.spi.converter.RelationalColumn;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+import java.time.DateTimeException;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+public class DateToStringConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
+  private static final Logger log = LoggerFactory.getLogger(DateToStringConverter.class);
+  private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
+  private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
+  private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
+  private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
+  private ZoneId timestampZoneId = ZoneId.systemDefault();
+
+  public static Properties DEFAULT_PROPS = new Properties();
+
+  static {
+    DEFAULT_PROPS.setProperty("converters", "date");
+    DEFAULT_PROPS.setProperty("date.type", 
"org.apache.doris.demo.flink.converter.DateToStringConverter");
+    DEFAULT_PROPS.setProperty("date.format.date", "yyyy-MM-dd");
+    DEFAULT_PROPS.setProperty("date.format.datetime", "yyyy-MM-dd HH:mm:ss");
+    DEFAULT_PROPS.setProperty("date.format.timestamp", "yyyy-MM-dd HH:mm:ss");
+    DEFAULT_PROPS.setProperty("date.format.timestamp.zone", "UTC");
+  }
+
+  @Override
+  public void configure(Properties props) {
+    readProps(props, "format.date", p -> dateFormatter = 
DateTimeFormatter.ofPattern(p));
+    readProps(props, "format.time", p -> timeFormatter = 
DateTimeFormatter.ofPattern(p));
+    readProps(props, "format.datetime", p -> datetimeFormatter = 
DateTimeFormatter.ofPattern(p));
+    readProps(props, "format.timestamp", p -> timestampFormatter = 
DateTimeFormatter.ofPattern(p));
+    readProps(props, "format.timestamp.zone", z -> timestampZoneId = 
ZoneId.of(z));
+  }
+
+  private void readProps(Properties properties, String settingKey, Consumer<String> callback) {
+    String settingValue = (String) properties.get(settingKey);
+    if (settingValue == null || settingValue.length() == 0) {
+      return;
+    }
+    try {
+      callback.accept(settingValue.trim());
+    } catch (IllegalArgumentException | DateTimeException e) {
+      log.error("setting {} is illegal:{}", settingKey, settingValue);
+      throw e;
+    }
+  }
+
+  @Override
+  public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
+    String sqlType = column.typeName().toUpperCase();
+    SchemaBuilder schemaBuilder = null;
+    Converter converter = null;
+    if ("DATE".equals(sqlType)) {
+      schemaBuilder = SchemaBuilder.string().optional();
+      converter = this::convertDate;
+    }
+    if ("TIME".equals(sqlType)) {
+      schemaBuilder = SchemaBuilder.string().optional();
+      converter = this::convertTime;
+    }
+    if ("DATETIME".equals(sqlType)) {
+      schemaBuilder = SchemaBuilder.string().optional();
+      converter = this::convertDateTime;
+    }
+    if ("TIMESTAMP".equals(sqlType)) {
+      schemaBuilder = SchemaBuilder.string().optional();
+      converter = this::convertTimestamp;
+    }
+    if (schemaBuilder != null) {
+      registration.register(schemaBuilder, converter);
+    }
+  }
+
+  private String convertDate(Object input) {
+    if (input instanceof LocalDate) {
+      return dateFormatter.format((LocalDate) input);
+    } else if (input instanceof Integer) {
+      LocalDate date = LocalDate.ofEpochDay((Integer) input);
+      return dateFormatter.format(date);
+    }
+    return null;
+  }
+
+  private String convertTime(Object input) {
+    if (input instanceof Duration) {
+      Duration duration = (Duration) input;
+      long seconds = duration.getSeconds();
+      int nano = duration.getNano();
+      LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);
+      return timeFormatter.format(time);
+    }
+    return null;
+  }
+
+  private String convertDateTime(Object input) {
+    if (input instanceof LocalDateTime) {
+      return datetimeFormatter.format((LocalDateTime) input);
+    } else if (input instanceof Timestamp) {
+      return datetimeFormatter.format(((Timestamp) input).toLocalDateTime());
+    }
+    return null;
+  }
+
+  private String convertTimestamp(Object input) {
+    if (input instanceof ZonedDateTime) {
+      // MySQL TIMESTAMP values are stored in UTC, so the ZonedDateTime here is in UTC
+      ZonedDateTime zonedDateTime = (ZonedDateTime) input;
+      LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
+      return timestampFormatter.format(localDateTime);
+    } else if (input instanceof Timestamp) {
+      return timestampFormatter.format(((Timestamp) input).toLocalDateTime());
+    }
+    return null;
+  }
+
+}
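
For context on how Debezium wires this in: the "converters" property declares a logical converter name, "<name>.type" names the implementing class, and the remaining "<name>."-prefixed keys are handed to configure() with the prefix stripped, which is exactly what the readProps calls above consume. A minimal sketch of registering the converter with custom patterns instead of DEFAULT_PROPS; the pattern and zone values below are illustrative, not part of this commit:

    Properties converterProps = new Properties();
    converterProps.setProperty("converters", "date");
    converterProps.setProperty("date.type",
        "org.apache.doris.demo.flink.converter.DateToStringConverter");
    // delivered to configure() as "format.datetime"; any DateTimeFormatter pattern is accepted
    converterProps.setProperty("date.format.datetime", "yyyy-MM-dd'T'HH:mm:ss");
    // delivered as "format.timestamp.zone"; controls the zone TIMESTAMP values are rendered in
    converterProps.setProperty("date.format.timestamp.zone", "Asia/Shanghai");
    // then hand the properties to the CDC source builder:
    //   MySqlSource.<String>builder()...debeziumProperties(converterProps)...
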
diff --git a/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullDelSync.java b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullDelSync.java
index 22da0b0f84..49fa87b724 100644
--- a/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullDelSync.java
+++ b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullDelSync.java
@@ -48,6 +48,7 @@ import java.util.HashMap;
  * Synchronize the full database through flink cdc
  *
  */
+@Deprecated
 public class DatabaseFullDelSync {
 
     private static String SOURCE_DB = "custom_db";//db
diff --git a/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullSync.java b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullSync.java
index 417543df49..55790222b2 100644
--- a/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullSync.java
+++ b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullSync.java
@@ -20,24 +20,25 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.ververica.cdc.connectors.mysql.source.MySqlSource;
 import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.doris.demo.flink.converter.DateToStringConverter;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.sink.DorisSink;
-import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
+import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -60,6 +61,9 @@ public class DatabaseFullSync {
     private static String TARGET_DORIS_DB = "test";
 
     public static void main(String[] args) throws Exception {
+        Map<String, Object> customConverterConfigs = new HashMap<>();
+        customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
+
         MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
             .hostname(HOST)
             .port(MYSQL_PORT)
@@ -67,7 +71,9 @@ public class DatabaseFullSync {
             .tableList(SYNC_TBLS) // set captured table
             .username(MYSQL_USER)
             .password(MYSQL_PASSWD)
-            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
+            .debeziumProperties(DateToStringConverter.DEFAULT_PROPS)
+            .deserializer(new JsonDebeziumDeserializationSchema(false, customConverterConfigs))
+            .includeSchemaChanges(true)
             .build();
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -82,51 +88,12 @@ public class DatabaseFullSync {
         LOG.info("sync table list:{}",tableList);
         for(String tbl : tableList){
             SingleOutputStreamOperator<String> filterStream = filterTableData(cdcSource, tbl);
-            SingleOutputStreamOperator<String> cleanStream = clean(filterStream);
             DorisSink dorisSink = buildDorisSink(tbl);
-            cleanStream.sinkTo(dorisSink).name("sink " + tbl);
+            filterStream.sinkTo(dorisSink).name("sink " + tbl);
         }
         env.execute("Full Database Sync ");
     }
 
-    /**
-     * Get real data
-     * {
-     *     "before":null,
-     *     "after":{
-     *         "id":1,
-     *         "name":"zhangsan-1",
-     *         "age":18
-     *     },
-     *     "source":{
-     *         "db":"test",
-     *         "table":"test_1",
-     *         ...
-     *     },
-     *     "op":"c",
-     *     ...
-     * }
-     * */
-    private static SingleOutputStreamOperator<String> clean(SingleOutputStreamOperator<String> source) {
-        return source.flatMap(new FlatMapFunction<String,String>(){
-            @Override
-            public void flatMap(String row, Collector<String> out) throws Exception {
-                try{
-                    JSONObject rowJson = JSON.parseObject(row);
-                    String op = rowJson.getString("op");
-                    //history,insert,update
-                    if(Arrays.asList("r","c","u").contains(op)){
-                        out.collect(rowJson.getJSONObject("after").toJSONString());
-                    }else{
-                        LOG.info("filter other op:{}",op);
-                    }
-                }catch (Exception ex){
-                    LOG.warn("filter other format binlog:{}",row);
-                }
-            }
-        });
-    }
-
     /**
      * Divide according to table name
      * */
@@ -182,11 +149,11 @@ public class DatabaseFullSync {
         pro.setProperty("read_json_by_line", "true");
         DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
             .setLabelPrefix("label-" + table + UUID.randomUUID()) //streamload 
label prefix,
-            .setStreamLoadProp(pro).build();
+            .setStreamLoadProp(pro).setDeletable(true).build();
 
         builder.setDorisReadOptions(DorisReadOptions.builder().build())
             .setDorisExecutionOptions(executionOptions)
-            .setSerializer(new SimpleStringSerializer()) //serialize according to string
+            .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisBuilder.build()).build())
             .setDorisOptions(dorisBuilder.build());
 
         return builder.build();
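
Putting the pieces together, a condensed sketch of the pipeline the updated demo builds: the converter rides in on the Debezium properties, decimals are emitted as plain numerics, and the new JsonDebeziumSchemaSerializer lets the sink consume the Debezium JSON directly, which is why the old clean() step could be dropped. All class and method names come from the diff above; hostnames, credentials, and table identifiers are placeholders:

    // imports as in DatabaseFullSync above
    Map<String, Object> customConverterConfigs = new HashMap<>();
    customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");

    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("127.0.0.1").port(3306)                        // placeholders
        .databaseList("custom_db").tableList("custom_db.*")      // placeholders
        .username("root").password("password")
        .debeziumProperties(DateToStringConverter.DEFAULT_PROPS) // the datetime fix
        .deserializer(new JsonDebeziumDeserializationSchema(false, customConverterConfigs))
        .includeSchemaChanges(true)
        .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> cdcSource =
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");

    DorisOptions dorisOptions = DorisOptions.builder()
        .setFenodes("127.0.0.1:8030")                            // placeholder FE address
        .setTableIdentifier("test.my_table")                     // placeholder target table
        .setUsername("root").setPassword("").build();

    Properties pro = new Properties();
    pro.setProperty("format", "json");
    pro.setProperty("read_json_by_line", "true");
    DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
        .setLabelPrefix("label-" + UUID.randomUUID())
        .setStreamLoadProp(pro).setDeletable(true).build();

    DorisSink<String> dorisSink = DorisSink.<String>builder()
        .setDorisReadOptions(DorisReadOptions.builder().build())
        .setDorisExecutionOptions(executionOptions)
        .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build())
        .setDorisOptions(dorisOptions)
        .build();

    cdcSource.sinkTo(dorisSink).name("sink my_table");
    env.execute("Full Database Sync");
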


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
