This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 31f3eb4  [Doc] Use Flink CDC to realize real-time MySQL data into Apache Doris (#6933)
31f3eb4 is described below

commit 31f3eb4a3cf9e5c8183c8fe8773ec9db41cccb35
Author: jiafeng.zhang <zhang...@gmail.com>
AuthorDate: Sat Nov 6 16:18:19 2021 +0800

    [Doc] Use Flink CDC to realize real-time MySQL data into Apache Doris (#6933)

    * Best Practices: Use Flink CDC to realize real-time MySQL data into Apache Doris
---
 .../public/images/cdc/image-20211025162831632.png | Bin 0 -> 143949 bytes
 .../public/images/cdc/image-20211025165547903.png | Bin 0 -> 39487 bytes
 .../public/images/cdc/image-20211025170642628.png | Bin 0 -> 49165 bytes
 .../public/images/cdc/image-20211025182341086.png | Bin 0 -> 21677 bytes
 .../public/images/cdc/image-20211025182435827.png | Bin 0 -> 48534 bytes
 .../public/images/cdc/image-20211026095513892.png | Bin 0 -> 92955 bytes
 .../public/images/cdc/image-20211026100505972.png | Bin 0 -> 33173 bytes
 .../public/images/cdc/image-20211026100804091.png | Bin 0 -> 10829 bytes
 .../public/images/cdc/image-20211026100943474.png | Bin 0 -> 284282 bytes
 .../public/images/cdc/image-20211026101004547.png | Bin 0 -> 21854 bytes
 .../public/images/cdc/image-20211026101203629.png | Bin 0 -> 43195 bytes
 docs/.vuepress/sidebar/en.js                      |   3 +-
 docs/.vuepress/sidebar/zh-CN.js                   |   3 +-
 docs/en/best-practices/datax-doris-writer.md      |   6 +-
 docs/en/best-practices/flink-cdc-to-doris.md      | 367 +++++++++++++++++++++
 docs/zh-CN/best-practices/flink-cdc-to-doris.md   | 342 +++++++++++++++++++
 16 files changed, 716 insertions(+), 5 deletions(-)

diff --git a/docs/.vuepress/public/images/cdc/image-20211025162831632.png b/docs/.vuepress/public/images/cdc/image-20211025162831632.png
new file mode 100644
index 0000000..3760970
Binary files /dev/null and b/docs/.vuepress/public/images/cdc/image-20211025162831632.png differ
diff --git a/docs/.vuepress/public/images/cdc/image-20211025165547903.png b/docs/.vuepress/public/images/cdc/image-20211025165547903.png
new file mode 100644
index 0000000..0e61dda
Binary files /dev/null and b/docs/.vuepress/public/images/cdc/image-20211025165547903.png differ
diff --git a/docs/.vuepress/public/images/cdc/image-20211025170642628.png b/docs/.vuepress/public/images/cdc/image-20211025170642628.png
new file mode 100644
index 0000000..5381d95
Binary files /dev/null and b/docs/.vuepress/public/images/cdc/image-20211025170642628.png differ
diff --git a/docs/.vuepress/public/images/cdc/image-20211025182341086.png b/docs/.vuepress/public/images/cdc/image-20211025182341086.png
new file mode 100644
index 0000000..800c63a
Binary files /dev/null and b/docs/.vuepress/public/images/cdc/image-20211025182341086.png differ
diff --git a/docs/.vuepress/public/images/cdc/image-20211025182435827.png b/docs/.vuepress/public/images/cdc/image-20211025182435827.png
new file mode 100644
index 0000000..fa97de2
Binary files /dev/null and b/docs/.vuepress/public/images/cdc/image-20211025182435827.png differ
diff --git a/docs/.vuepress/public/images/cdc/image-20211026095513892.png b/docs/.vuepress/public/images/cdc/image-20211026095513892.png
new file mode 100644
index 0000000..8f139bd
Binary files /dev/null and b/docs/.vuepress/public/images/cdc/image-20211026095513892.png differ
diff --git a/docs/.vuepress/public/images/cdc/image-20211026100505972.png b/docs/.vuepress/public/images/cdc/image-20211026100505972.png
new file mode 100644
index 0000000..e92f187
Binary files /dev/null and b/docs/.vuepress/public/images/cdc/image-20211026100505972.png differ
diff --git a/docs/.vuepress/public/images/cdc/image-20211026100804091.png b/docs/.vuepress/public/images/cdc/image-20211026100804091.png
new file mode 100644
index 0000000..7c1dd62
Binary files /dev/null and b/docs/.vuepress/public/images/cdc/image-20211026100804091.png differ
diff --git a/docs/.vuepress/public/images/cdc/image-20211026100943474.png b/docs/.vuepress/public/images/cdc/image-20211026100943474.png
new file mode 100644
index 0000000..bfc3d7b
Binary files /dev/null and b/docs/.vuepress/public/images/cdc/image-20211026100943474.png differ
diff --git a/docs/.vuepress/public/images/cdc/image-20211026101004547.png b/docs/.vuepress/public/images/cdc/image-20211026101004547.png
new file mode 100644
index 0000000..e458645
Binary files /dev/null and b/docs/.vuepress/public/images/cdc/image-20211026101004547.png differ
diff --git a/docs/.vuepress/public/images/cdc/image-20211026101203629.png b/docs/.vuepress/public/images/cdc/image-20211026101203629.png
new file mode 100644
index 0000000..fcb5ea4
Binary files /dev/null and b/docs/.vuepress/public/images/cdc/image-20211026101203629.png differ
diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js
index 89b2d8c..840f7f2 100644
--- a/docs/.vuepress/sidebar/en.js
+++ b/docs/.vuepress/sidebar/en.js
@@ -223,7 +223,8 @@ module.exports = [
         "fe-load-balance",
         "systemd",
         "samples",
-        "datax-doris-writer"
+        "datax-doris-writer",
+        "flink-cdc-to-doris"
       ],
     },
     {
diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js
index d0c53a1..6b83951 100644
--- a/docs/.vuepress/sidebar/zh-CN.js
+++ b/docs/.vuepress/sidebar/zh-CN.js
@@ -224,7 +224,8 @@ module.exports = [
         "fe-load-balance",
         "systemd",
         "samples",
-        "datax-doris-writer"
+        "datax-doris-writer",
+        "flink-cdc-to-doris"
       ],
     },
     {
diff --git a/docs/en/best-practices/datax-doris-writer.md b/docs/en/best-practices/datax-doris-writer.md
index bf5aabb..5321583 100644
--- a/docs/en/best-practices/datax-doris-writer.md
+++ b/docs/en/best-practices/datax-doris-writer.md
@@ -90,9 +90,9 @@ Here I am performing the compilation of Datax into a tar package, which is not t
 mvn -U clean package assembly:assembly -Dmaven.test.skip=true
 ```

-
+

-
+

 After the compilation is complete, the tar package is in the `Datax/target` directory. You can copy the tar package to the place you need. Here I am testing directly in DataX. Because the system Python version is 3.x, you need to replace the three files in the bin directory with their Python 3 counterparts, which you can download from the following address:
@@ -244,7 +244,7 @@ python bin/datax.py doris.json
 Then you can see the execution result:

-
+

 Then go to the Doris database and check your table; the data has been imported and the task execution is over.
diff --git a/docs/en/best-practices/flink-cdc-to-doris.md b/docs/en/best-practices/flink-cdc-to-doris.md
new file mode 100644
index 0000000..7685eb5
--- /dev/null
+++ b/docs/en/best-practices/flink-cdc-to-doris.md
@@ -0,0 +1,367 @@
---
{
    "title": "Use Flink CDC to realize real-time MySQL data into Apache Doris",
    "language": "en"
}
---

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Use Flink CDC to realize real-time MySQL data into Apache Doris

This article demonstrates, with a worked example, how to use Flink CDC together with the Doris Flink Connector to capture changes from a MySQL database and write them into the corresponding table of a Doris data warehouse in real time.

## 1. What is CDC

CDC stands for Change Data Capture. It records incremental changes in a source database (Source) and synchronizes them to one or more data destinations (Sink). During synchronization, the data can also be processed, for example grouped (GROUP BY) or joined across tables (JOIN).

Take an e-commerce platform as an example: user orders are written to a source database in real time. Department A needs the real-time data aggregated every minute and saved to Redis for queries; Department B needs a copy of the day's data staged in Elasticsearch for report display; Department C needs another copy in ClickHouse for a real-time data warehouse. As time goes by, Departments D and E will have analytics needs as well. In this scenario, the traditional approach of copying the data and distributing multiple replicas is inflexible, whereas CDC captures a single stream of change records and processes and delivers it to multiple destinations in real time.

### 1.1 Application scenarios of CDC

- **Data synchronization:** used for backup and disaster recovery;
- **Data distribution:** one data source distributed to multiple downstream systems;
- **Data collection:** ETL data integration for data warehouses and data lakes; a very important class of data source.

There are many technical solutions for CDC. The mainstream implementation mechanisms in the industry fall into two categories:

- Query-based CDC:
  - Offline scheduled query jobs, batch processing: a table is synchronized to other systems by querying for its latest data on each run (sketched after this list);
  - Data consistency cannot be guaranteed, because the data may have changed several times while the query runs;
  - Real-time delivery is not guaranteed either; offline scheduling introduces an inherent delay.
- Log-based CDC:
  - Consumes logs in real time as stream processing. For example, MySQL's binlog completely records the changes in the database, so the binlog can serve as the stream's data source;
  - Guarantees data consistency, because the binlog contains the details of all historical changes;
  - Guarantees real-time delivery, because log files like the binlog can be consumed as a stream and provide data as it happens.
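To make the query-based approach concrete, it amounts to a scheduled polling job along these lines (a minimal sketch; `orders` and `update_time` are illustrative names, not tables used in this article):

```sql
-- Runs on a schedule, e.g. once a minute. A row updated twice between two
-- polls is only seen in its final state, and deletes are missed entirely,
-- hence the consistency caveats listed above.
SELECT id, amount, update_time
FROM orders
WHERE update_time > '2021-11-06 16:00:00'; -- high-water mark saved by the previous poll
```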
## 2. Flink CDC

Flink added CDC (change data capture) support in version 1.11. To see what it brings, let's first look at how this kind of pipeline was built before.

A typical pre-CDC pipeline processed the MySQL binlog roughly like this: Canal listens to the binlog and writes the log into Kafka, and Apache Flink consumes the Kafka data in real time to synchronize the MySQL data (or do further processing). Broken down, the pipeline has the following stages:

1. MySQL enables the binlog;
2. Canal synchronizes the binlog data into Kafka;
3. Flink reads the binlog data from Kafka for the relevant business processing.

The overall processing link is long and requires quite a few components. Flink CDC, by contrast, can read the binlog directly from the database and feed it to downstream computation and analysis.

### 2.1 Flink Connector MySQL CDC 2.0 features

MySQL CDC 2.0's core features include:

- Concurrent reading: read performance for the full snapshot scales horizontally;
- Lock-free throughout: no risk of locking the online business;
- Resumable from checkpoints: the full-snapshot phase supports checkpoints.

Benchmarks published online tested the customer table of the TPC-DS data set with Flink 1.13.1, 65 million rows in the table, and a source parallelism of 8. In the full-read stage:

- MySQL CDC 2.0 took **13** minutes;
- MySQL CDC 1.4 took **89** minutes;
- a **6.8x** improvement in read performance.

## 3. What is the Doris Flink Connector

The Flink Doris Connector is an extension provided by the Doris community to make it easy for users to read and write Doris tables with Flink.

Doris currently supports Flink 1.11.x, 1.12.x, and 1.13.x, with Scala version 2.12.x.

The connector currently controls batching of writes through two parameters:

1. `sink.batch.size`: how many rows are written per batch; the default is 100;
2. `sink.batch.interval`: how many seconds elapse between writes; the default is 1 second.

The two parameters work together: whichever condition is met first triggers the write to the Doris table.
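For illustration, here is how the two thresholds might look in a sink table definition (the values are illustrative; the demo in section 4.6.2 below deliberately uses a tiny batch size of 2 so that writes are easy to observe):

```sql
-- Flush to Doris after 1000 buffered rows or after 10 seconds,
-- whichever threshold is reached first.
CREATE TABLE doris_sink_example (
    id INT,
    name STRING
) WITH (
    'connector' = 'doris',
    'fenodes' = 'localhost:8030',
    'table.identifier' = 'db_audit.doris_test',
    'username' = 'root',
    'password' = '',
    'sink.batch.size' = '1000',
    'sink.batch.interval' = '10'
);
```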
**Note:**

Make sure to enable the HTTP v2 server by setting `enable_http_server_v2=true` in `fe.conf`. Also, because the BE list is obtained through the FE HTTP REST API, the user you configure must have admin privileges.

## 4. Usage example

### 4.1 Compile the Flink Doris Connector

First we need to compile Doris's Flink connector. It can also be downloaded from the following address:

https://github.com/hf200012/hf200012.github.io/raw/main/lib/doris-flink-1.0-SNAPSHOT.jar

> Note:
>
> Because the Doris Flink Connector is built against Scala 2.12.x, choose the Flink distribution built for Scala 2.12.
>
> If you downloaded the jar from the address above, skip the compilation steps below.

Compile inside Doris's Docker build environment `apache/incubator-doris:build-env-1.2`; `build-env-1.3` ships JDK 11, which causes compilation problems.

Execute in the extension/flink-doris-connector/ source directory:

```
sh build.sh
```

After compilation succeeds, the file `doris-flink-1.0.0-SNAPSHOT.jar` is generated in the `output/` directory. Copy this file into the `ClassPath` of `Flink` to use `Flink-Doris-Connector`. For example, for `Flink` running in `Local` mode, put the file into the `jars/` folder; for `Flink` running in a `Yarn` cluster, put it into the pre-deployment package.
**Adapting to Flink 1.13.x**

```xml
    <properties>
        <scala.version>2.12</scala.version>
        <flink.version>1.11.2</flink.version>
        <libthrift.version>0.9.3</libthrift.version>
        <arrow.version>0.15.1</arrow.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <doris.home>${basedir}/../../</doris.home>
        <doris.thirdparty>${basedir}/../../thirdparty</doris.thirdparty>
    </properties>
```

Just change `flink.version` here to match your Flink cluster version and compile again.

### 4.2 Configure Flink

Here we operate through the Flink SQL Client.

The software versions used in this demonstration:

1. MySQL 8.x
2. Apache Flink: 1.13.3
3. Apache Doris: 0.14.13.1

#### 4.2.1 Install Flink

First download and install Flink:

https://dlcdn.apache.org/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.12.tgz

Download the Flink CDC jar package:

https://repo1.maven.org/maven2/com/ververica/flink-connector-mysql-cdc/2.0.2/flink-connector-mysql-cdc-2.0.2.jar

Pay attention here to the version correspondence between Flink CDC and Flink.

- Copy the Flink Doris Connector jar package downloaded or compiled above into the lib directory under the Flink root directory;
- Copy the Flink CDC jar package into the lib directory under the Flink root directory as well.

The demonstration here uses local standalone mode:

```shell
# wget https://dlcdn.apache.org/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.12.tgz
# tar zxvf flink-1.13.3-bin-scala_2.12.tgz
# cd flink-1.13.3
# wget https://repo1.maven.org/maven2/com/ververica/flink-connector-mysql-cdc/2.0.2/flink-connector-mysql-cdc-2.0.2.jar -P ./lib/
# wget https://github.com/hf200012/hf200012.github.io/raw/main/lib/doris-flink-1.0-SNAPSHOT.jar -P ./lib/
```

#### 4.2.2 Start Flink

Here we use local standalone mode:

```
# bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host doris01.
Starting taskexecutor daemon on host doris01.
```

Open the Flink web UI (the default port is 8081) to confirm that the cluster has started normally.

### 4.3 Install Apache Doris

For how to install and deploy Doris, refer to the following link:

https://hf200012.github.io/2021/09/Apache-Doris-环境安装部署

### 4.4 Install and configure MySQL

1. Install MySQL

   Quickly install and configure MySQL with Docker; see the following link for details:

   https://segmentfault.com/a/1190000021523570

2. Enable the MySQL binlog

   Enter the Docker container, modify the /etc/my.cnf file, and add the following under [mysqld] (a verification query is shown after this list):

   ```
   log_bin=mysql_bin
   binlog-format=Row
   server-id=1
   ```

   Then restart MySQL:

   ```
   systemctl restart mysqld
   ```

3. Create the MySQL database table

   ```sql
   CREATE TABLE `test_cdc` (
     `id` int NOT NULL AUTO_INCREMENT,
     `name` varchar(255) DEFAULT NULL,
     PRIMARY KEY (`id`)
   ) ENGINE=InnoDB
   ```
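With the database prepared, you can verify that the binlog settings from step 2 took effect (a quick check from any MySQL client):

```sql
-- Both should reflect the configuration above: log_bin = ON, binlog_format = ROW
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
```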
### 4.5 Create the Doris table

```sql
CREATE TABLE `doris_test` (
  `id` int NULL COMMENT "",
  `name` varchar(100) NULL COMMENT ""
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
  "replication_num" = "3",
  "in_memory" = "false",
  "storage_format" = "V2"
);
```

### 4.6 Start the Flink SQL Client

```shell
./bin/sql-client.sh embedded
> set execution.result-mode=tableau;
```

#### 4.6.1 Create the Flink CDC MySQL mapping table

```sql
CREATE TABLE test_flink_cdc (
  id INT,
  name STRING,
  primary key(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = 'password',
  'database-name' = 'demo',
  'table-name' = 'test_cdc'
);
```

Query the MySQL mapping table just created; it displays normally:

```
select * from test_flink_cdc;
```

#### 4.6.2 Create the Flink Doris mapping table

Use the Doris Flink Connector to create the Doris mapping table:

```sql
CREATE TABLE doris_test_sink (
  id INT,
  name STRING
)
WITH (
  'connector' = 'doris',
  'fenodes' = 'localhost:8030',
  'table.identifier' = 'db_audit.doris_test',
  'sink.batch.size' = '2',
  'sink.batch.interval' = '1',
  'username' = 'root',
  'password' = ''
)
```

Execute the statement above on the command line; you can see that the table is created successfully. Then run a query to verify that everything is normal:

```sql
select * from doris_test_sink;
```

Run an insert operation to move the MySQL data into Doris via Flink CDC combined with the Doris Flink Connector:

```sql
INSERT INTO doris_test_sink select id,name from test_flink_cdc
```

After the job is submitted successfully, you can see its information on the Flink web UI.

#### 4.6.3 Insert data into the MySQL table

```sql
INSERT INTO test_cdc VALUES (123,'this is a update');
INSERT INTO test_cdc VALUES (1212,'Test flink CDC');
INSERT INTO test_cdc VALUES (1234,'This is a test');
INSERT INTO test_cdc VALUES (11233,'zhangfeng_1');
INSERT INTO test_cdc VALUES (21233,'zhangfeng_2');
INSERT INTO test_cdc VALUES (31233,'zhangfeng_3');
INSERT INTO test_cdc VALUES (41233,'zhangfeng_4');
INSERT INTO test_cdc VALUES (51233,'zhangfeng_5');
INSERT INTO test_cdc VALUES (61233,'zhangfeng_6');
INSERT INTO test_cdc VALUES (71233,'zhangfeng_7');
INSERT INTO test_cdc VALUES (81233,'zhangfeng_8');
INSERT INTO test_cdc VALUES (91233,'zhangfeng_9');
```

#### 4.6.4 Observe the data in the Doris table

First stop the INSERT INTO job. Because this demo runs in local standalone mode with only a single task slot, the job has to be stopped to free the slot; then execute the query statement on the command line to see the data.

#### 4.6.5 Modify the MySQL data

Restart the INSERT INTO job, then modify the data in the MySQL table:

```sql
update test_cdc set name='This is an operation to verify the modification' where id =123
```

Look at the data in the Doris table again and you will find that it has been modified.
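For example, because Doris is MySQL-protocol compatible, you can confirm the change from a MySQL client connected to the Doris FE (the query port defaults to 9030; `doris_test` is the table created in section 4.5):

```sql
-- Should now return the updated name for id = 123
SELECT * FROM doris_test WHERE id = 123;
```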
Note that for changes to MySQL rows to be applied to the Doris rows as well, the Doris table must use the Unique Key data model; the other data models (Aggregate Key and Duplicate Key) cannot update data in place.

#### 4.6.6 Deleting data

The Doris Flink Connector does not currently support delete operations; support is planned for a later release.

diff --git a/docs/zh-CN/best-practices/flink-cdc-to-doris.md b/docs/zh-CN/best-practices/flink-cdc-to-doris.md
new file mode 100644
index 0000000..2a39c50
--- /dev/null
+++ b/docs/zh-CN/best-practices/flink-cdc-to-doris.md
@@ -0,0 +1,342 @@
# Use Flink CDC to realize real-time MySQL data into Apache Doris

This article demonstrates, with a worked example, how to use Flink CDC together with the Doris Flink Connector to capture changes from a MySQL database and write them into the corresponding table of a Doris data warehouse in real time.

## 1. What is CDC

CDC stands for Change Data Capture. It records incremental changes in a source database (Source) and synchronizes them to one or more data destinations (Sink). During synchronization, the data can also be processed, for example grouped (GROUP BY) or joined across tables (JOIN).

Take an e-commerce platform as an example: user orders are written to a source database in real time. Department A needs the real-time data aggregated every minute and saved to Redis for queries; Department B needs a copy of the day's data staged in Elasticsearch for report display; Department C needs another copy in ClickHouse for a real-time data warehouse. As time goes by, Departments D and E will have analytics needs as well. In this scenario, the traditional approach of copying the data and distributing multiple replicas is inflexible, whereas CDC captures a single stream of change records and processes and delivers it to multiple destinations in real time.

### 1.1 Application scenarios of CDC

- **Data synchronization:** used for backup and disaster recovery;
- **Data distribution:** one data source distributed to multiple downstream systems;
- **Data collection:** ETL data integration for data warehouses and data lakes; a very important class of data source.

There are many technical solutions for CDC. The mainstream implementation mechanisms in the industry fall into two categories:

- Query-based CDC:
  - Offline scheduled query jobs, batch processing: a table is synchronized to other systems by querying for its latest data on each run;
  - Data consistency cannot be guaranteed, because the data may have changed several times while the query runs;
  - Real-time delivery is not guaranteed either; offline scheduling introduces an inherent delay.
- Log-based CDC:
  - Consumes logs in real time as stream processing. For example, MySQL's binlog completely records the changes in the database, so the binlog can serve as the stream's data source;
  - Guarantees data consistency, because the binlog contains the details of all historical changes;
  - Guarantees real-time delivery, because log files like the binlog can be consumed as a stream and provide data as it happens.

## 2. Flink CDC

Flink added CDC (change data capture) support in version 1.11. To see what it brings, let's first look at how this kind of pipeline was built before.

A typical pre-CDC pipeline processed the MySQL binlog roughly like this: Canal listens to the binlog and writes the log into Kafka, and Apache Flink consumes the Kafka data in real time to synchronize the MySQL data (or do further processing). Broken down, the pipeline has the following stages:

1. MySQL enables the binlog;
2. Canal synchronizes the binlog data into Kafka;
3. Flink reads the binlog data from Kafka for the relevant business processing.

The overall processing link is long and requires quite a few components. Flink CDC, by contrast, can read the binlog directly from the database and feed it to downstream computation and analysis.

### 2.1 Flink Connector MySQL CDC 2.0 features

MySQL CDC 2.0's core features include:

- Concurrent reading: read performance for the full snapshot scales horizontally;
- Lock-free throughout: no risk of locking the online business;
- Resumable from checkpoints: the full-snapshot phase supports checkpoints.

Benchmarks published online tested the customer table of the TPC-DS data set with Flink 1.13.1, 65 million rows in the table, and a source parallelism of 8. In the full-read stage:

- MySQL CDC 2.0 took **13** minutes;
- MySQL CDC 1.4 took **89** minutes;
- a **6.8x** improvement in read performance.

## 3. What is the Doris Flink Connector

The Flink Doris Connector is an extension provided by the Doris community to make it easy for users to read and write Doris tables with Flink.

Doris currently supports Flink 1.11.x, 1.12.x, and 1.13.x, with Scala version 2.12.x.

The connector currently controls batching of writes through two parameters:

1. `sink.batch.size`: how many rows are written per batch; the default is 100;
2. `sink.batch.interval`: how many seconds elapse between writes; the default is 1 second.

The two parameters work together: whichever condition is met first triggers the write to the Doris table.
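For illustration, here is how the two thresholds might look in a sink table definition (the values are illustrative; the demo in section 4.6.2 below deliberately uses a tiny batch size of 2 so that writes are easy to observe):

```sql
-- Flush to Doris after 1000 buffered rows or after 10 seconds,
-- whichever threshold is reached first.
CREATE TABLE doris_sink_example (
    id INT,
    name STRING
) WITH (
    'connector' = 'doris',
    'fenodes' = 'localhost:8030',
    'table.identifier' = 'db_audit.doris_test',
    'username' = 'root',
    'password' = '',
    'sink.batch.size' = '1000',
    'sink.batch.interval' = '10'
);
```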
**Note:**

Make sure to enable the HTTP v2 server by setting `enable_http_server_v2=true` in `fe.conf`. Also, because the BE list is obtained through the FE HTTP REST API, the user you configure must have admin privileges.

## 4. Usage example

### 4.1 Compile the Flink Doris Connector

First we need to compile Doris's Flink connector. It can also be downloaded from the following address:

https://github.com/hf200012/hf200012.github.io/raw/main/lib/doris-flink-1.0-SNAPSHOT.jar

> Note:
>
> Because the Doris Flink Connector is built against Scala 2.12.x, choose the Flink distribution built for Scala 2.12.
>
> If you downloaded the jar from the address above, skip the compilation steps below.

Compile inside Doris's Docker build environment `apache/incubator-doris:build-env-1.2`; `build-env-1.3` ships JDK 11, which causes compilation problems.

Execute in the extension/flink-doris-connector/ source directory:

```
sh build.sh
```

After compilation succeeds, the file `doris-flink-1.0.0-SNAPSHOT.jar` is generated in the `output/` directory. Copy this file into the `ClassPath` of `Flink` to use `Flink-Doris-Connector`. For example, for `Flink` running in `Local` mode, put the file into the `jars/` folder; for `Flink` running in a `Yarn` cluster, put it into the pre-deployment package.

**Adapting to Flink 1.13.x**

```xml
    <properties>
        <scala.version>2.12</scala.version>
        <flink.version>1.11.2</flink.version>
        <libthrift.version>0.9.3</libthrift.version>
        <arrow.version>0.15.1</arrow.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <doris.home>${basedir}/../../</doris.home>
        <doris.thirdparty>${basedir}/../../thirdparty</doris.thirdparty>
    </properties>
```

Just change `flink.version` here to match your Flink cluster version and compile again.

### 4.2 Configure Flink

Here we operate through the Flink SQL Client.

The software versions used in this demonstration:

1. MySQL 8.x
2. Apache Flink: 1.13.3
3. Apache Doris: 0.14.13.1

#### 4.2.1 Install Flink

First download and install Flink:

https://dlcdn.apache.org/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.12.tgz

Download the Flink CDC jar package:

https://repo1.maven.org/maven2/com/ververica/flink-connector-mysql-cdc/2.0.2/flink-connector-mysql-cdc-2.0.2.jar

Pay attention here to the version correspondence between Flink CDC and Flink.

#### 4.2.2 Start Flink

Here we use local standalone mode:

```
# bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host doris01.
Starting taskexecutor daemon on host doris01.
```

Open the Flink web UI (the default port is 8081) to confirm that the cluster has started normally.

### 4.3 Install Apache Doris

For how to install and deploy Doris, refer to the following link:

https://hf200012.github.io/2021/09/Apache-Doris-环境安装部署

### 4.4 Install and configure MySQL

1. Install MySQL

   Quickly install and configure MySQL with Docker; see the following link for details:

   https://segmentfault.com/a/1190000021523570

2. Enable the MySQL binlog

   Enter the Docker container, modify the /etc/my.cnf file, and add the following under [mysqld] (a verification query is shown after this list):

   ```
   log_bin=mysql_bin
   binlog-format=Row
   server-id=1
   ```

   Then restart MySQL:

   ```
   systemctl restart mysqld
   ```

3. Create the MySQL database table

   ```sql
   CREATE TABLE `test_cdc` (
     `id` int NOT NULL AUTO_INCREMENT,
     `name` varchar(255) DEFAULT NULL,
     PRIMARY KEY (`id`)
   ) ENGINE=InnoDB
   ```
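With the database prepared, you can verify that the binlog settings from step 2 took effect (a quick check from any MySQL client):

```sql
-- Both should reflect the configuration above: log_bin = ON, binlog_format = ROW
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
```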
### 4.5 Create the Doris table

```sql
CREATE TABLE `doris_test` (
  `id` int NULL COMMENT "",
  `name` varchar(100) NULL COMMENT ""
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
  "replication_num" = "3",
  "in_memory" = "false",
  "storage_format" = "V2"
);
```

### 4.6 Start the Flink SQL Client

```shell
./bin/sql-client.sh embedded
> set execution.result-mode=tableau;
```

#### 4.6.1 Create the Flink CDC MySQL mapping table

```sql
CREATE TABLE test_flink_cdc (
  id INT,
  name STRING,
  primary key(id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = 'password',
  'database-name' = 'demo',
  'table-name' = 'test_cdc'
);
```

Query the MySQL mapping table just created; it displays normally:

```
select * from test_flink_cdc;
```

#### 4.6.2 Create the Flink Doris mapping table

Use the Doris Flink Connector to create the Doris mapping table:

```sql
CREATE TABLE doris_test_sink (
  id INT,
  name STRING
)
WITH (
  'connector' = 'doris',
  'fenodes' = 'localhost:8030',
  'table.identifier' = 'db_audit.doris_test',
  'sink.batch.size' = '2',
  'sink.batch.interval' = '1',
  'username' = 'root',
  'password' = ''
)
```

Execute the statement above on the command line; you can see that the table is created successfully. Then run a query to verify that everything is normal:

```sql
select * from doris_test_sink;
```

Run an insert operation to move the MySQL data into Doris via Flink CDC combined with the Doris Flink Connector:

```sql
INSERT INTO doris_test_sink select id,name from test_flink_cdc
```

After the job is submitted successfully, you can see its information on the Flink web UI.

#### 4.6.3 Insert data into the MySQL table

```sql
INSERT INTO test_cdc VALUES (123, 'this is a update');
INSERT INTO test_cdc VALUES (1212, 'Test flink CDC');
INSERT INTO test_cdc VALUES (1234, 'This is a test');
INSERT INTO test_cdc VALUES (11233, 'zhangfeng_1');
INSERT INTO test_cdc VALUES (21233, 'zhangfeng_2');
INSERT INTO test_cdc VALUES (31233, 'zhangfeng_3');
INSERT INTO test_cdc VALUES (41233, 'zhangfeng_4');
INSERT INTO test_cdc VALUES (51233, 'zhangfeng_5');
INSERT INTO test_cdc VALUES (61233, 'zhangfeng_6');
INSERT INTO test_cdc VALUES (71233, 'zhangfeng_7');
INSERT INTO test_cdc VALUES (81233, 'zhangfeng_8');
INSERT INTO test_cdc VALUES (91233, 'zhangfeng_9');
```

#### 4.6.4 Observe the data in the Doris table

First stop the INSERT INTO job. Because this demo runs in local standalone mode with only a single task slot, the job has to be stopped to free the slot; then execute the query statement on the command line to see the data.

#### 4.6.5 Modify the MySQL data

Restart the INSERT INTO job, then modify the data in the MySQL table:

```sql
update test_cdc set name='This is an operation to verify the modification' where id =123
```

Look at the data in the Doris table again and you will find that it has been modified.

Note that for changes to MySQL rows to be applied to the Doris rows as well, the Doris table must use the Unique Key data model; the other data models (Aggregate Key and Duplicate Key) cannot update data in place.

#### 4.6.6 Deleting data

The Doris Flink Connector does not currently support delete operations; support is planned for a later release.

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org