This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 698e9cd243 [fix](demo)fix cdc failed to synchronize datetime type in mysql, and added JsonDebeziumSchemaSerializer (#16971)
698e9cd243 is described below

commit 698e9cd243092b4b47a818fa0740998cfc6fef7f
Author: DongLiang-0 <46414265+donglian...@users.noreply.github.com>
AuthorDate: Thu Mar 2 14:14:58 2023 +0800

    [fix](demo)fix cdc failed to synchronize datetime type in mysql, and added JsonDebeziumSchemaSerializer (#16971)

    * [fix](demo)fix cdc failed to synchronize datetime type in mysql, and added JsonDebeziumSchemaSerializer

    * add licenses for DateToStringConverter
---
 samples/doris-demo/flink-demo-v1.1/pom.xml         |  23 ++--
 .../flink/converter/DateToStringConverter.java     | 146 +++++++++++++++++++++
 .../demo/flink/dbsync/DatabaseFullDelSync.java     |   1 +
 .../doris/demo/flink/dbsync/DatabaseFullSync.java  |  61 ++-------
 4 files changed, 170 insertions(+), 61 deletions(-)
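Background on the datetime failure: with Debezium's default temporal mapping, a MySQL DATETIME column typically reaches the JSON deserializer as epoch milliseconds, which Doris stream load cannot parse as a DATETIME. A minimal sketch of the value-shape change the new DateToStringConverter achieves (the epoch value is invented for illustration, and this is not the converter's actual code path, which formats LocalDateTime/Timestamp objects directly):

    import java.time.Instant;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;

    public class DatetimeMappingDemo {
        public static void main(String[] args) {
            // Roughly what Debezium's default mapping emits for a DATETIME column:
            long epochMillis = 1677738898000L; // invented sample value
            // The string form the converter hands to Doris instead:
            String converted = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
                    .withZone(ZoneOffset.UTC)
                    .format(Instant.ofEpochMilli(epochMillis));
            System.out.println(converted); // 2023-03-02 06:34:58
        }
    }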
diff --git a/samples/doris-demo/flink-demo-v1.1/pom.xml b/samples/doris-demo/flink-demo-v1.1/pom.xml
index c1e00ea318..87054383ec 100644
--- a/samples/doris-demo/flink-demo-v1.1/pom.xml
+++ b/samples/doris-demo/flink-demo-v1.1/pom.xml
@@ -29,7 +29,7 @@ under the License.
     <properties>
         <scala.version>2.12</scala.version>
         <java.version>1.8</java.version>
-        <flink.version>1.14.3</flink.version>
+        <flink.version>1.15.3</flink.version>
         <fastjson.version>1.2.62</fastjson.version>
         <hadoop.version>2.8.3</hadoop.version>
         <scope.mode>compile</scope.mode>
@@ -53,17 +53,17 @@ under the License.
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-clients_${scala.version}</artifactId>
+            <artifactId>flink-clients</artifactId>
             <version>${flink.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
+            <artifactId>flink-connector-jdbc</artifactId>
             <version>${flink.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
+            <artifactId>flink-connector-kafka</artifactId>
             <version>${flink.version}</version>
         </dependency>
         <dependency>
@@ -85,18 +85,13 @@ under the License.
         <!-- flink-doris-connector -->
         <dependency>
             <groupId>org.apache.doris</groupId>
-            <artifactId>flink-doris-connector-1.14_2.12</artifactId>
-            <version>1.1.0</version>
-        </dependency>
-        <dependency>
-            <groupId>mysql</groupId>
-            <artifactId>mysql-connector-java</artifactId>
-            <version>8.0.12</version>
+            <artifactId>flink-doris-connector-1.15</artifactId>
+            <version>1.2.1</version>
         </dependency>
         <dependency>
             <groupId>com.ververica</groupId>
-            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
-            <version>2.2.1</version>
+            <artifactId>flink-connector-mysql-cdc</artifactId>
+            <version>2.3.0</version>
             <exclusions>
                 <exclusion>
                     <artifactId>flink-shaded-guava</artifactId>
@@ -106,7 +101,7 @@ under the License.
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-runtime-web_${scala.version}</artifactId>
+            <artifactId>flink-runtime-web</artifactId>
             <version>${flink.version}</version>
         </dependency>
     </dependencies>
diff --git a/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/converter/DateToStringConverter.java b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/converter/DateToStringConverter.java
new file mode 100644
index 0000000000..fec014e539
--- /dev/null
+++ b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/converter/DateToStringConverter.java
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.demo.flink.converter;
+
+import io.debezium.spi.converter.CustomConverter;
+import io.debezium.spi.converter.RelationalColumn;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+import java.time.DateTimeException;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+public class DateToStringConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
+    private static final Logger log = LoggerFactory.getLogger(DateToStringConverter.class);
+    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
+    private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
+    private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
+    private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
+    private ZoneId timestampZoneId = ZoneId.systemDefault();
+
+    public static Properties DEFAULT_PROPS = new Properties();
+
+    static {
+        DEFAULT_PROPS.setProperty("converters", "date");
+        DEFAULT_PROPS.setProperty("date.type", "org.apache.doris.demo.flink.converter.DateToStringConverter");
+        DEFAULT_PROPS.setProperty("date.format.date", "yyyy-MM-dd");
+        DEFAULT_PROPS.setProperty("date.format.datetime", "yyyy-MM-dd HH:mm:ss");
+        DEFAULT_PROPS.setProperty("date.format.timestamp", "yyyy-MM-dd HH:mm:ss");
+        DEFAULT_PROPS.setProperty("date.format.timestamp.zone", "UTC");
+    }
+
+    @Override
+    public void configure(Properties props) {
+        readProps(props, "format.date", p -> dateFormatter = DateTimeFormatter.ofPattern(p));
+        readProps(props, "format.time", p -> timeFormatter = DateTimeFormatter.ofPattern(p));
+        readProps(props, "format.datetime", p -> datetimeFormatter = DateTimeFormatter.ofPattern(p));
+        readProps(props, "format.timestamp", p -> timestampFormatter = DateTimeFormatter.ofPattern(p));
+        readProps(props, "format.timestamp.zone", z -> timestampZoneId = ZoneId.of(z));
+    }
+
+    private void readProps(Properties properties, String settingKey, Consumer<String> callback) {
+        String settingValue = (String) properties.get(settingKey);
+        if (settingValue == null || settingValue.length() == 0) {
+            return;
+        }
+        try {
+            callback.accept(settingValue.trim());
+        } catch (IllegalArgumentException | DateTimeException e) {
+            log.error("setting {} is illegal:{}", settingKey, settingValue);
+            throw e;
+        }
+    }
+
+    @Override
+    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
+        String sqlType = column.typeName().toUpperCase();
+        SchemaBuilder schemaBuilder = null;
+        Converter converter = null;
+        if ("DATE".equals(sqlType)) {
+            schemaBuilder = SchemaBuilder.string().optional();
+            converter = this::convertDate;
+        }
+        if ("TIME".equals(sqlType)) {
+            schemaBuilder = SchemaBuilder.string().optional();
+            converter = this::convertTime;
+        }
+        if ("DATETIME".equals(sqlType)) {
+            schemaBuilder = SchemaBuilder.string().optional();
+            converter = this::convertDateTime;
+        }
+        if ("TIMESTAMP".equals(sqlType)) {
+            schemaBuilder = SchemaBuilder.string().optional();
+            converter = this::convertTimestamp;
+        }
+        if (schemaBuilder != null) {
+            registration.register(schemaBuilder, converter);
+        }
+    }
+
+    private String convertDate(Object input) {
+        if (input instanceof LocalDate) {
+            return dateFormatter.format((LocalDate) input);
+        } else if (input instanceof Integer) {
+            LocalDate date = LocalDate.ofEpochDay((Integer) input);
+            return dateFormatter.format(date);
+        }
+        return null;
+    }
+
+    private String convertTime(Object input) {
+        if (input instanceof Duration) {
+            Duration duration = (Duration) input;
+            long seconds = duration.getSeconds();
+            int nano = duration.getNano();
+            LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);
+            return timeFormatter.format(time);
+        }
+        return null;
+    }
+
+    private String convertDateTime(Object input) {
+        if (input instanceof LocalDateTime) {
+            return datetimeFormatter.format((LocalDateTime) input);
+        } else if (input instanceof Timestamp) {
+            return datetimeFormatter.format(((Timestamp) input).toLocalDateTime());
+        }
+        return null;
+    }
+
+    private String convertTimestamp(Object input) {
+        if (input instanceof ZonedDateTime) {
+            // mysql timestamp will be converted to UTC storage, and the zonedDatetime here is UTC time
+            ZonedDateTime zonedDateTime = (ZonedDateTime) input;
+            LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
+            return timestampFormatter.format(localDateTime);
+        } else if (input instanceof Timestamp) {
+            return timestampFormatter.format(((Timestamp) input).toLocalDateTime());
+        }
+        return null;
+    }
+
+}
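For readers wiring this converter in elsewhere, a condensed sketch of the hookup follows. It mirrors the DatabaseFullSync change further down in this commit; the connection values are placeholders, not the repository's configuration:

    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.doris.demo.flink.converter.DateToStringConverter;
    import org.apache.kafka.connect.json.JsonConverterConfig;

    import java.util.HashMap;
    import java.util.Map;

    public class ConverterWiringSketch {
        public static MySqlSource<String> build() {
            // Emit DECIMAL columns as plain numbers rather than base64 bytes.
            Map<String, Object> customConverterConfigs = new HashMap<>();
            customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
            return MySqlSource.<String>builder()
                    .hostname("127.0.0.1")     // placeholder
                    .port(3306)                // placeholder
                    .databaseList("custom_db") // placeholder
                    .tableList("custom_db.*")  // placeholder
                    .username("root")          // placeholder
                    .password("password")      // placeholder
                    // Register DateToStringConverter so DATE/TIME/DATETIME/TIMESTAMP
                    // columns reach Doris as formatted strings.
                    .debeziumProperties(DateToStringConverter.DEFAULT_PROPS)
                    .deserializer(new JsonDebeziumDeserializationSchema(false, customConverterConfigs))
                    .includeSchemaChanges(true)
                    .build();
        }
    }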
"format.timestamp", p -> timestampFormatter = DateTimeFormatter.ofPattern(p)); + readProps(props, "format.timestamp.zone", z -> timestampZoneId = ZoneId.of(z)); + } + + private void readProps(Properties properties, String settingKey, Consumer<String> callback) { + String settingValue = (String) properties.get(settingKey); + if (settingValue == null || settingValue.length() == 0) { + return; + } + try { + callback.accept(settingValue.trim()); + } catch (IllegalArgumentException | DateTimeException e) { + log.error("setting {} is illegal:{}", settingKey, settingValue); + throw e; + } + } + + @Override + public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) { + String sqlType = column.typeName().toUpperCase(); + SchemaBuilder schemaBuilder = null; + Converter converter = null; + if ("DATE".equals(sqlType)) { + schemaBuilder = SchemaBuilder.string().optional(); + converter = this::convertDate; + } + if ("TIME".equals(sqlType)) { + schemaBuilder = SchemaBuilder.string().optional(); + converter = this::convertTime; + } + if ("DATETIME".equals(sqlType)) { + schemaBuilder = SchemaBuilder.string().optional(); + converter = this::convertDateTime; + } + if ("TIMESTAMP".equals(sqlType)) { + schemaBuilder = SchemaBuilder.string().optional(); + converter = this::convertTimestamp; + } + if (schemaBuilder != null) { + registration.register(schemaBuilder, converter); + } + } + + private String convertDate(Object input) { + if (input instanceof LocalDate) { + return dateFormatter.format((LocalDate) input); + } else if (input instanceof Integer) { + LocalDate date = LocalDate.ofEpochDay((Integer) input); + return dateFormatter.format(date); + } + return null; + } + + private String convertTime(Object input) { + if (input instanceof Duration) { + Duration duration = (Duration) input; + long seconds = duration.getSeconds(); + int nano = duration.getNano(); + LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano); + return timeFormatter.format(time); + } + return null; + } + + private String convertDateTime(Object input) { + if (input instanceof LocalDateTime) { + return datetimeFormatter.format((LocalDateTime) input); + } else if (input instanceof Timestamp) { + return datetimeFormatter.format(((Timestamp) input).toLocalDateTime()); + } + return null; + } + + private String convertTimestamp(Object input) { + if (input instanceof ZonedDateTime) { + // mysql timestamp will be converted to UTC storage,and the zonedDatetime here is UTC time + ZonedDateTime zonedDateTime = (ZonedDateTime) input; + LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime(); + return timestampFormatter.format(localDateTime); + } else if (input instanceof Timestamp) { + return timestampFormatter.format(((Timestamp) input).toLocalDateTime()); + } + return null; + } + +} diff --git a/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullDelSync.java b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullDelSync.java index 22da0b0f84..49fa87b724 100644 --- a/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullDelSync.java +++ b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullDelSync.java @@ -48,6 +48,7 @@ import java.util.HashMap; * Synchronize the full database through flink cdc * */ +@Deprecated public class DatabaseFullDelSync { private static String SOURCE_DB = 
"custom_db";//db diff --git a/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullSync.java b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullSync.java index 417543df49..55790222b2 100644 --- a/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullSync.java +++ b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullSync.java @@ -20,24 +20,25 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; +import org.apache.doris.demo.flink.converter.DateToStringConverter; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.sink.DorisSink; -import org.apache.doris.flink.sink.writer.SimpleStringSerializer; +import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.util.Collector; +import org.apache.kafka.connect.json.JsonConverterConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.UUID; @@ -60,6 +61,9 @@ public class DatabaseFullSync { private static String TARGET_DORIS_DB = "test"; public static void main(String[] args) throws Exception { + Map<String, Object> customConverterConfigs = new HashMap<>(); + customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric"); + MySqlSource<String> mySqlSource = MySqlSource.<String>builder() .hostname(HOST) .port(MYSQL_PORT) @@ -67,7 +71,9 @@ public class DatabaseFullSync { .tableList(SYNC_TBLS) // set captured table .username(MYSQL_USER) .password(MYSQL_PASSWD) - .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String + .debeziumProperties(DateToStringConverter.DEFAULT_PROPS) + .deserializer(new JsonDebeziumDeserializationSchema(false, customConverterConfigs)) + .includeSchemaChanges(true) .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -82,51 +88,12 @@ public class DatabaseFullSync { LOG.info("sync table list:{}",tableList); for(String tbl : tableList){ SingleOutputStreamOperator<String> filterStream = filterTableData(cdcSource, tbl); - SingleOutputStreamOperator<String> cleanStream = clean(filterStream); DorisSink dorisSink = buildDorisSink(tbl); - cleanStream.sinkTo(dorisSink).name("sink " + tbl); + filterStream.sinkTo(dorisSink).name("sink " + tbl); } env.execute("Full Database Sync "); } - /** - * Get real data - * { - * "before":null, - * "after":{ - * "id":1, - * "name":"zhangsan-1", - * "age":18 - * }, - * "source":{ - * "db":"test", - * "table":"test_1", - * ... - * }, - * "op":"c", - * ... 
@@ -182,11 +149,11 @@
         pro.setProperty("read_json_by_line", "true");
         DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
                 .setLabelPrefix("label-" + table + UUID.randomUUID()) //streamload label prefix,
-                .setStreamLoadProp(pro).build();
+                .setStreamLoadProp(pro).setDeletable(true).build();
 
         builder.setDorisReadOptions(DorisReadOptions.builder().build())
                 .setDorisExecutionOptions(executionOptions)
-                .setSerializer(new SimpleStringSerializer()) //serialize according to string
+                .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisBuilder.build()).build())
                 .setDorisOptions(dorisBuilder.build());
 
         return builder.build();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org