This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1e8ae7ad16 [doc](flink-connector)improve flink connector doc (#22143) 1e8ae7ad16 is described below commit 1e8ae7ad16fee79c2fd01285ecdb8c73011e679e Author: wudi <676366...@qq.com> AuthorDate: Tue Jul 25 15:58:35 2023 +0800 [doc](flink-connector)improve flink connector doc (#22143) --- docs/en/docs/ecosystem/flink-doris-connector.md | 446 ++++++++++++--------- docs/zh-CN/docs/ecosystem/flink-doris-connector.md | 264 +++++++----- 2 files changed, 419 insertions(+), 291 deletions(-) diff --git a/docs/en/docs/ecosystem/flink-doris-connector.md b/docs/en/docs/ecosystem/flink-doris-connector.md index 7a07793ec4..b8f385469d 100644 --- a/docs/en/docs/ecosystem/flink-doris-connector.md +++ b/docs/en/docs/ecosystem/flink-doris-connector.md @@ -28,12 +28,7 @@ under the License. - -The Flink Doris Connector can support operations (read, insert, modify, delete) data stored in Doris through Flink. - -Github: https://github.com/apache/doris-flink-connector - -* `Doris` table can be mapped to `DataStream` or `Table`. +* [Flink Doris Connector](https://github.com/apache/doris-flink-connector) can support data stored in Doris through Flink operations (read, insert, modify, delete). This document introduces how to operate Doris through Datastream and SQL through Flink. >**Note:** > @@ -50,124 +45,108 @@ Github: https://github.com/apache/doris-flink-connector | 1.3.0 | 1.16 | 1.0+ | 8 | - | | 1.4.0 | 1.15,1.16,1.17 | 1.0+ | 8 |- | -## Build and Install - -Ready to work - -1. Modify the `custom_env.sh.tpl` file and rename it to `custom_env.sh` +## USE -2. Execute following command in source dir: -`sh build.sh` -Enter the flink version you need to compile according to the prompt. +### Maven -After the compilation is successful, the target jar package will be generated in the `dist` directory, such as: `flink-doris-connector-1.3.0-SNAPSHOT.jar`. -Copy this file to `classpath` in `Flink` to use `Flink-Doris-Connector`. For example, `Flink` running in `Local` mode, put this file in the `lib/` folder. `Flink` running in `Yarn` cluster mode, put this file in the pre-deployment package. - - -## Using Maven - -Add flink-doris-connector Maven dependencies +Add flink-doris-connector ``` <!-- flink-doris-connector --> <dependency> - <groupId>org.apache.doris</groupId> - <artifactId>flink-doris-connector-1.16</artifactId> - <version>1.3.0</version> -</dependency> + <groupId>org.apache.doris</groupId> + <artifactId>flink-doris-connector-1.16</artifactId> + <version>1.4.0</version> +</dependency> ``` -**Notes** +**Remark** -1. Please replace the corresponding Connector and Flink dependency versions according to different Flink versions. Version 1.3.0 only supports Flink1.16 +1. Please replace the corresponding Connector and Flink dependent versions according to different Flink versions. 2. You can also download the relevant version jar package from [here](https://repo.maven.apache.org/maven2/org/apache/doris/). -## How to use - -There are three ways to use Flink Doris Connector. +### compile -* SQL -* DataStream +When compiling, you can run `sh build.sh` directly. For details, please refer to [here](https://github.com/apache/doris-flink-connector/blob/master/README.md). -### Parameters Configuration +After the compilation is successful, the target jar package will be generated in the `dist` directory, such as: `flink-doris-connector-1.5.0-SNAPSHOT.jar`. +Copy this file to `classpath` of `Flink` to use `Flink-Doris-Connector`. 
For example, `Flink` running in `Local` mode, put this file in the `lib/` folder. `Flink` running in `Yarn` cluster mode, put this file into the pre-deployment package. -Flink Doris Connector Sink writes data to Doris by the `Stream load`, and also supports the configurations of `Stream load`, For specific parameters, please refer to [here](../data-operate/import/import-way/stream-load-manual.md). +## Instructions -* SQL configured by `sink.properties.` in the `WITH` -* DataStream configured by `DorisExecutionOptions.builder().setStreamLoadProp(Properties)` +### read - -### SQL - -* Source +####SQL ```sql +-- doris source CREATE TABLE flink_doris_source ( - name STRING, - age INT, - price DECIMAL(5,2), - sale DOUBLE - ) - WITH ( - 'connector' = 'doris', - 'fenodes' = 'FE_IP:8030', - 'table.identifier' = 'database.table', - 'username' = 'root', - 'password' = 'password' + name STRING, + age INT, + price DECIMAL(5,2), + sale DOUBLE + ) + WITH ( + 'connector' = 'doris', + 'fenodes' = 'FE_IP:8030', + 'table.identifier' = 'database.table', + 'username' = 'root', + 'password' = 'password' ); ``` -* Sink +####DataStream -```sql --- enable checkpoint -SET 'execution.checkpointing.interval' = '10s'; -CREATE TABLE flink_doris_sink ( - name STRING, - age INT, - price DECIMAL(5,2), - sale DOUBLE - ) - WITH ( - 'connector' = 'doris', - 'fenodes' = 'FE_IP:8030', - 'table.identifier' = 'db.table', - 'username' = 'root', - 'password' = 'password', - 'sink.label-prefix' = 'doris_label' -); -``` +```java +DorisOptions.Builder builder = DorisOptions.builder() + .setFenodes("FE_IP:8030") + .setTableIdentifier("db.table") + .setUsername("root") + .setPassword("password"); -* Insert +DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder() + .setDorisOptions(builder.build()) + .setDorisReadOptions(DorisReadOptions.builder().build()) + .setDeserializer(new SimpleListDeserializationSchema()) + .build(); -```sql -INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source +env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print(); ``` -### DataStream +### write -* Source +####SQL -```java -DorisOptions.Builder builder = DorisOptions.builder() - .setFenodes("FE_IP:8030") - .setTableIdentifier("db.table") - .setUsername("root") - .setPassword("password"); +```sql +--enable checkpoint +SET 'execution.checkpointing.interval' = '10s'; -DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder() - .setDorisOptions(builder.build()) - .setDorisReadOptions(DorisReadOptions.builder().build()) - .setDeserializer(new SimpleListDeserializationSchema()) - .build(); +-- doris sink +CREATE TABLE flink_doris_sink ( + name STRING, + age INT, + price DECIMAL(5,2), + sale DOUBLE + ) + WITH ( + 'connector' = 'doris', + 'fenodes' = 'FE_IP:8030', + 'table.identifier' = 'db.table', + 'username' = 'root', + 'password' = 'password', + 'sink.label-prefix' = 'doris_label' +); -env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print(); +-- submit insert job +INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source ``` -* Sink +####DataStream -**String Stream** +DorisSink writes data to Doris through StreamLoad, and DataStream supports different serialization methods when writing + +**String data stream (SimpleStringSerializer)** ```java // enable checkpoint @@ -178,37 +157,29 @@ env.setRuntimeMode(RuntimeExecutionMode.BATCH); DorisSink.Builder<String> builder = DorisSink.builder(); DorisOptions.Builder dorisBuilder = 
DorisOptions.builder(); dorisBuilder.setFenodes("FE_IP:8030") - .setTableIdentifier("db.table") - .setUsername("root") - .setPassword("password"); - -Properties properties = new Properties(); -/** -json format to streamload -properties.setProperty("format", "json"); -properties.setProperty("read_json_by_line", "true"); -**/ + .setTableIdentifier("db.table") + .setUsername("root") + .setPassword("password"); -DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); +DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); executionBuilder.setLabelPrefix("label-doris") //streamload label prefix - .setStreamLoadProp(properties); + .setDeletable(false); builder.setDorisReadOptions(DorisReadOptions.builder().build()) - .setDorisExecutionOptions(executionBuilder.build()) - .setSerializer(new SimpleStringSerializer()) //serialize according to string - .setDorisOptions(dorisBuilder.build()); - + .setDorisExecutionOptions(executionBuilder.build()) + .setSerializer(new SimpleStringSerializer()) //serialize according to string + .setDorisOptions(dorisBuilder.build()); //mock string source List<Tuple2<String, Integer>> data = new ArrayList<>(); data.add(new Tuple2<>("doris",1)); -DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data); +DataStreamSource<Tuple2<String, Integer>> source = env. fromCollection(data); source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\t" + t.f1) - .sinkTo(builder.build()); + .sinkTo(builder.build()); ``` -**RowData Stream** +**RowData data stream (RowDataSerializer)** ```java // enable checkpoint @@ -220,105 +191,163 @@ env.setRuntimeMode(RuntimeExecutionMode.BATCH); DorisSink.Builder<RowData> builder = DorisSink.builder(); DorisOptions.Builder dorisBuilder = DorisOptions.builder(); dorisBuilder.setFenodes("FE_IP:8030") - .setTableIdentifier("db.table") - .setUsername("root") - .setPassword("password"); + .setTableIdentifier("db.table") + .setUsername("root") + .setPassword("password"); // json format to streamload Properties properties = new Properties(); properties.setProperty("format", "json"); properties.setProperty("read_json_by_line", "true"); -DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); +DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); executionBuilder.setLabelPrefix("label-doris") //streamload label prefix - .setStreamLoadProp(properties); //streamload params + .setDeletable(false) + .setStreamLoadProp(properties); //streamload params -//flink rowdata‘s schema +//flink rowdata's schema String[] fields = {"city", "longitude", "latitude", "destroy_date"}; DataType[] types = {DataTypes.VARCHAR(256), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DATE()}; builder.setDorisReadOptions(DorisReadOptions.builder().build()) - .setDorisExecutionOptions(executionBuilder.build()) - .setSerializer(RowDataSerializer.builder() //serialize according to rowdata - .setFieldNames(fields) - .setType("json") //json format - .setFieldType(types).build()) - .setDorisOptions(dorisBuilder.build()); + .setDorisExecutionOptions(executionBuilder.build()) + .setSerializer(RowDataSerializer.builder() //serialize according to rowdata + .setFieldNames(fields) + .setType("json") //json format + .setFieldType(types).build()) + .setDorisOptions(dorisBuilder.build()); //mock rowdata source -DataStream<RowData> source = env.fromElements("") - .map(new MapFunction<String, RowData>() { - @Override - public RowData map(String value) throws 
Exception { - GenericRowData genericRowData = new GenericRowData(4); - genericRowData.setField(0, StringData.fromString("beijing")); - genericRowData.setField(1, 116.405419); - genericRowData.setField(2, 39.916927); - genericRowData.setField(3, LocalDate.now().toEpochDay()); - return genericRowData; - } - }); - -source.sinkTo(builder.build()); +DataStream<RowData> source = env. fromElements("") + .map(new MapFunction<String, RowData>() { + @Override + public RowData map(String value) throws Exception { + GenericRowData genericRowData = new GenericRowData(4); + genericRowData.setField(0, StringData.fromString("beijing")); + genericRowData.setField(1, 116.405419); + genericRowData.setField(2, 39.916927); + genericRowData.setField(3, LocalDate.now().toEpochDay()); + return genericRowData; + } + }); + +source. sinkTo(builder. build()); ``` -**SchemaChange Stream** +**SchemaChange data stream (JsonDebeziumSchemaSerializer)** + ```java // enable checkpoint env.enableCheckpointing(10000); Properties props = new Properties(); -props.setProperty("format", "json"); +props. setProperty("format", "json"); props.setProperty("read_json_by_line", "true"); -DorisOptions dorisOptions = DorisOptions.builder() - .setFenodes("127.0.0.1:8030") - .setTableIdentifier("test.t1") - .setUsername("root") - .setPassword("").build(); +DorisOptions dorisOptions = DorisOptions. builder() + .setFenodes("127.0.0.1:8030") + .setTableIdentifier("test.t1") + .setUsername("root") + .setPassword("").build(); -DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); -executionBuilder.setLabelPrefix("label-doris" + UUID.randomUUID()) - .setStreamLoadProp(props).setDeletable(true); +DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); +executionBuilder.setLabelPrefix("label-prefix") + .setStreamLoadProp(props).setDeletable(true); DorisSink.Builder<String> builder = DorisSink.builder(); builder.setDorisReadOptions(DorisReadOptions.builder().build()) - .setDorisExecutionOptions(executionBuilder.build()) - .setDorisOptions(dorisOptions) - .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build()); + .setDorisExecutionOptions(executionBuilder.build()) + .setDorisOptions(dorisOptions) + .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build()); -env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")//.print(); - .sinkTo(builder.build()); +env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source") + .sinkTo(builder.build()); ``` -refer: [CDCSchemaChangeExample](https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java) - - -### General - -| Key | Default Value | Required | Comment | -| -------------------------------- | ----------------- | ------------------------------------------------------------ | -------------------------------- | -| fenodes | -- | Y | Doris FE http address, support multiple addresses, separated by commas | -| table.identifier | -- | Y | Doris table identifier, eg, db1.tbl1 | -| username | -- | Y | Doris username | -| password | -- | Y | Doris password | -| doris.request.retries | 3 | N | Number of retries to send requests to Doris | -| doris.request.connect.timeout.ms | 30000 | N | Connection timeout for sending requests to Doris | -| doris.request.read.timeout.ms | 30000 | N | Read timeout for sending request to Doris | -| doris.request.query.timeout.s | 3600 | N 
| Query the timeout time of doris, the default is 1 hour, -1 means no timeout limit | -| doris.request.tablet.size | Integer.MAX_VALUE | N | The number of Doris Tablets corresponding to an Partition. The smaller this value is set, the more partitions will be generated. This will increase the parallelism on the flink side, but at the same time will cause greater pressure on Doris. | -| doris.batch.size | 1024 | N | The maximum number of rows to read data from BE at one time. Increasing this value can reduce the number of connections between Flink and Doris. Thereby reducing the extra time overhead caused by network delay. | -| doris.exec.mem.limit | 2147483648 | N | Memory limit for a single query. The default is 2GB, in bytes. | -| doris.deserialize.arrow.async | false | N | Whether to support asynchronous conversion of Arrow format to RowBatch required for flink-doris-connector iteration | -| doris.deserialize.queue.size | 64 | N | Asynchronous conversion of the internal processing queue in Arrow format takes effect when doris.deserialize.arrow.async is true | -| doris.read.field | -- | N | List of column names in the Doris table, separated by commas | -| doris.filter.query | -- | N | Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering. | -| sink.label-prefix | -- | Y | The label prefix used by stream load imports. In the 2pc scenario, global uniqueness is required to ensure the EOS semantics of Flink. | -| sink.properties.* | -- | N | The stream load parameters.<br /> <br /> eg:<br /> sink.properties.column_separator' = ','<br /> <br /> Setting 'sink.properties.escape_delimiters' = 'true' if you want to use a control char as a separator, so that such as '\\x01' will translate to binary 0x01<br /><br />Support JSON format import, you need to enable both 'sink.properties.format' ='json' and 'sink.properties.read_json_by_line' ='true' | -| sink.enable-delete | true | N | Whether to enable deletion. This option requires Doris table to enable batch delete function (0.15+ version is enabled by default), and only supports Uniq model.| -| sink.enable-2pc | true | N | Whether to enable two-phase commit (2pc), the default is true, to ensure Exactly-Once semantics. For two-phase commit, please refer to [here](../data-operate/import/import-way/stream-load-manual.md). | -| sink.max-retries | 3 | N | In the 2pc scenario, the number of retries after the commit phase fails. | -| sink.buffer-size | 1048576(1MB) | N | Write data cache buffer size, in bytes. It is not recommended to modify, the default configuration is sufficient. | -| sink.buffer-count | 3 | N | The number of write data cache buffers, it is not recommended to modify, the default configuration is sufficient. +Reference: [CDCSchemaChangeExample](https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java) +### Lookup Join + +```sql +CREATE TABLE fact_table ( + `id` BIGINT, + `name` STRING, + `city` STRING, + `process_time` as proctime() +) WITH ( + 'connector' = 'kafka', + ... 
+); + +create table dim_city( + `city` STRING, + `level` INT , + `province` STRING, + `country` STRING +) WITH ( + 'connector' = 'doris', + 'fenodes' = '127.0.0.1:8030', + 'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030', + 'table.identifier' = 'dim.dim_city', + 'username' = 'root', + 'password' = '' +); + +SELECT a.id, a.name, a.city, c.province, c.country,c.level +FROM fact_table a +LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c +ON a.city = c.city +``` + +## configuration + +### General configuration items + +| Key | Default Value | Required | Comment | +| -------------------------------- | ------------- | -------- | ------------------------------------------------------------ | +| fenodes | -- | Y | Doris FE http address, multiple addresses are supported, separated by commas | +| table.identifier | -- | Y | Doris table name, such as: db.tbl | +| username | -- | Y | username to access Doris | +| password | -- | Y | Password to access Doris | +| doris.request.retries | 3 | N | Number of retries to send requests to Doris | +| doris.request.connect.timeout.ms | 30000 | N | Connection timeout for sending requests to Doris | +| doris.request.read.timeout.ms | 30000 | N | Read timeout for sending requests to Doris | + +### Source configuration item + +| Key | Default Value | Required | Comment | +| ----------------------------- | ------------------ | -------- | ------------------------------------------------------------ | +| doris.request.query.timeout.s | 3600 | N | The timeout time for querying Doris, the default value is 1 hour, -1 means no timeout limit | +| doris.request.tablet.size | Integer. MAX_VALUE | N | The number of Doris Tablets corresponding to a Partition. The smaller this value is set, the more Partitions will be generated. This improves the parallelism on the Flink side, but at the same time puts more pressure on Doris. | +| doris.batch.size | 1024 | N | The maximum number of rows to read data from BE at a time. Increasing this value reduces the number of connections established between Flink and Doris. Thereby reducing the additional time overhead caused by network delay. | +| doris.exec.mem.limit | 2147483648 | N | Memory limit for a single query. The default is 2GB, in bytes | +| doris.deserialize.arrow.async | FALSE | N | Whether to support asynchronous conversion of Arrow format to RowBatch needed for flink-doris-connector iterations | +| doris.deserialize.queue.size | 64 | N | Asynchronous conversion of internal processing queue in Arrow format, effective when doris.deserialize.arrow.async is true | +| doris.read.field | -- | N | Read the list of column names of the Doris table, separated by commas | +| doris.filter.query | -- | N | The expression to filter the read data, this expression is transparently passed to Doris. Doris uses this expression to complete source-side data filtering. For example age=18. | + +### Sink configuration items + +| Key | Default Value | Required | Comment | +| ------------------ | ------------- | -------- | ------------------------------------------------------------ | +| sink.label-prefix | -- | Y | The label prefix used by Stream load import. In the 2pc scenario, global uniqueness is required to ensure Flink's EOS semantics. | +| sink.properties.* | -- | N | Import parameters for Stream Load. 
<br/>For example: 'sink.properties.column_separator' = ', ' defines column delimiters, 'sink.properties.escape_delimiters' = 'true' special characters as delimiters, '\x01' will be converted to binary 0x01 <br/><br/>JSON format import<br/>'sink.properties.format' = 'json' 'sink.properties. read_json_by_line' = 'true'<br/>Detailed parameters refer to [here](../data-operate/import/import-way/stream-load-ma [...] +| sink.enable-delete | TRUE | N | Whether to enable delete. This option requires the Doris table to enable the batch delete function (Doris 0.15+ version is enabled by default), and only supports the Unique model. | +| sink.enable-2pc | TRUE | N | Whether to enable two-phase commit (2pc), the default is true, to ensure Exactly-Once semantics. For two-phase commit, please refer to [here](../data-operate/import/import-way/stream-load-manual.md). | +| sink.buffer-size | 1MB | N | The size of the write data cache buffer, in bytes. It is not recommended to modify, the default configuration is enough | +| sink.buffer-count | 3 | N | The number of write data buffers. It is not recommended to modify, the default configuration is enough | +| sink.max-retries | 3 | N | Maximum number of retries after Commit failure, default 3 | + +### Lookup Join configuration item + +| Key | Default Value | Required | Comment | +| --------------------------------- | ------------- | -------- | ------------------------------------------------------------ | +| jdbc-url | -- | Y | jdbc connection information | +| lookup.cache.max-rows | -1 | N | The maximum number of rows in the lookup cache, the default value is -1, and the cache is not enabled | +| lookup.cache.ttl | 10s | N | The maximum time of lookup cache, the default is 10s | +| lookup.max-retries | 1 | N | The number of retries after a lookup query fails | +| lookup.jdbc.async | false | N | Whether to enable asynchronous lookup, the default is false | +| lookup.jdbc.read.batch.size | 128 | N | Under asynchronous lookup, the maximum batch size for each query | +| lookup.jdbc.read.batch.queue-size | 256 | N | The size of the intermediate buffer queue during asynchronous lookup | +| lookup.jdbc.read.thread-size | 3 | N | The number of jdbc threads for lookup in each task | ## Doris & Flink Column Type Mapping @@ -342,7 +371,7 @@ refer: [CDCSchemaChangeExample](https://github.com/apache/doris-flink-connector/ | TIME | DOUBLE | | HLL | Unsupported datatype | -## An example of using Flink CDC to access Doris (supports insert/update/delete events) +## An example of using Flink CDC to access Doris ```sql SET 'execution.checkpointing.interval' = '10s'; CREATE TABLE cdc_mysql_source ( @@ -359,7 +388,7 @@ CREATE TABLE cdc_mysql_source ( 'table-name' = 'table' ); --- Support delete event synchronization (sink.enable-delete='true'), requires Doris table to enable batch delete function +-- Support synchronous insert/update/delete events CREATE TABLE doris_sink ( id INT, name STRING @@ -372,20 +401,22 @@ WITH ( 'password' = '', 'sink.properties.format' = 'json', 'sink.properties.read_json_by_line' = 'true', - 'sink.enable-delete' = 'true', + 'sink.enable-delete' = 'true', -- Synchronize delete events 'sink.label-prefix' = 'doris_label' ); insert into doris_sink select id,name from cdc_mysql_source; ``` -## Use Flink CDC to access multi-table or database +## Use FlinkCDC to access multi-table or whole database example + ### grammar -``` + +```shell <FLINK_HOME>/bin/flink run \ - -c org.apache.doris.flink.tools.cdc.CdcTools\ - 
lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar \ - mysql-sync-database \ + -c org.apache.doris.flink.tools.cdc.CdcTools \ + lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar \ + <mysql-sync-database|oracle-sync-database> \ --database <doris-database-name> \ [--job-name <flink-job-name>] \ [--table-prefix <doris-table-prefix>] \ @@ -393,6 +424,7 @@ insert into doris_sink select id,name from cdc_mysql_source; [--including-tables <mysql-table-name|name-regular-expr>] \ [--excluding-tables <mysql-table-name|name-regular-expr>] \ --mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \ + --oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \ --sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \ [--table-conf <doris-table-conf> [--table-conf <doris-table-conf> ...]] ``` @@ -403,19 +435,21 @@ insert into doris_sink select id,name from cdc_mysql_source; - **--table-suffix** Same as above, the suffix name of the Doris table. - **--including-tables** MySQL tables that need to be synchronized, you can use "|" to separate multiple tables, and support regular expressions. For example --including-tables table1|tbl.* is to synchronize table1 and all tables beginning with tbl. - **--excluding-tables** Tables that do not need to be synchronized, the usage is the same as above. -- **--mysql-conf** MySQL CDCSource configuration, for example --mysql-conf hostname=127.0.0.1 , you can find it in [here](https://ververica.github.io/flink-cdc-connectors/master /content/connectors/mysql-cdc.html) to view all configurations of MySQL-CDC, where hostname/username/password/database-name are required. -- **--sink-conf** All configurations of Doris Sink, you can view the complete configuration items [here](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector/#%E9%80%9A%E7%94%A8%E9%85%8D%E7%BD%AE%E9%A1%B9). +- **--mysql-conf** MySQL CDCSource configuration, e.g. --mysql-conf hostname=127.0.0.1. All MySQL-CDC configurations can be viewed [here](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html); hostname/username/password/database-name are required. +- **--oracle-conf** Oracle CDCSource configuration, e.g. --oracle-conf hostname=127.0.0.1. All Oracle-CDC configurations can be viewed [here](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html); hostname/username/password/database-name/schema-name are required. +- **--sink-conf** All configurations of the Doris sink; the complete list of configuration items can be viewed [here](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector/#%E9%80%9A%E7%94%A8%E9%85%8D%E7%BD%AE%E9%A1%B9). - **--table-conf** The configuration item of the Doris table, that is, the content contained in properties. 
For example --table-conf replication_num=1 ->Note: flink-sql-connector-mysql-cdc-2.3.0.jar needs to be added in the $FLINK_HOME/lib directory +>Note: When synchronizing, you need to add the corresponding Flink CDC dependencies in the $FLINK_HOME/lib directory, such as flink-sql-connector-mysql-cdc-${version}.jar, flink-sql-connector-oracle-cdc-${version}.jar -### Example -``` +### MySQL synchronization example + +```shell <FLINK_HOME>/bin/flink run \ -Dexecution.checkpointing.interval=10s\ -Dparallelism.default=1\ -c org.apache.doris.flink.tools.cdc.CdcTools\ - lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar \ + lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \ mysql-sync-database\ --database test_db \ --mysql-conf hostname=127.0.0.1 \ @@ -431,7 +465,33 @@ insert into doris_sink select id,name from cdc_mysql_source; --table-conf replication_num=1 ``` +### Oracle synchronization example + +```shell +<FLINK_HOME>/bin/flink run \ + -Dexecution.checkpointing.interval=10s \ + -Dparallelism.default=1 \ + -c org.apache.doris.flink.tools.cdc.CdcTools \ + ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar\ + oracle-sync-database \ + --database test_db \ + --oracle-conf hostname=127.0.0.1 \ + --oracle-conf port=1521 \ + --oracle-conf username=admin \ + --oracle-conf password="password" \ + --oracle-conf database-name=XE \ + --oracle-conf schema-name=ADMIN \ + --including-tables "tbl1|tbl2" \ + --sink-conf fenodes=127.0.0.1:8030 \ + --sink-conf username=root \ + --sink-conf password=\ + --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \ + --sink-conf sink.label-prefix=label \ + --table-conf replication_num=1 +``` + ## Use FlinkCDC to update Key column + Generally, in a business database, the number is used as the primary key of the table, such as the Student table, the number (id) is used as the primary key, but with the development of the business, the number corresponding to the data may change. In this scenario, using FlinkCDC + Doris Connector to synchronize data can automatically update the data in the Doris primary key column. ### Principle @@ -537,6 +597,8 @@ At this time, it cannot be started from the checkpoint, and the expiration time This is because the concurrent import of the same library exceeds 100, which can be solved by adjusting the parameter `max_running_txn_num_per_db` of fe.conf. For details, please refer to [max_running_txn_num_per_db](https://doris.apache.org/zh-CN/docs/dev/admin-manual/config/fe-config/#max_running_txn_num_per_db) +At the same time, if a task frequently modifies the label and restarts, it may also cause this error. In the 2pc scenario (Duplicate/Aggregate model), the label of each task needs to be unique, and when restarting from the checkpoint, the Flink task will actively abort the txn that has been successfully precommitted before and has not been committed. Frequently modifying the label and restarting will cause a large number of txn that have successfully precommitted to fail to be aborted, o [...] + 7. 
**How to ensure the order of a batch of data when Flink writes to the Uniq model?** You can add sequence column configuration to ensure that, for details, please refer to [sequence](https://doris.apache.org/zh-CN/docs/dev/data-operate/update-delete/sequence-column-manual) diff --git a/docs/zh-CN/docs/ecosystem/flink-doris-connector.md b/docs/zh-CN/docs/ecosystem/flink-doris-connector.md index a5f7b2977f..ad1947aaee 100644 --- a/docs/zh-CN/docs/ecosystem/flink-doris-connector.md +++ b/docs/zh-CN/docs/ecosystem/flink-doris-connector.md @@ -30,12 +30,7 @@ under the License. - -Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。 - -代码库地址:https://github.com/apache/doris-flink-connector - -* 可以将 `Doris` 表映射为 `DataStream` 或者 `Table`。 +[Flink Doris Connector](https://github.com/apache/doris-flink-connector) 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。本文档介绍如何通过Flink如果通过Datastream和SQL操作Doris。 >**注意:** > @@ -52,21 +47,9 @@ Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改 | 1.3.0 | 1.16 | 1.0+ | 8 | - | | 1.4.0 | 1.15,1.16,1.17 | 1.0+ | 8 |- | -## 编译与安装 - -准备工作 - -1. 修改`custom_env.sh.tpl`文件,重命名为`custom_env.sh` +## 使用 -2. 在源码目录下执行: -`sh build.sh` -根据提示输入你需要的 flink 版本进行编译。 - -编译成功后,会在 `dist` 目录生成目标jar包,如:`flink-doris-connector-1.3.0-SNAPSHOT.jar`。 -将此文件复制到 `Flink` 的 `classpath` 中即可使用 `Flink-Doris-Connector` 。例如, `Local` 模式运行的 `Flink` ,将此文件放入 `lib/` 文件夹下。 `Yarn` 集群模式运行的 `Flink` ,则将此文件放入预部署包中。 - - -## 使用 Maven 管理 +### Maven 添加 flink-doris-connector @@ -75,7 +58,7 @@ Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改 <dependency> <groupId>org.apache.doris</groupId> <artifactId>flink-doris-connector-1.16</artifactId> - <version>1.3.0</version> + <version>1.4.0</version> </dependency> ``` @@ -85,25 +68,21 @@ Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改 2.也可从[这里](https://repo.maven.apache.org/maven2/org/apache/doris/)下载相关版本jar包。 -## 使用方法 - -Flink 读写 Doris 数据主要有两种方式 - -* SQL -* DataStream +### 编译 -### 参数配置 +编译时,可直接运行`sh build.sh`,具体可参考[这里](https://github.com/apache/doris-flink-connector/blob/master/README.md)。 -Flink Doris Connector Sink 的内部实现是通过 `Stream Load` 服务向 Doris 写入数据, 同时也支持 `Stream Load` 请求参数的配置设置,具体参数可参考[这里](../data-operate/import/import-way/stream-load-manual.md),配置方法如下: +编译成功后,会在 `dist` 目录生成目标jar包,如:`flink-doris-connector-1.5.0-SNAPSHOT.jar`。 +将此文件复制到 `Flink` 的 `classpath` 中即可使用 `Flink-Doris-Connector` 。例如, `Local` 模式运行的 `Flink` ,将此文件放入 `lib/` 文件夹下。 `Yarn` 集群模式运行的 `Flink` ,则将此文件放入预部署包中。 -* SQL 使用 `WITH` 参数 `sink.properties.` 配置 -* DataStream 使用方法`DorisExecutionOptions.builder().setStreamLoadProp(Properties)`配置 +## 使用方法 -### SQL +### 读取 -* Source +#### SQL ```sql +-- doris source CREATE TABLE flink_doris_source ( name STRING, age INT, @@ -119,11 +98,33 @@ CREATE TABLE flink_doris_source ( ); ``` -* Sink +#### DataStream + +```java +DorisOptions.Builder builder = DorisOptions.builder() + .setFenodes("FE_IP:8030") + .setTableIdentifier("db.table") + .setUsername("root") + .setPassword("password"); + +DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder() + .setDorisOptions(builder.build()) + .setDorisReadOptions(DorisReadOptions.builder().build()) + .setDeserializer(new SimpleListDeserializationSchema()) + .build(); + +env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print(); +``` + +### 写入 + +#### SQL ```sql -- enable checkpoint SET 'execution.checkpointing.interval' = '10s'; + +-- doris sink CREATE TABLE flink_doris_sink ( name STRING, age INT, @@ -138,37 +139,16 @@ CREATE TABLE flink_doris_sink ( 'password' = 'password', 'sink.label-prefix' = 
'doris_label' ); -``` - -* Insert -```sql +-- submit insert job INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source ``` -### DataStream - -* Source - -```java -DorisOptions.Builder builder = DorisOptions.builder() - .setFenodes("FE_IP:8030") - .setTableIdentifier("db.table") - .setUsername("root") - .setPassword("password"); - -DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder() - .setDorisOptions(builder.build()) - .setDorisReadOptions(DorisReadOptions.builder().build()) - .setDeserializer(new SimpleListDeserializationSchema()) - .build(); - -env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print(); -``` +#### DataStream -* Sink +DorisSink是通过StreamLoad想Doris写入数据,DataStream写入时,支持不同的序列化方法 -**String 数据流** +**String 数据流(SimpleStringSerializer)** ```java // enable checkpoint @@ -183,16 +163,15 @@ dorisBuilder.setFenodes("FE_IP:8030") .setUsername("root") .setPassword("password"); - DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); -executionBuilder.setLabelPrefix("label-doris"); //streamload label prefix +executionBuilder.setLabelPrefix("label-doris") //streamload label prefix + .setDeletable(false); builder.setDorisReadOptions(DorisReadOptions.builder().build()) .setDorisExecutionOptions(executionBuilder.build()) .setSerializer(new SimpleStringSerializer()) //serialize according to string .setDorisOptions(dorisBuilder.build()); - //mock string source List<Tuple2<String, Integer>> data = new ArrayList<>(); data.add(new Tuple2<>("doris",1)); @@ -202,7 +181,7 @@ source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\t" + t.f .sinkTo(builder.build()); ``` -**RowData 数据流** +**RowData 数据流(RowDataSerializer)** ```java // enable checkpoint @@ -224,6 +203,7 @@ properties.setProperty("format", "json"); properties.setProperty("read_json_by_line", "true"); DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); executionBuilder.setLabelPrefix("label-doris") //streamload label prefix + .setDeletable(false) .setStreamLoadProp(properties); //streamload params //flink rowdata‘s schema @@ -255,7 +235,8 @@ DataStream<RowData> source = env.fromElements("") source.sinkTo(builder.build()); ``` -**SchemaChange 数据流** +**SchemaChange 数据流(JsonDebeziumSchemaSerializer)** + ```java // enable checkpoint env.enableCheckpointing(10000); @@ -270,7 +251,7 @@ DorisOptions dorisOptions = DorisOptions.builder() .setPassword("").build(); DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); -executionBuilder.setLabelPrefix("label-doris" + UUID.randomUUID()) +executionBuilder.setLabelPrefix("label-prefix") .setStreamLoadProp(props).setDeletable(true); DorisSink.Builder<String> builder = DorisSink.builder(); @@ -279,40 +260,96 @@ builder.setDorisReadOptions(DorisReadOptions.builder().build()) .setDorisOptions(dorisOptions) .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build()); -env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")//.print(); +env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source") .sinkTo(builder.build()); ``` 参考: [CDCSchemaChangeExample](https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java) +### Lookup Join + +```sql +CREATE TABLE fact_table ( + `id` BIGINT, + `name` STRING, + `city` STRING, + `process_time` as proctime() +) WITH ( + 'connector' = 'kafka', + 
... +); + +create table dim_city( + `city` STRING, + `level` INT , + `province` STRING, + `country` STRING +) WITH ( + 'connector' = 'doris', + 'fenodes' = '127.0.0.1:8030', + 'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030', + 'table.identifier' = 'dim.dim_city', + 'username' = 'root', + 'password' = '' +); + +SELECT a.id, a.name, a.city, c.province, c.country,c.level +FROM fact_table a +LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c +ON a.city = c.city +``` + ## 配置 ### 通用配置项 -| Key | Default Value | Required | Comment | -| -------------------------------- | ------------------ | -------- | ------------------------------------------------------------ | -| fenodes | -- | Y | Doris FE http 地址 | -| table.identifier | -- | Y | Doris 表名,如:db.tbl | -| username | -- | Y | 访问 Doris 的用户名 | -| password | -- | Y | 访问 Doris 的密码 | -| doris.request.retries | 3 | N | 向 Doris 发送请求的重试次数 | -| doris.request.connect.timeout.ms | 30000 | N | 向 Doris 发送请求的连接超时时间 | -| doris.request.read.timeout.ms | 30000 | N | 向 Doris 发送请求的读取超时时间 | -| doris.request.query.timeout.s | 3600 | N | 查询 Doris 的超时时间,默认值为1小时,-1表示无超时限制 | -| doris.request.tablet.size | Integer. MAX_VALUE | N | 一个 Partition 对应的 Doris Tablet 个数。 此数值设置越小,则会生成越多的 Partition。从而提升 Flink 侧的并行度,但同时会对 Doris 造成更大的压力。 | -| doris.batch.size | 1024 | N | 一次从 BE 读取数据的最大行数。增大此数值可减少 Flink 与 Doris 之间建立连接的次数。 从而减轻网络延迟所带来的额外时间开销。 | -| doris.exec.mem.limit | 2147483648 | N | 单个查询的内存限制。默认为 2GB,单位为字节 | -| doris.deserialize.arrow.async | FALSE | N | 是否支持异步转换 Arrow 格式到 flink-doris-connector 迭代所需的 RowBatch | -| doris.deserialize.queue.size | 64 | N | 异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效 | -| doris.read.field | -- | N | 读取 Doris 表的列名列表,多列之间使用逗号分隔 | -| doris.filter.query | -- | N | 过滤读取数据的表达式,此表达式透传给 Doris。Doris 使用此表达式完成源端数据过滤。 | -| sink.label-prefix | -- | Y | Stream load导入使用的label前缀。2pc场景下要求全局唯一 ,用来保证Flink的EOS语义。 | -| sink.properties.* | -- | N | Stream Load 的导入参数。<br/>例如: 'sink.properties.column_separator' = ', ' 定义列分隔符, 'sink.properties.escape_delimiters' = 'true' 特殊字符作为分隔符,'\x01'会被转换为二进制的0x01 <br/><br/>JSON格式导入<br/>'sink.properties.format' = 'json' 'sink.properties.read_json_by_line' = 'true' | -| sink.enable-delete | TRUE | N | 是否启用删除。此选项需要 Doris 表开启批量删除功能(Doris0.15+版本默认开启),只支持 Unique 模型。 | -| sink.enable-2pc | TRUE | N | 是否开启两阶段提交(2pc),默认为true,保证Exactly-Once语义。关于两阶段提交可参考[这里](../data-operate/import/import-way/stream-load-manual.md)。 | -| sink.buffer-size | 1MB | N | 写数据缓存buffer大小,单位字节。不建议修改,默认配置即可 | -| sink.buffer-count | 3 | N | 写数据缓存buffer个数。不建议修改,默认配置即可 | -| sink.max-retries | 3 | N | Commit失败后的最大重试次数,默认3次 | +| Key | Default Value | Required | Comment | +| -------------------------------- | ------------- | -------- | ----------------------------------------------- | +| fenodes | -- | Y | Doris FE http 地址, 支持多个地址,使用逗号分隔 | +| table.identifier | -- | Y | Doris 表名,如:db.tbl | +| username | -- | Y | 访问 Doris 的用户名 | +| password | -- | Y | 访问 Doris 的密码 | +| doris.request.retries | 3 | N | 向 Doris 发送请求的重试次数 | +| doris.request.connect.timeout.ms | 30000 | N | 向 Doris 发送请求的连接超时时间 | +| doris.request.read.timeout.ms | 30000 | N | 向 Doris 发送请求的读取超时时间 | + +### Source 配置项 + +| Key | Default Value | Required | Comment | +| ----------------------------- | ------------------ | -------- | ------------------------------------------------------------ | +| doris.request.query.timeout.s | 3600 | N | 查询 Doris 的超时时间,默认值为1小时,-1表示无超时限制 | +| doris.request.tablet.size | Integer. 
MAX_VALUE | N | 一个 Partition 对应的 Doris Tablet 个数。 此数值设置越小,则会生成越多的 Partition。从而提升 Flink 侧的并行度,但同时会对 Doris 造成更大的压力。 | +| doris.batch.size | 1024 | N | 一次从 BE 读取数据的最大行数。增大此数值可减少 Flink 与 Doris 之间建立连接的次数。 从而减轻网络延迟所带来的额外时间开销。 | +| doris.exec.mem.limit | 2147483648 | N | 单个查询的内存限制。默认为 2GB,单位为字节 | +| doris.deserialize.arrow.async | FALSE | N | 是否支持异步转换 Arrow 格式到 flink-doris-connector 迭代所需的 RowBatch | +| doris.deserialize.queue.size | 64 | N | 异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效 | +| doris.read.field | -- | N | 读取 Doris 表的列名列表,多列之间使用逗号分隔 | +| doris.filter.query | -- | N | 过滤读取数据的表达式,此表达式透传给 Doris。Doris 使用此表达式完成源端数据过滤。比如 age=18。 | + +### Sink 配置项 + +| Key | Default Value | Required | Comment | +| ------------------ | ------------- | -------- | ------------------------------------------------------------ | +| sink.label-prefix | -- | Y | Stream load导入使用的label前缀。2pc场景下要求全局唯一 ,用来保证Flink的EOS语义。 | +| sink.properties.* | -- | N | Stream Load 的导入参数。<br/>例如: 'sink.properties.column_separator' = ', ' 定义列分隔符, 'sink.properties.escape_delimiters' = 'true' 特殊字符作为分隔符,'\x01'会被转换为二进制的0x01 <br/><br/>JSON格式导入<br/>'sink.properties.format' = 'json' 'sink.properties.read_json_by_line' = 'true'<br/>详细参数参考[这里](../data-operate/import/import-way/stream-load-manual.md)。 | +| sink.enable-delete | TRUE | N | 是否启用删除。此选项需要 Doris 表开启批量删除功能(Doris0.15+版本默认开启),只支持 Unique 模型。 | +| sink.enable-2pc | TRUE | N | 是否开启两阶段提交(2pc),默认为true,保证Exactly-Once语义。关于两阶段提交可参考[这里](../data-operate/import/import-way/stream-load-manual.md)。 | +| sink.buffer-size | 1MB | N | 写数据缓存buffer大小,单位字节。不建议修改,默认配置即可 | +| sink.buffer-count | 3 | N | 写数据缓存buffer个数。不建议修改,默认配置即可 | +| sink.max-retries | 3 | N | Commit失败后的最大重试次数,默认3次 | + +### Lookup Join 配置项 + +| Key | Default Value | Required | Comment | +| --------------------------------- | ------------- | -------- | ------------------------------------------ | +| jdbc-url | -- | Y | jdbc连接信息 | +| lookup.cache.max-rows | -1 | N | lookup缓存的最大行数,默认值-1,不开启缓存 | +| lookup.cache.ttl | 10s | N | lookup缓存的最大时间,默认10s | +| lookup.max-retries | 1 | N | lookup查询失败后的重试次数 | +| lookup.jdbc.async | false | N | 是否开启异步的lookup,默认false | +| lookup.jdbc.read.batch.size | 128 | N | 异步lookup下,每次查询的最大批次大小 | +| lookup.jdbc.read.batch.queue-size | 256 | N | 异步lookup时,中间缓冲队列的大小 | +| lookup.jdbc.read.thread-size | 3 | N | 每个task中lookup的jdbc线程数 | ## Doris 和 Flink 列类型映射关系 @@ -336,7 +373,7 @@ env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")//. 
| TIME | DOUBLE | | HLL | Unsupported datatype | -## 使用FlinkSQL通过CDC接入Doris示例(支持Insert/Update/Delete事件) +## 使用FlinkSQL通过CDC接入Doris示例 ```sql -- enable checkpoint SET 'execution.checkpointing.interval' = '10s'; @@ -355,7 +392,7 @@ CREATE TABLE cdc_mysql_source ( 'table-name' = 'table' ); --- 支持删除事件同步(sink.enable-delete='true'),需要 Doris 表开启批量删除功能 +-- 支持同步insert/update/delete事件 CREATE TABLE doris_sink ( id INT, name STRING @@ -368,7 +405,7 @@ WITH ( 'password' = '', 'sink.properties.format' = 'json', 'sink.properties.read_json_by_line' = 'true', - 'sink.enable-delete' = 'true', + 'sink.enable-delete' = 'true', -- 同步删除事件 'sink.label-prefix' = 'doris_label' ); @@ -377,11 +414,11 @@ insert into doris_sink select id,name from cdc_mysql_source; ## 使用FlinkCDC接入多表或整库示例 ### 语法 -``` +```shell <FLINK_HOME>/bin/flink run \ -c org.apache.doris.flink.tools.cdc.CdcTools \ lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar \ - mysql-sync-database \ + <mysql-sync-database|oracle-sync-database> \ --database <doris-database-name> \ [--job-name <flink-job-name>] \ [--table-prefix <doris-table-prefix>] \ @@ -389,6 +426,7 @@ insert into doris_sink select id,name from cdc_mysql_source; [--including-tables <mysql-table-name|name-regular-expr>] \ [--excluding-tables <mysql-table-name|name-regular-expr>] \ --mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \ + --oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \ --sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \ [--table-conf <doris-table-conf> [--table-conf <doris-table-conf> ...]] ``` @@ -400,13 +438,14 @@ insert into doris_sink select id,name from cdc_mysql_source; - **--including-tables** 需要同步的MySQL表,可以使用"|" 分隔多个表,并支持正则表达式。 比如--including-tables table1|tbl.*就是同步table1和所有以tbl开头的表。 - **--excluding-tables** 不需要同步的表,用法同上。 - **--mysql-conf** MySQL CDCSource 配置,例如--mysql-conf hostname=127.0.0.1 ,您可以在[这里](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html)查看所有配置MySQL-CDC,其中hostname/username/password/database-name 是必需的。 +- **--oracle-conf** Oracle CDCSource 配置,例如--oracle-conf hostname=127.0.0.1 ,您可以在[这里](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html)查看所有配置Oracle-CDC,其中hostname/username/password/database-name/schema-name 是必需的。 - **--sink-conf** Doris Sink 的所有配置,可以在[这里](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector/#%E9%80%9A%E7%94%A8%E9%85%8D%E7%BD%AE%E9%A1%B9)查看完整的配置项。 - **--table-conf** Doris表的配置项,即properties中包含的内容。 例如 --table-conf replication_num=1 ->注:需要在$FLINK_HOME/lib 目录下添加flink-sql-connector-mysql-cdc-2.3.0.jar +>注:同步时需要在$FLINK_HOME/lib 目录下添加对应的Flink CDC依赖,比如 flink-sql-connector-mysql-cdc-${version}.jar,flink-sql-connector-oracle-cdc-${version}.jar -### 示例 -``` +### MySQL同步示例 +```shell <FLINK_HOME>/bin/flink run \ -Dexecution.checkpointing.interval=10s \ -Dparallelism.default=1 \ @@ -427,8 +466,33 @@ insert into doris_sink select id,name from cdc_mysql_source; --table-conf replication_num=1 ``` +### Oracle同步示例 + +```shell +<FLINK_HOME>/bin/flink run \ + -Dexecution.checkpointing.interval=10s \ + -Dparallelism.default=1 \ + -c org.apache.doris.flink.tools.cdc.CdcTools \ + ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \ + oracle-sync-database \ + --database test_db \ + --oracle-conf hostname=127.0.0.1 \ + --oracle-conf port=1521 \ + --oracle-conf username=admin \ + --oracle-conf password="password" \ + --oracle-conf database-name=XE \ + --oracle-conf schema-name=ADMIN \ + 
--including-tables "tbl1|tbl2" \ + --sink-conf fenodes=127.0.0.1:8030 \ + --sink-conf username=root \ + --sink-conf password=\ + --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \ + --sink-conf sink.label-prefix=label \ + --table-conf replication_num=1 +``` ## 使用FlinkCDC更新Key列 + 一般在业务数据库中,会使用编号来作为表的主键,比如Student表,会使用编号(id)来作为主键,但是随着业务的发展,数据对应的编号有可能是会发生变化的。 在这种场景下,使用FlinkCDC + Doris Connector同步数据,便可以自动更新Doris主键列的数据。 ### 原理 @@ -532,7 +596,9 @@ Exactly-Once场景下,Flink Job重启时必须从最新的Checkpoint/Savepoint 6. **errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100** -这是因为同一个库并发导入超过了100,可通过调整 fe.conf的参数 `max_running_txn_num_per_db` 来解决。具体可参考 [max_running_txn_num_per_db](https://doris.apache.org/zh-CN/docs/dev/admin-manual/config/fe-config/#max_running_txn_num_per_db) +这是因为同一个库并发导入超过了100,可通过调整 fe.conf的参数 `max_running_txn_num_per_db` 来解决,具体可参考 [max_running_txn_num_per_db](https://doris.apache.org/zh-CN/docs/dev/admin-manual/config/fe-config/#max_running_txn_num_per_db)。 + +同时,一个任务频繁修改label重启,也可能会导致这个错误。2pc场景下(Duplicate/Aggregate模型),每个任务的label需要唯一,并且从checkpoint重启时,flink任务才会主动abort掉之前已经precommit成功,没有commit的txn,频繁修改label重启,会导致大量precommit成功的txn无法被abort,占用事务。在Unique模型下也可关闭2pc,可以实现幂等写入。 7. **Flink写入Uniq模型时,如何保证一批数据的有序性?** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org