This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e8ae7ad16 [doc](flink-connector)improve flink connector doc (#22143)
1e8ae7ad16 is described below

commit 1e8ae7ad16fee79c2fd01285ecdb8c73011e679e
Author: wudi <676366...@qq.com>
AuthorDate: Tue Jul 25 15:58:35 2023 +0800

    [doc](flink-connector)improve flink connector doc (#22143)
---
 docs/en/docs/ecosystem/flink-doris-connector.md    | 446 ++++++++++++---------
 docs/zh-CN/docs/ecosystem/flink-doris-connector.md | 264 +++++++-----
 2 files changed, 419 insertions(+), 291 deletions(-)

diff --git a/docs/en/docs/ecosystem/flink-doris-connector.md 
b/docs/en/docs/ecosystem/flink-doris-connector.md
index 7a07793ec4..b8f385469d 100644
--- a/docs/en/docs/ecosystem/flink-doris-connector.md
+++ b/docs/en/docs/ecosystem/flink-doris-connector.md
@@ -28,12 +28,7 @@ under the License.
 
 
 
-
-The Flink Doris Connector can support operations (read, insert, modify, 
delete) data stored in Doris through Flink.
-
-Github: https://github.com/apache/doris-flink-connector
-
-* `Doris` table can be mapped to `DataStream` or `Table`.
+* [Flink Doris Connector](https://github.com/apache/doris-flink-connector) supports reading, inserting, modifying, and deleting data stored in Doris through Flink. This document introduces how to operate Doris with Flink through both the DataStream and SQL APIs.
 
 >**Note:**
 >
@@ -50,124 +45,108 @@ Github: https://github.com/apache/doris-flink-connector
 | 1.3.0     | 1.16  | 1.0+   | 8    | -         |
 | 1.4.0     | 1.15,1.16,1.17  | 1.0+   | 8   |- |
 
-## Build and Install
-
-Ready to work
-
-1. Modify the `custom_env.sh.tpl` file and rename it to `custom_env.sh`
+## Installation
 
-2. Execute following command in source dir:
-`sh build.sh`
-Enter the flink version you need to compile according to the prompt.
+### Maven
 
-After the compilation is successful, the target jar package will be generated 
in the `dist` directory, such as: `flink-doris-connector-1.3.0-SNAPSHOT.jar`.
-Copy this file to `classpath` in `Flink` to use `Flink-Doris-Connector`. For 
example, `Flink` running in `Local` mode, put this file in the `lib/` folder. 
`Flink` running in `Yarn` cluster mode, put this file in the pre-deployment 
package.
-
-
-## Using Maven
-
-Add flink-doris-connector Maven dependencies
+Add the flink-doris-connector dependency:
 
 ```
 <!-- flink-doris-connector -->
 <dependency>
-  <groupId>org.apache.doris</groupId>
-  <artifactId>flink-doris-connector-1.16</artifactId>
-  <version>1.3.0</version>
-</dependency>  
+   <groupId>org.apache.doris</groupId>
+   <artifactId>flink-doris-connector-1.16</artifactId>
+   <version>1.4.0</version>
+</dependency>
 ```
 
-**Notes**
+**Note**
 
-1. Please replace the corresponding Connector and Flink dependency versions 
according to different Flink versions. Version 1.3.0 only supports Flink1.16
+1. Please replace the Connector and Flink dependency versions according to your Flink version.
 
 2. You can also download the relevant version jar package from 
[here](https://repo.maven.apache.org/maven2/org/apache/doris/).
 
-## How to use
-
-There are three ways to use Flink Doris Connector. 
+### Compile
 
-* SQL
-* DataStream
+To compile, run `sh build.sh` directly. For details, please refer to [here](https://github.com/apache/doris-flink-connector/blob/master/README.md).
 
-### Parameters Configuration
+After the compilation is successful, the target jar package will be generated in the `dist` directory, for example: `flink-doris-connector-1.5.0-SNAPSHOT.jar`.
+Copy this file to the `classpath` of `Flink` to use `Flink-Doris-Connector`. For example, for `Flink` running in `Local` mode, put this file in the `lib/` folder; for `Flink` running in `Yarn` cluster mode, put this file into the pre-deployment package.
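+
+For example, a possible build-and-deploy flow for a local Flink installation (the jar name and paths below are illustrative and depend on the version you build) is:
+
+```shell
+# build the connector; choose the Flink version when prompted
+sh build.sh
+
+# copy the generated jar into Flink's classpath (Local mode)
+cp dist/flink-doris-connector-1.5.0-SNAPSHOT.jar $FLINK_HOME/lib/
+```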
 
-Flink Doris Connector Sink writes data to Doris by the `Stream load`, and also 
supports the configurations of `Stream load`, For specific parameters, please 
refer to [here](../data-operate/import/import-way/stream-load-manual.md).
+## Usage
 
-* SQL  configured by `sink.properties.` in the `WITH`
-* DataStream configured by 
`DorisExecutionOptions.builder().setStreamLoadProp(Properties)`
+### Read
 
-
-### SQL
-
-* Source
+#### SQL
 
 ```sql
+-- doris source
 CREATE TABLE flink_doris_source (
-    name STRING,
-    age INT,
-    price DECIMAL(5,2),
-    sale DOUBLE
-    ) 
-    WITH (
-      'connector' = 'doris',
-      'fenodes' = 'FE_IP:8030',
-      'table.identifier' = 'database.table',
-      'username' = 'root',
-      'password' = 'password'
+     name STRING,
+     age INT,
+     price DECIMAL(5,2),
+     sale DOUBLE
+     )
+     WITH (
+       'connector' = 'doris',
+       'fenodes' = 'FE_IP:8030',
+       'table.identifier' = 'database.table',
+       'username' = 'root',
+       'password' = 'password'
 );
 ```
 
-* Sink
+#### DataStream
 
-```sql
--- enable checkpoint
-SET 'execution.checkpointing.interval' = '10s';
-CREATE TABLE flink_doris_sink (
-    name STRING,
-    age INT,
-    price DECIMAL(5,2),
-    sale DOUBLE
-    ) 
-    WITH (
-      'connector' = 'doris',
-      'fenodes' = 'FE_IP:8030',
-      'table.identifier' = 'db.table',
-      'username' = 'root',
-      'password' = 'password',
-      'sink.label-prefix' = 'doris_label'
-);
-```
+```java
+DorisOptions.Builder builder = DorisOptions.builder()
+         .setFenodes("FE_IP:8030")
+         .setTableIdentifier("db.table")
+         .setUsername("root")
+         .setPassword("password");
 
-* Insert
+DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
+         .setDorisOptions(builder.build())
+         .setDorisReadOptions(DorisReadOptions.builder().build())
+         .setDeserializer(new SimpleListDeserializationSchema())
+         .build();
 
-```sql
-INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
+env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris 
source").print();
 ```
 
-### DataStream
+### Write
 
-* Source
+#### SQL
 
-```java
-DorisOptions.Builder builder = DorisOptions.builder()
-        .setFenodes("FE_IP:8030")
-        .setTableIdentifier("db.table")
-        .setUsername("root")
-        .setPassword("password");
+```sql
+-- enable checkpoint
+SET 'execution.checkpointing.interval' = '10s';
 
-DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
-        .setDorisOptions(builder.build())
-        .setDorisReadOptions(DorisReadOptions.builder().build())
-        .setDeserializer(new SimpleListDeserializationSchema())
-        .build();
+-- doris sink
+CREATE TABLE flink_doris_sink (
+     name STRING,
+     age INT,
+     price DECIMAL(5,2),
+     sale DOUBLE
+     )
+     WITH (
+       'connector' = 'doris',
+       'fenodes' = 'FE_IP:8030',
+       'table.identifier' = 'db.table',
+       'username' = 'root',
+       'password' = 'password',
+       'sink.label-prefix' = 'doris_label'
+);
 
-env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris 
source").print();
+-- submit insert job
+INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
 ```
 
-* Sink
+#### DataStream
 
-**String Stream**
+DorisSink writes data to Doris through Stream Load. When writing with DataStream, different serialization methods are supported:
+
+**String data stream (SimpleStringSerializer)**
 
 ```java
 // enable checkpoint
@@ -178,37 +157,29 @@ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
 DorisSink.Builder<String> builder = DorisSink.builder();
 DorisOptions.Builder dorisBuilder = DorisOptions.builder();
 dorisBuilder.setFenodes("FE_IP:8030")
-        .setTableIdentifier("db.table")
-        .setUsername("root")
-        .setPassword("password");
-
-Properties properties = new Properties();
-/**
-json format to streamload
-properties.setProperty("format", "json");
-properties.setProperty("read_json_by_line", "true");
-**/
+         .setTableIdentifier("db.table")
+         .setUsername("root")
+         .setPassword("password");
 
-DorisExecutionOptions.Builder  executionBuilder = 
DorisExecutionOptions.builder();
+DorisExecutionOptions.Builder executionBuilder = 
DorisExecutionOptions.builder();
 executionBuilder.setLabelPrefix("label-doris") //streamload label prefix
-                .setStreamLoadProp(properties); 
+                 .setDeletable(false);
 
 builder.setDorisReadOptions(DorisReadOptions.builder().build())
-        .setDorisExecutionOptions(executionBuilder.build())
-        .setSerializer(new SimpleStringSerializer()) //serialize according to 
string 
-        .setDorisOptions(dorisBuilder.build());
-
+         .setDorisExecutionOptions(executionBuilder.build())
+         .setSerializer(new SimpleStringSerializer()) //serialize according to 
string
+         .setDorisOptions(dorisBuilder.build());
 
 //mock string source
 List<Tuple2<String, Integer>> data = new ArrayList<>();
 data.add(new Tuple2<>("doris",1));
-DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);
+DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);
 
 source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\t" + 
t.f1)
-      .sinkTo(builder.build());
+       .sinkTo(builder.build());
 ```
 
-**RowData Stream**
+**RowData data stream (RowDataSerializer)**
 
 ```java
 // enable checkpoint
@@ -220,105 +191,163 @@ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
 DorisSink.Builder<RowData> builder = DorisSink.builder();
 DorisOptions.Builder dorisBuilder = DorisOptions.builder();
 dorisBuilder.setFenodes("FE_IP:8030")
-        .setTableIdentifier("db.table")
-        .setUsername("root")
-        .setPassword("password");
+         .setTableIdentifier("db.table")
+         .setUsername("root")
+         .setPassword("password");
 
 // json format to streamload
 Properties properties = new Properties();
 properties.setProperty("format", "json");
 properties.setProperty("read_json_by_line", "true");
-DorisExecutionOptions.Builder  executionBuilder = 
DorisExecutionOptions.builder();
+DorisExecutionOptions.Builder executionBuilder = 
DorisExecutionOptions.builder();
 executionBuilder.setLabelPrefix("label-doris") //streamload label prefix
-                .setStreamLoadProp(properties); //streamload params
+                 .setDeletable(false)
+                 .setStreamLoadProp(properties); //streamload params
 
-//flink rowdata‘s schema
+//flink rowdata's schema
 String[] fields = {"city", "longitude", "latitude", "destroy_date"};
 DataType[] types = {DataTypes.VARCHAR(256), DataTypes.DOUBLE(), 
DataTypes.DOUBLE(), DataTypes.DATE()};
 
 builder.setDorisReadOptions(DorisReadOptions.builder().build())
-        .setDorisExecutionOptions(executionBuilder.build())
-        .setSerializer(RowDataSerializer.builder()    //serialize according to 
rowdata 
-                           .setFieldNames(fields)
-                           .setType("json")           //json format
-                           .setFieldType(types).build())
-        .setDorisOptions(dorisBuilder.build());
+         .setDorisExecutionOptions(executionBuilder.build())
+         .setSerializer(RowDataSerializer.builder() //serialize according to 
rowdata
+                            .setFieldNames(fields)
+                            .setType("json") //json format
+                            .setFieldType(types).build())
+         .setDorisOptions(dorisBuilder.build());
 
 //mock rowdata source
-DataStream<RowData> source = env.fromElements("")
-    .map(new MapFunction<String, RowData>() {
-        @Override
-        public RowData map(String value) throws Exception {
-            GenericRowData genericRowData = new GenericRowData(4);
-            genericRowData.setField(0, StringData.fromString("beijing"));
-            genericRowData.setField(1, 116.405419);
-            genericRowData.setField(2, 39.916927);
-            genericRowData.setField(3, LocalDate.now().toEpochDay());
-            return genericRowData;
-        }
-    });
-
-source.sinkTo(builder.build());
+DataStream<RowData> source = env.fromElements("")
+     .map(new MapFunction<String, RowData>() {
+         @Override
+         public RowData map(String value) throws Exception {
+             GenericRowData genericRowData = new GenericRowData(4);
+             genericRowData.setField(0, StringData.fromString("beijing"));
+             genericRowData.setField(1, 116.405419);
+             genericRowData.setField(2, 39.916927);
+             genericRowData.setField(3, LocalDate.now().toEpochDay());
+             return genericRowData;
+         }
+     });
+
+source.sinkTo(builder.build());
 ```
 
-**SchemaChange Stream**
+**SchemaChange data stream (JsonDebeziumSchemaSerializer)**
+
 ```java
 // enable checkpoint
 env.enableCheckpointing(10000);
 
 Properties props = new Properties();
-props.setProperty("format", "json");
+props.setProperty("format", "json");
 props.setProperty("read_json_by_line", "true");
-DorisOptions dorisOptions = DorisOptions.builder()
-        .setFenodes("127.0.0.1:8030")
-        .setTableIdentifier("test.t1")
-        .setUsername("root")
-        .setPassword("").build();
+DorisOptions dorisOptions = DorisOptions.builder()
+         .setFenodes("127.0.0.1:8030")
+         .setTableIdentifier("test.t1")
+         .setUsername("root")
+         .setPassword("").build();
 
-DorisExecutionOptions.Builder  executionBuilder = 
DorisExecutionOptions.builder();
-executionBuilder.setLabelPrefix("label-doris" + UUID.randomUUID())
-        .setStreamLoadProp(props).setDeletable(true);
+DorisExecutionOptions.Builder executionBuilder = 
DorisExecutionOptions.builder();
+executionBuilder.setLabelPrefix("label-prefix")
+         .setStreamLoadProp(props).setDeletable(true);
 
 DorisSink.Builder<String> builder = DorisSink.builder();
 builder.setDorisReadOptions(DorisReadOptions.builder().build())
-        .setDorisExecutionOptions(executionBuilder.build())
-        .setDorisOptions(dorisOptions)
-        
.setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());
+         .setDorisExecutionOptions(executionBuilder.build())
+         .setDorisOptions(dorisOptions)
+         
.setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());
 
-env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL 
Source")//.print();
-        .sinkTo(builder.build());
+env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
+         .sinkTo(builder.build());
 ```
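+
+The `mySqlSource` referenced above is not defined in this snippet. A minimal sketch of how it could be built with Flink CDC's `MySqlSource` (connection settings and table names below are placeholders, not part of the original example) might look like:
+
+```java
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+
+// illustrative MySQL CDC source; adjust host, credentials and tables to your environment
+MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
+        .hostname("127.0.0.1")
+        .port(3306)
+        .databaseList("test")
+        .tableList("test.t1")
+        .username("root")
+        .password("password")
+        .serverTimeZone("UTC")
+        .includeSchemaChanges(true) // emit DDL events so schema changes can be synchronized
+        .deserializer(new JsonDebeziumDeserializationSchema())
+        .build();
+```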
-refer: 
[CDCSchemaChangeExample](https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java)
-
-
-### General
-
-| Key                              | Default Value     | Required | Comment    
                                                  |
-| -------------------------------- | ----------------- | 
------------------------------------------------------------ | 
-------------------------------- |
-| fenodes                    | --                | Y               | Doris FE 
http address, support multiple addresses, separated by commas            |
-| table.identifier           | --                | Y               | Doris 
table identifier, eg, db1.tbl1                                 |
-| username                            | --            | Y           | Doris 
username                                            |
-| password                        | --            | Y           | Doris 
password                                              |
-| doris.request.retries            | 3                 | N                | 
Number of retries to send requests to Doris                                    |
-| doris.request.connect.timeout.ms | 30000             | N            | 
Connection timeout for sending requests to Doris                                
|
-| doris.request.read.timeout.ms    | 30000             | N            | Read 
timeout for sending request to Doris                                |
-| doris.request.query.timeout.s    | 3600              | N             | Query 
the timeout time of doris, the default is 1 hour, -1 means no timeout limit     
        |
-| doris.request.tablet.size        | Integer.MAX_VALUE | N | The number of 
Doris Tablets corresponding to an Partition. The smaller this value is set, the 
more partitions will be generated. This will increase the parallelism on the 
flink side, but at the same time will cause greater pressure on Doris. |
-| doris.batch.size                 | 1024              | N             | The 
maximum number of rows to read data from BE at one time. Increasing this value 
can reduce the number of connections between Flink and Doris. Thereby reducing 
the extra time overhead caused by network delay. |
-| doris.exec.mem.limit             | 2147483648        | N       | Memory 
limit for a single query. The default is 2GB, in bytes.                     |
-| doris.deserialize.arrow.async    | false             | N            | 
Whether to support asynchronous conversion of Arrow format to RowBatch required 
for flink-doris-connector iteration           |
-| doris.deserialize.queue.size     | 64                | N               | 
Asynchronous conversion of the internal processing queue in Arrow format takes 
effect when doris.deserialize.arrow.async is true        |
-| doris.read.field            | --            | N           | List of column 
names in the Doris table, separated by commas                  |
-| doris.filter.query          | --            | N           | Filter 
expression of the query, which is transparently transmitted to Doris. Doris 
uses this expression to complete source-side data filtering. |
-| sink.label-prefix | -- | Y | The label prefix used by stream load imports. 
In the 2pc scenario, global uniqueness is required to ensure the EOS semantics 
of Flink. |
-| sink.properties.*     | --               | N              | The stream load 
parameters.<br /> <br /> eg:<br /> sink.properties.column_separator' = ','<br 
/> <br /> Setting 'sink.properties.escape_delimiters' = 'true' if you want to 
use a control char as a separator, so that such as '\\x01' will translate to 
binary 0x01<br /><br />Support JSON format import, you need to enable both 
'sink.properties.format' ='json' and 'sink.properties.read_json_by_line' 
='true' |
-| sink.enable-delete     | true               | N              | Whether to 
enable deletion. This option requires Doris table to enable batch delete 
function (0.15+ version is enabled by default), and only supports Uniq model.|
-| sink.enable-2pc                  | true              | N        | Whether to 
enable two-phase commit (2pc), the default is true, to ensure Exactly-Once 
semantics. For two-phase commit, please refer to 
[here](../data-operate/import/import-way/stream-load-manual.md). |
-| sink.max-retries                 | 3                 | N        | In the 2pc 
scenario, the number of retries after the commit phase fails.                   
                                                                                
                                                                                
                                                      |
-| sink.buffer-size                 | 1048576(1MB)       | N        | Write 
data cache buffer size, in bytes. It is not recommended to modify, the default 
configuration is sufficient.                                                    
                                                                                
                                                                                
             |
-| sink.buffer-count                | 3                  | N        | The 
number of write data cache buffers, it is not recommended to modify, the 
default configuration is sufficient.                                            
                                                                                
   
 
+Reference: 
[CDCSchemaChangeExample](https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java)
 
+### Lookup Join
+
+```sql
+CREATE TABLE fact_table (
+  `id` BIGINT,
+  `name` STRING,
+  `city` STRING,
+  `process_time` as proctime()
+) WITH (
+  'connector' = 'kafka',
+  ...
+);
+
+create table dim_city(
+  `city` STRING,
+  `level` INT ,
+  `province` STRING,
+  `country` STRING
+) WITH (
+  'connector' = 'doris',
+  'fenodes' = '127.0.0.1:8030',
+  'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
+  'table.identifier' = 'dim.dim_city',
+  'username' = 'root',
+  'password' = ''
+);
+
+SELECT a.id, a.name, a.city, c.province, c.country,c.level 
+FROM fact_table a
+LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
+ON a.city = c.city
+```
+
+## Configuration
+
+### General configuration items
+
+| Key                              | Default Value | Required | Comment        
                                              |
+| -------------------------------- | ------------- | -------- | 
------------------------------------------------------------ |
+| fenodes                          | --            | Y        | Doris FE http 
address, multiple addresses are supported, separated by commas |
+| table.identifier                 | --            | Y        | Doris table 
name, such as: db.tbl                            |
+| username                         | --            | Y        | username to 
access Doris                                     |
+| password                         | --            | Y        | Password to 
access Doris                                     |
+| doris.request.retries            | 3             | N        | Number of 
retries to send requests to Doris                  |
+| doris.request.connect.timeout.ms | 30000         | N        | Connection 
timeout for sending requests to Doris             |
+| doris.request.read.timeout.ms    | 30000         | N        | Read timeout 
for sending requests to Doris                   |
+
+### Source configuration items
+
+| Key                           | Default Value      | Required | Comment      
                                                |
+| ----------------------------- | ------------------ | -------- | 
------------------------------------------------------------ |
+| doris.request.query.timeout.s | 3600               | N        | The timeout 
time for querying Doris, the default value is 1 hour, -1 means no timeout limit 
|
+| doris.request.tablet.size     | Integer.MAX_VALUE | N        | The number
of Doris Tablets corresponding to a Partition. The smaller this value is set, 
the more Partitions will be generated. This improves the parallelism on the 
Flink side, but at the same time puts more pressure on Doris. |
+| doris.batch.size              | 1024               | N        | The maximum 
number of rows to read data from BE at a time. Increasing this value reduces 
the number of connections established between Flink and Doris. Thereby reducing 
the additional time overhead caused by network delay. |
+| doris.exec.mem.limit          | 2147483648         | N        | Memory limit 
for a single query. The default is 2GB, in bytes |
+| doris.deserialize.arrow.async | FALSE              | N        | Whether to 
support asynchronous conversion of Arrow format to RowBatch needed for 
flink-doris-connector iterations |
+| doris.deserialize.queue.size  | 64                 | N        | Asynchronous 
conversion of internal processing queue in Arrow format, effective when 
doris.deserialize.arrow.async is true |
+| doris.read.field              | --                 | N        | Read the 
list of column names of the Doris table, separated by commas |
+| doris.filter.query            | --                 | N        | The 
expression to filter the read data, this expression is transparently passed to 
Doris. Doris uses this expression to complete source-side data filtering. For 
example age=18. |
+
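+For illustration (values below are examples, not defaults), `doris.read.field` and `doris.filter.query` can be combined in the source table definition to prune columns and push a filter down to Doris:
+
+```sql
+CREATE TABLE flink_doris_source_filtered (
+     name STRING,
+     age INT
+     )
+     WITH (
+       'connector' = 'doris',
+       'fenodes' = 'FE_IP:8030',
+       'table.identifier' = 'database.table',
+       'username' = 'root',
+       'password' = 'password',
+       'doris.read.field' = 'name,age',
+       'doris.filter.query' = 'age=18'
+);
+```
+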
+### Sink configuration items
+
+| Key                | Default Value | Required | Comment                      
                                |
+| ------------------ | ------------- | -------- | 
------------------------------------------------------------ |
+| sink.label-prefix  | --            | Y        | The label prefix used by 
Stream load import. In the 2pc scenario, global uniqueness is required to 
ensure Flink's EOS semantics. |
+| sink.properties.*  | --            | N        | Import parameters for Stream 
Load. <br/>For example: 'sink.properties.column_separator' = ', ' defines 
column delimiters, 'sink.properties.escape_delimiters' = 'true' special 
characters as delimiters, '\x01' will be converted to binary 0x01 
<br/><br/>JSON format import<br/>'sink.properties.format' = 'json' 
'sink.properties. read_json_by_line' = 'true'<br/>Detailed parameters refer to 
[here](../data-operate/import/import-way/stream-load-ma [...]
+| sink.enable-delete | TRUE          | N        | Whether to enable delete. 
This option requires the Doris table to enable the batch delete function (Doris 
0.15+ version is enabled by default), and only supports the Unique model. |
+| sink.enable-2pc    | TRUE          | N        | Whether to enable two-phase 
commit (2pc), the default is true, to ensure Exactly-Once semantics. For 
two-phase commit, please refer to 
[here](../data-operate/import/import-way/stream-load-manual.md). |
+| sink.buffer-size   | 1MB           | N        | The size of the write data 
cache buffer, in bytes. It is not recommended to modify, the default 
configuration is enough |
+| sink.buffer-count  | 3             | N        | The number of write data 
buffers. It is not recommended to modify, the default configuration is enough |
+| sink.max-retries   | 3             | N        | Maximum number of retries 
after Commit failure, default 3    |
+
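+As an illustration (not part of the original table), Stream Load parameters are passed through the `sink.properties.` prefix in the sink's WITH clause, for example to import in JSON format:
+
+```sql
+CREATE TABLE flink_doris_sink_json (
+     name STRING,
+     age INT
+     )
+     WITH (
+       'connector' = 'doris',
+       'fenodes' = 'FE_IP:8030',
+       'table.identifier' = 'db.table',
+       'username' = 'root',
+       'password' = 'password',
+       'sink.label-prefix' = 'doris_label',
+       'sink.properties.format' = 'json',
+       'sink.properties.read_json_by_line' = 'true'
+);
+```
+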
+### Lookup Join configuration items
+
+| Key                               | Default Value | Required | Comment       
                                               |
+| --------------------------------- | ------------- | -------- | 
------------------------------------------------------------ |
+| jdbc-url                          | --            | Y        | jdbc 
connection information                                  |
+| lookup.cache.max-rows             | -1            | N        | The maximum 
number of rows in the lookup cache, the default value is -1, and the cache is 
not enabled |
+| lookup.cache.ttl                  | 10s           | N        | The maximum 
time of lookup cache, the default is 10s         |
+| lookup.max-retries                | 1             | N        | The number of 
retries after a lookup query fails             |
+| lookup.jdbc.async                 | false         | N        | Whether to 
enable asynchronous lookup, the default is false  |
+| lookup.jdbc.read.batch.size       | 128           | N        | Under 
asynchronous lookup, the maximum batch size for each query |
+| lookup.jdbc.read.batch.queue-size | 256           | N        | The size of 
the intermediate buffer queue during asynchronous lookup |
+| lookup.jdbc.read.thread-size      | 3             | N        | The number of 
jdbc threads for lookup in each task           |
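+
+As an illustrative sketch building on the `dim_city` example above (the option values here are assumptions, not recommendations), asynchronous lookup and caching can be enabled directly in the dimension table's WITH clause:
+
+```sql
+create table dim_city(
+  `city` STRING,
+  `level` INT ,
+  `province` STRING,
+  `country` STRING
+) WITH (
+  'connector' = 'doris',
+  'fenodes' = '127.0.0.1:8030',
+  'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
+  'table.identifier' = 'dim.dim_city',
+  'username' = 'root',
+  'password' = '',
+  'lookup.jdbc.async' = 'true',
+  'lookup.jdbc.read.batch.size' = '128',
+  'lookup.cache.max-rows' = '10000',
+  'lookup.cache.ttl' = '60s'
+);
+```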
 
 ## Doris & Flink Column Type Mapping
 
@@ -342,7 +371,7 @@ refer: 
[CDCSchemaChangeExample](https://github.com/apache/doris-flink-connector/
 | TIME       | DOUBLE             |
 | HLL        | Unsupported datatype             |
 
-## An example of using Flink CDC to access Doris (supports 
insert/update/delete events)
+## An example of using Flink CDC to access Doris 
 ```sql
 SET 'execution.checkpointing.interval' = '10s';
 CREATE TABLE cdc_mysql_source (
@@ -359,7 +388,7 @@ CREATE TABLE cdc_mysql_source (
  'table-name' = 'table'
 );
 
--- Support delete event synchronization (sink.enable-delete='true'), requires 
Doris table to enable batch delete function
+-- Supports synchronizing insert/update/delete events
 CREATE TABLE doris_sink (
 id INT,
 name STRING
@@ -372,20 +401,22 @@ WITH (
   'password' = '',
   'sink.properties.format' = 'json',
   'sink.properties.read_json_by_line' = 'true',
-  'sink.enable-delete' = 'true',
+  'sink.enable-delete' = 'true', -- Synchronize delete events
   'sink.label-prefix' = 'doris_label'
 );
 
 insert into doris_sink select id,name from cdc_mysql_source;
 ```
 
-## Use Flink CDC to access multi-table or database
+## Example of using Flink CDC to access multiple tables or an entire database
+
 ### grammar
-```
+
+```shell
 <FLINK_HOME>/bin/flink run \
-     -c org.apache.doris.flink.tools.cdc.CdcTools\
-     lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar \
-     mysql-sync-database \
+     -c org.apache.doris.flink.tools.cdc.CdcTools \
+     lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar \
+     <mysql-sync-database|oracle-sync-database> \
      --database <doris-database-name> \
      [--job-name <flink-job-name>] \
      [--table-prefix <doris-table-prefix>] \
@@ -393,6 +424,7 @@ insert into doris_sink select id,name from cdc_mysql_source;
      [--including-tables <mysql-table-name|name-regular-expr>] \
      [--excluding-tables <mysql-table-name|name-regular-expr>] \
      --mysql-conf <mysql-cdc-source-conf> [--mysql-conf 
<mysql-cdc-source-conf> ...] \
+     --oracle-conf <oracle-cdc-source-conf> [--oracle-conf 
<oracle-cdc-source-conf> ...] \
      --sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \
      [--table-conf <doris-table-conf> [--table-conf <doris-table-conf> ...]]
 ```
@@ -403,19 +435,21 @@ insert into doris_sink select id,name from 
cdc_mysql_source;
 - **--table-suffix** Same as above, the suffix name of the Doris table.
 - **--including-tables** MySQL tables that need to be synchronized, you can 
use "|" to separate multiple tables, and support regular expressions. For 
example --including-tables table1|tbl.* is to synchronize table1 and all tables 
beginning with tbl.
 - **--excluding-tables** Tables that do not need to be synchronized, the usage 
is the same as above.
-- **--mysql-conf** MySQL CDCSource configuration, for example --mysql-conf 
hostname=127.0.0.1 , you can find it in 
[here](https://ververica.github.io/flink-cdc-connectors/master 
/content/connectors/mysql-cdc.html) to view all configurations of MySQL-CDC, 
where hostname/username/password/database-name are required.
-- **--sink-conf** All configurations of Doris Sink, you can view the complete 
configuration items 
[here](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector/#%E9%80%9A%E7%94%A8%E9%85%8D%E7%BD%AE%E9%A1%B9).
+- **--mysql-conf** MySQL CDCSource configuration, for example --mysql-conf hostname=127.0.0.1. You can view all MySQL-CDC configurations [here](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html), where hostname/username/password/database-name are required.
+- **--oracle-conf** Oracle CDCSource configuration, for example --oracle-conf hostname=127.0.0.1. You can view all Oracle-CDC configurations [here](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html), where hostname/username/password/database-name/schema-name are required.
+- **--sink-conf** All configurations of Doris Sink. You can view the complete list of configuration items [here](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector/#%E9%80%9A%E7%94%A8%E9%85%8D%E7%BD%AE%E9%A1%B9).
 - **--table-conf** The configuration item of the Doris table, that is, the 
content contained in properties. For example --table-conf replication_num=1
 
->Note: flink-sql-connector-mysql-cdc-2.3.0.jar needs to be added in the 
$FLINK_HOME/lib directory
+>Note: When synchronizing, you need to add the corresponding Flink CDC dependencies to the $FLINK_HOME/lib directory, such as flink-sql-connector-mysql-cdc-${version}.jar and flink-sql-connector-oracle-cdc-${version}.jar.
 
-### Example
-```
+### MySQL synchronization example
+
+```shell
 <FLINK_HOME>/bin/flink run \
      -Dexecution.checkpointing.interval=10s\
      -Dparallelism.default=1\
      -c org.apache.doris.flink.tools.cdc.CdcTools\
-     lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar \
+     lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \
      mysql-sync-database\
      --database test_db \
      --mysql-conf hostname=127.0.0.1 \
@@ -431,7 +465,33 @@ insert into doris_sink select id,name from 
cdc_mysql_source;
      --table-conf replication_num=1
 ```
 
+### Oracle synchronization example
+
+```shell
+<FLINK_HOME>/bin/flink run \
+      -Dexecution.checkpointing.interval=10s \
+      -Dparallelism.default=1 \
+      -c org.apache.doris.flink.tools.cdc.CdcTools \
+      ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \
+      oracle-sync-database \
+      --database test_db \
+      --oracle-conf hostname=127.0.0.1 \
+      --oracle-conf port=1521 \
+      --oracle-conf username=admin \
+      --oracle-conf password="password" \
+      --oracle-conf database-name=XE \
+      --oracle-conf schema-name=ADMIN \
+      --including-tables "tbl1|tbl2" \
+      --sink-conf fenodes=127.0.0.1:8030 \
+      --sink-conf username=root \
+      --sink-conf password=\
+      --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
+      --sink-conf sink.label-prefix=label \
+      --table-conf replication_num=1
+```
+
 ## Use FlinkCDC to update Key column
+
 Generally, in a business database, the number is used as the primary key of 
the table, such as the Student table, the number (id) is used as the primary 
key, but with the development of the business, the number corresponding to the 
data may change.
 In this scenario, using FlinkCDC + Doris Connector to synchronize data can 
automatically update the data in the Doris primary key column.
 ### Principle
@@ -537,6 +597,8 @@ At this time, it cannot be started from the checkpoint, and 
the expiration time
 
 This is because the concurrent import of the same library exceeds 100, which 
can be solved by adjusting the parameter `max_running_txn_num_per_db` of 
fe.conf. For details, please refer to 
[max_running_txn_num_per_db](https://doris.apache.org/zh-CN/docs/dev/admin-manual/config/fe-config/#max_running_txn_num_per_db)
 
+At the same time, frequently modifying the label and restarting a job may also cause this error. In the 2pc scenario (Duplicate/Aggregate model), the label of each job needs to be unique, and when restarting from a checkpoint the Flink job will actively abort txns that were successfully precommitted earlier but not yet committed. Frequently modifying the label and restarting will leave a large number of successfully precommitted txns that cannot be aborted, o [...]
+
 7. **How to ensure the order of a batch of data when Flink writes to the Uniq 
model?**
 
 You can add sequence column configuration to ensure that, for details, please 
refer to 
[sequence](https://doris.apache.org/zh-CN/docs/dev/data-operate/update-delete/sequence-column-manual)
diff --git a/docs/zh-CN/docs/ecosystem/flink-doris-connector.md 
b/docs/zh-CN/docs/ecosystem/flink-doris-connector.md
index a5f7b2977f..ad1947aaee 100644
--- a/docs/zh-CN/docs/ecosystem/flink-doris-connector.md
+++ b/docs/zh-CN/docs/ecosystem/flink-doris-connector.md
@@ -30,12 +30,7 @@ under the License.
 
 
 
-
-Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。
-
-代码库地址:https://github.com/apache/doris-flink-connector
-
-* 可以将 `Doris` 表映射为 `DataStream` 或者 `Table`。
+[Flink Doris Connector](https://github.com/apache/doris-flink-connector) 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。本文档介绍如何通过 Flink 使用 Datastream 和 SQL 操作 Doris。
 
 >**注意:**
 >
@@ -52,21 +47,9 @@ Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改
 | 1.3.0     | 1.16  | 1.0+   | 8    | -         |
 | 1.4.0     | 1.15,1.16,1.17  | 1.0+   | 8   |- |
 
-## 编译与安装
-
-准备工作
-
-1. 修改`custom_env.sh.tpl`文件,重命名为`custom_env.sh`
+## 使用
 
-2. 在源码目录下执行:
-`sh build.sh`
-根据提示输入你需要的 flink 版本进行编译。
-
-编译成功后,会在 `dist` 目录生成目标jar包,如:`flink-doris-connector-1.3.0-SNAPSHOT.jar`。
-将此文件复制到 `Flink` 的 `classpath` 中即可使用 `Flink-Doris-Connector` 。例如, `Local` 模式运行的 
`Flink` ,将此文件放入 `lib/` 文件夹下。 `Yarn` 集群模式运行的 `Flink` ,则将此文件放入预部署包中。
-
-
-## 使用 Maven 管理
+### Maven
 
 添加 flink-doris-connector
 
@@ -75,7 +58,7 @@ Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改
 <dependency>
   <groupId>org.apache.doris</groupId>
   <artifactId>flink-doris-connector-1.16</artifactId>
-  <version>1.3.0</version>
+  <version>1.4.0</version>
 </dependency>  
 ```
 
@@ -85,25 +68,21 @@ Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改
 
 2.也可从[这里](https://repo.maven.apache.org/maven2/org/apache/doris/)下载相关版本jar包。 
 
-## 使用方法
-
-Flink 读写 Doris 数据主要有两种方式
-
-* SQL
-* DataStream
+### 编译
 
-### 参数配置
+编译时,可直接运行`sh 
build.sh`,具体可参考[这里](https://github.com/apache/doris-flink-connector/blob/master/README.md)。
 
-Flink Doris Connector Sink 的内部实现是通过 `Stream Load` 服务向 Doris 写入数据, 同时也支持 
`Stream Load` 
请求参数的配置设置,具体参数可参考[这里](../data-operate/import/import-way/stream-load-manual.md),配置方法如下:
+编译成功后,会在 `dist` 目录生成目标jar包,如:`flink-doris-connector-1.5.0-SNAPSHOT.jar`。
+将此文件复制到 `Flink` 的 `classpath` 中即可使用 `Flink-Doris-Connector` 。例如, `Local` 模式运行的 
`Flink` ,将此文件放入 `lib/` 文件夹下。 `Yarn` 集群模式运行的 `Flink` ,则将此文件放入预部署包中。
 
-* SQL 使用 `WITH` 参数 `sink.properties.` 配置
-* DataStream 
使用方法`DorisExecutionOptions.builder().setStreamLoadProp(Properties)`配置
+## 使用方法
 
-### SQL
+### 读取
 
-* Source
+#### SQL
 
 ```sql
+-- doris source
 CREATE TABLE flink_doris_source (
     name STRING,
     age INT,
@@ -119,11 +98,33 @@ CREATE TABLE flink_doris_source (
 );
 ```
 
-* Sink
+#### DataStream
+
+```java
+DorisOptions.Builder builder = DorisOptions.builder()
+        .setFenodes("FE_IP:8030")
+        .setTableIdentifier("db.table")
+        .setUsername("root")
+        .setPassword("password");
+
+DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
+        .setDorisOptions(builder.build())
+        .setDorisReadOptions(DorisReadOptions.builder().build())
+        .setDeserializer(new SimpleListDeserializationSchema())
+        .build();
+
+env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris 
source").print();
+```
+
+### 写入
+
+#### SQL
 
 ```sql
 -- enable checkpoint
 SET 'execution.checkpointing.interval' = '10s';
+
+-- doris sink
 CREATE TABLE flink_doris_sink (
     name STRING,
     age INT,
@@ -138,37 +139,16 @@ CREATE TABLE flink_doris_sink (
       'password' = 'password',
       'sink.label-prefix' = 'doris_label'
 );
-```
-
-* Insert
 
-```sql
+-- submit insert job
 INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
 ```
 
-### DataStream
-
-* Source
-
-```java
-DorisOptions.Builder builder = DorisOptions.builder()
-        .setFenodes("FE_IP:8030")
-        .setTableIdentifier("db.table")
-        .setUsername("root")
-        .setPassword("password");
-
-DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder()
-        .setDorisOptions(builder.build())
-        .setDorisReadOptions(DorisReadOptions.builder().build())
-        .setDeserializer(new SimpleListDeserializationSchema())
-        .build();
-
-env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris 
source").print();
-```
+#### DataStream
 
-* Sink
+DorisSink 是通过 StreamLoad 向 Doris 写入数据,DataStream 写入时,支持不同的序列化方法
 
-**String 数据流**
+**String 数据流(SimpleStringSerializer)**
 
 ```java
 // enable checkpoint
@@ -183,16 +163,15 @@ dorisBuilder.setFenodes("FE_IP:8030")
         .setUsername("root")
         .setPassword("password");
 
-
 DorisExecutionOptions.Builder  executionBuilder = 
DorisExecutionOptions.builder();
-executionBuilder.setLabelPrefix("label-doris"); //streamload label prefix
+executionBuilder.setLabelPrefix("label-doris") //streamload label prefix
+                .setDeletable(false); 
 
 builder.setDorisReadOptions(DorisReadOptions.builder().build())
         .setDorisExecutionOptions(executionBuilder.build())
         .setSerializer(new SimpleStringSerializer()) //serialize according to 
string 
         .setDorisOptions(dorisBuilder.build());
 
-
 //mock string source
 List<Tuple2<String, Integer>> data = new ArrayList<>();
 data.add(new Tuple2<>("doris",1));
@@ -202,7 +181,7 @@ source.map((MapFunction<Tuple2<String, Integer>, String>) t 
-> t.f0 + "\t" + t.f
       .sinkTo(builder.build());
 ```
 
-**RowData 数据流**
+**RowData 数据流(RowDataSerializer)**
 
 ```java
 // enable checkpoint
@@ -224,6 +203,7 @@ properties.setProperty("format", "json");
 properties.setProperty("read_json_by_line", "true");
 DorisExecutionOptions.Builder  executionBuilder = 
DorisExecutionOptions.builder();
 executionBuilder.setLabelPrefix("label-doris") //streamload label prefix
+                .setDeletable(false)
                 .setStreamLoadProp(properties); //streamload params
 
 //flink rowdata‘s schema
@@ -255,7 +235,8 @@ DataStream<RowData> source = env.fromElements("")
 source.sinkTo(builder.build());
 ```
 
-**SchemaChange 数据流**
+**SchemaChange 数据流(JsonDebeziumSchemaSerializer)**
+
 ```java
 // enable checkpoint
 env.enableCheckpointing(10000);
@@ -270,7 +251,7 @@ DorisOptions dorisOptions = DorisOptions.builder()
         .setPassword("").build();
 
 DorisExecutionOptions.Builder  executionBuilder = 
DorisExecutionOptions.builder();
-executionBuilder.setLabelPrefix("label-doris" + UUID.randomUUID())
+executionBuilder.setLabelPrefix("label-prefix")
         .setStreamLoadProp(props).setDeletable(true);
 
 DorisSink.Builder<String> builder = DorisSink.builder();
@@ -279,40 +260,96 @@ 
builder.setDorisReadOptions(DorisReadOptions.builder().build())
         .setDorisOptions(dorisOptions)
         
.setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());
 
-env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL 
Source")//.print();
+env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
         .sinkTo(builder.build());
 ```
 参考: 
[CDCSchemaChangeExample](https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java)
 
+### Lookup Join
+
+```sql
+CREATE TABLE fact_table (
+  `id` BIGINT,
+  `name` STRING,
+  `city` STRING,
+  `process_time` as proctime()
+) WITH (
+  'connector' = 'kafka',
+  ...
+);
+
+create table dim_city(
+  `city` STRING,
+  `level` INT ,
+  `province` STRING,
+  `country` STRING
+) WITH (
+  'connector' = 'doris',
+  'fenodes' = '127.0.0.1:8030',
+  'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
+  'table.identifier' = 'dim.dim_city',
+  'username' = 'root',
+  'password' = ''
+);
+
+SELECT a.id, a.name, a.city, c.province, c.country,c.level 
+FROM fact_table a
+LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
+ON a.city = c.city
+```
+
 
 ## 配置
 
 ### 通用配置项
 
-| Key                              | Default Value      | Required | Comment   
                                                   |
-| -------------------------------- | ------------------ | -------- | 
------------------------------------------------------------ |
-| fenodes                          | --                 | Y        | Doris FE 
http 地址                                           |
-| table.identifier                 | --                 | Y        | Doris 
表名,如:db.tbl                                       |
-| username                         | --                 | Y        | 访问 Doris 
的用户名                                          |
-| password                         | --                 | Y        | 访问 Doris 
的密码                                            |
-| doris.request.retries            | 3                  | N        | 向 Doris 
发送请求的重试次数                                  |
-| doris.request.connect.timeout.ms | 30000              | N        | 向 Doris 
发送请求的连接超时时间                              |
-| doris.request.read.timeout.ms    | 30000              | N        | 向 Doris 
发送请求的读取超时时间                              |
-| doris.request.query.timeout.s    | 3600               | N        | 查询 Doris 
的超时时间,默认值为1小时,-1表示无超时限制       |
-| doris.request.tablet.size        | Integer. MAX_VALUE | N        | 一个 
Partition 对应的 Doris Tablet 个数。 此数值设置越小,则会生成越多的 Partition。从而提升 Flink 侧的并行度,但同时会对 
Doris 造成更大的压力。 |
-| doris.batch.size                 | 1024               | N        | 一次从 BE 
读取数据的最大行数。增大此数值可减少 Flink 与 Doris 之间建立连接的次数。 从而减轻网络延迟所带来的额外时间开销。 |
-| doris.exec.mem.limit             | 2147483648         | N        | 
单个查询的内存限制。默认为 2GB,单位为字节                   |
-| doris.deserialize.arrow.async    | FALSE              | N        | 是否支持异步转换 
Arrow 格式到 flink-doris-connector 迭代所需的 RowBatch |
-| doris.deserialize.queue.size     | 64                 | N        | 异步转换 
Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效 |
-| doris.read.field                 | --                 | N        | 读取 Doris 
表的列名列表,多列之间使用逗号分隔                |
-| doris.filter.query               | --                 | N        | 
过滤读取数据的表达式,此表达式透传给 Doris。Doris 使用此表达式完成源端数据过滤。 |
-| sink.label-prefix                | --                 | Y        | Stream 
load导入使用的label前缀。2pc场景下要求全局唯一 ,用来保证Flink的EOS语义。 |
-| sink.properties.*                | --                 | N        | Stream 
Load 的导入参数。<br/>例如:  'sink.properties.column_separator' = ', ' 定义列分隔符,  
'sink.properties.escape_delimiters' = 'true' 特殊字符作为分隔符,'\x01'会被转换为二进制的0x01  
<br/><br/>JSON格式导入<br/>'sink.properties.format' = 'json' 
'sink.properties.read_json_by_line' = 'true' |
-| sink.enable-delete               | TRUE               | N        | 
是否启用删除。此选项需要 Doris 表开启批量删除功能(Doris0.15+版本默认开启),只支持 Unique 模型。 |
-| sink.enable-2pc                  | TRUE               | N        | 
是否开启两阶段提交(2pc),默认为true,保证Exactly-Once语义。关于两阶段提交可参考[这里](../data-operate/import/import-way/stream-load-manual.md)。
 |
-| sink.buffer-size                 | 1MB                | N        | 
写数据缓存buffer大小,单位字节。不建议修改,默认配置即可     |
-| sink.buffer-count                | 3                  | N        | 
写数据缓存buffer个数。不建议修改,默认配置即可               |
-| sink.max-retries                 | 3                  | N        | 
Commit失败后的最大重试次数,默认3次                          |
+| Key                              | Default Value | Required | Comment        
                                 |
+| -------------------------------- | ------------- | -------- | 
----------------------------------------------- |
+| fenodes                          | --            | Y        | Doris FE http 
地址, 支持多个地址,使用逗号分隔 |
+| table.identifier                 | --            | Y        | Doris 
表名,如:db.tbl                          |
+| username                         | --            | Y        | 访问 Doris 的用户名  
                           |
+| password                         | --            | Y        | 访问 Doris 的密码   
                            |
+| doris.request.retries            | 3             | N        | 向 Doris 
发送请求的重试次数                     |
+| doris.request.connect.timeout.ms | 30000         | N        | 向 Doris 
发送请求的连接超时时间                 |
+| doris.request.read.timeout.ms    | 30000         | N        | 向 Doris 
发送请求的读取超时时间                 |
+
+### Source 配置项
+
+| Key                           | Default Value      | Required | Comment      
                                                |
+| ----------------------------- | ------------------ | -------- | 
------------------------------------------------------------ |
+| doris.request.query.timeout.s | 3600               | N        | 查询 Doris 
的超时时间,默认值为1小时,-1表示无超时限制       |
+| doris.request.tablet.size     | Integer.MAX_VALUE | N        | 一个 Partition
对应的 Doris Tablet 个数。 此数值设置越小,则会生成越多的 Partition。从而提升 Flink 侧的并行度,但同时会对 Doris 
造成更大的压力。 |
+| doris.batch.size              | 1024               | N        | 一次从 BE 
读取数据的最大行数。增大此数值可减少 Flink 与 Doris 之间建立连接的次数。 从而减轻网络延迟所带来的额外时间开销。 |
+| doris.exec.mem.limit          | 2147483648         | N        | 
单个查询的内存限制。默认为 2GB,单位为字节                   |
+| doris.deserialize.arrow.async | FALSE              | N        | 是否支持异步转换 
Arrow 格式到 flink-doris-connector 迭代所需的 RowBatch |
+| doris.deserialize.queue.size  | 64                 | N        | 异步转换 Arrow 
格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效 |
+| doris.read.field              | --                 | N        | 读取 Doris 
表的列名列表,多列之间使用逗号分隔                |
+| doris.filter.query            | --                 | N        | 
过滤读取数据的表达式,此表达式透传给 Doris。Doris 使用此表达式完成源端数据过滤。比如 age=18。 |
+
+### Sink 配置项
+
+| Key                | Default Value | Required | Comment                      
                                |
+| ------------------ | ------------- | -------- | 
------------------------------------------------------------ |
+| sink.label-prefix  | --            | Y        | Stream 
load导入使用的label前缀。2pc场景下要求全局唯一 ,用来保证Flink的EOS语义。 |
+| sink.properties.*  | --            | N        | Stream Load 的导入参数。<br/>例如:  
'sink.properties.column_separator' = ', ' 定义列分隔符,  
'sink.properties.escape_delimiters' = 'true' 特殊字符作为分隔符,'\x01'会被转换为二进制的0x01  
<br/><br/>JSON格式导入<br/>'sink.properties.format' = 'json' 
'sink.properties.read_json_by_line' = 
'true'<br/>详细参数参考[这里](../data-operate/import/import-way/stream-load-manual.md)。 
|
+| sink.enable-delete | TRUE          | N        | 是否启用删除。此选项需要 Doris 
表开启批量删除功能(Doris0.15+版本默认开启),只支持 Unique 模型。 |
+| sink.enable-2pc    | TRUE          | N        | 
是否开启两阶段提交(2pc),默认为true,保证Exactly-Once语义。关于两阶段提交可参考[这里](../data-operate/import/import-way/stream-load-manual.md)。
 |
+| sink.buffer-size   | 1MB           | N        | 
写数据缓存buffer大小,单位字节。不建议修改,默认配置即可     |
+| sink.buffer-count  | 3             | N        | 写数据缓存buffer个数。不建议修改,默认配置即可   
            |
+| sink.max-retries   | 3             | N        | Commit失败后的最大重试次数,默认3次        
                  |
+
+### Lookup Join 配置项
+
+| Key                               | Default Value | Required | Comment       
                             |
+| --------------------------------- | ------------- | -------- | 
------------------------------------------ |
+| jdbc-url                          | --            | Y        | jdbc连接信息      
                         |
+| lookup.cache.max-rows             | -1            | N        | 
lookup缓存的最大行数,默认值-1,不开启缓存 |
+| lookup.cache.ttl                  | 10s           | N        | 
lookup缓存的最大时间,默认10s              |
+| lookup.max-retries                | 1             | N        | 
lookup查询失败后的重试次数                 |
+| lookup.jdbc.async                 | false         | N        | 
是否开启异步的lookup,默认false            |
+| lookup.jdbc.read.batch.size       | 128           | N        | 
异步lookup下,每次查询的最大批次大小       |
+| lookup.jdbc.read.batch.queue-size | 256           | N        | 
异步lookup时,中间缓冲队列的大小           |
+| lookup.jdbc.read.thread-size      | 3             | N        | 
每个task中lookup的jdbc线程数               |
 
 ## Doris 和 Flink 列类型映射关系
 
@@ -336,7 +373,7 @@ env.fromSource(mySqlSource, 
WatermarkStrategy.noWatermarks(), "MySQL Source")//.
 | TIME       | DOUBLE             |
 | HLL        | Unsupported datatype             |
 
-## 使用FlinkSQL通过CDC接入Doris示例(支持Insert/Update/Delete事件)
+## 使用FlinkSQL通过CDC接入Doris示例
 ```sql
 -- enable checkpoint
 SET 'execution.checkpointing.interval' = '10s';
@@ -355,7 +392,7 @@ CREATE TABLE cdc_mysql_source (
  'table-name' = 'table'
 );
 
--- 支持删除事件同步(sink.enable-delete='true'),需要 Doris 表开启批量删除功能
+-- 支持同步insert/update/delete事件
 CREATE TABLE doris_sink (
 id INT,
 name STRING
@@ -368,7 +405,7 @@ WITH (
   'password' = '',
   'sink.properties.format' = 'json',
   'sink.properties.read_json_by_line' = 'true',
-  'sink.enable-delete' = 'true',
+  'sink.enable-delete' = 'true',  -- 同步删除事件
   'sink.label-prefix' = 'doris_label'
 );
 
@@ -377,11 +414,11 @@ insert into doris_sink select id,name from 
cdc_mysql_source;
 
 ## 使用FlinkCDC接入多表或整库示例
 ### 语法
-```
+```shell
 <FLINK_HOME>/bin/flink run \
     -c org.apache.doris.flink.tools.cdc.CdcTools \
     lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar \
-    mysql-sync-database \
+    <mysql-sync-database|oracle-sync-database> \
     --database <doris-database-name> \
     [--job-name <flink-job-name>] \
     [--table-prefix <doris-table-prefix>] \
@@ -389,6 +426,7 @@ insert into doris_sink select id,name from cdc_mysql_source;
     [--including-tables <mysql-table-name|name-regular-expr>] \
     [--excluding-tables <mysql-table-name|name-regular-expr>] \
     --mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> 
...] \
+    --oracle-conf <oracle-cdc-source-conf> [--oracle-conf 
<oracle-cdc-source-conf> ...] \
     --sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \
     [--table-conf <doris-table-conf> [--table-conf <doris-table-conf> ...]]
 ```
@@ -400,13 +438,14 @@ insert into doris_sink select id,name from 
cdc_mysql_source;
 - **--including-tables** 需要同步的MySQL表,可以使用"|" 分隔多个表,并支持正则表达式。 
比如--including-tables table1|tbl.*就是同步table1和所有以tbl开头的表。
 - **--excluding-tables** 不需要同步的表,用法同上。
 - **--mysql-conf** MySQL CDCSource 配置,例如--mysql-conf hostname=127.0.0.1 
,您可以在[这里](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html)查看所有配置MySQL-CDC,其中hostname/username/password/database-name
 是必需的。
+- **--oracle-conf** Oracle CDCSource 配置,例如--oracle-conf hostname=127.0.0.1 
,您可以在[这里](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html)查看所有配置Oracle-CDC,其中hostname/username/password/database-name/schema-name
 是必需的。
 - **--sink-conf** Doris Sink 
的所有配置,可以在[这里](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector/#%E9%80%9A%E7%94%A8%E9%85%8D%E7%BD%AE%E9%A1%B9)查看完整的配置项。
 - **--table-conf** Doris表的配置项,即properties中包含的内容。 例如 --table-conf 
replication_num=1
 
->注:需要在$FLINK_HOME/lib 目录下添加flink-sql-connector-mysql-cdc-2.3.0.jar
+>注:同步时需要在$FLINK_HOME/lib 目录下添加对应的Flink CDC依赖,比如 
flink-sql-connector-mysql-cdc-${version}.jar,flink-sql-connector-oracle-cdc-${version}.jar
 
-### 示例
-```
+### MySQL同步示例
+```shell
 <FLINK_HOME>/bin/flink run \
     -Dexecution.checkpointing.interval=10s \
     -Dparallelism.default=1 \
@@ -427,8 +466,33 @@ insert into doris_sink select id,name from 
cdc_mysql_source;
     --table-conf replication_num=1 
 ```
 
+### Oracle同步示例
+
+```shell
+<FLINK_HOME>/bin/flink run \
+     -Dexecution.checkpointing.interval=10s \
+     -Dparallelism.default=1 \
+     -c org.apache.doris.flink.tools.cdc.CdcTools \
+     ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \
+     oracle-sync-database \
+     --database test_db \
+     --oracle-conf hostname=127.0.0.1 \
+     --oracle-conf port=1521 \
+     --oracle-conf username=admin \
+     --oracle-conf password="password" \
+     --oracle-conf database-name=XE \
+     --oracle-conf schema-name=ADMIN \
+     --including-tables "tbl1|tbl2" \
+     --sink-conf fenodes=127.0.0.1:8030 \
+     --sink-conf username=root \
+     --sink-conf password=\
+     --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
+     --sink-conf sink.label-prefix=label \
+     --table-conf replication_num=1
+```
 
 ## 使用FlinkCDC更新Key列
+
 一般在业务数据库中,会使用编号来作为表的主键,比如Student表,会使用编号(id)来作为主键,但是随着业务的发展,数据对应的编号有可能是会发生变化的。
 在这种场景下,使用FlinkCDC + Doris Connector同步数据,便可以自动更新Doris主键列的数据。
 ### 原理
@@ -532,7 +596,9 @@ Exactly-Once场景下,Flink Job重启时必须从最新的Checkpoint/Savepoint
 
 6. **errCode = 2, detailMessage = current running txns on db 10006 is 100, 
larger than limit 100**
 
-这是因为同一个库并发导入超过了100,可通过调整 fe.conf的参数 `max_running_txn_num_per_db` 来解决。具体可参考 
[max_running_txn_num_per_db](https://doris.apache.org/zh-CN/docs/dev/admin-manual/config/fe-config/#max_running_txn_num_per_db)
+这是因为同一个库并发导入超过了100,可通过调整 fe.conf的参数 `max_running_txn_num_per_db` 来解决,具体可参考 
[max_running_txn_num_per_db](https://doris.apache.org/zh-CN/docs/dev/admin-manual/config/fe-config/#max_running_txn_num_per_db)。
+
+同时,一个任务频繁修改label重启,也可能会导致这个错误。2pc场景下(Duplicate/Aggregate模型),每个任务的label需要唯一,并且从checkpoint重启时,flink任务才会主动abort掉之前已经precommit成功,没有commit的txn,频繁修改label重启,会导致大量precommit成功的txn无法被abort,占用事务。在Unique模型下也可关闭2pc,可以实现幂等写入。
 
 7. **Flink写入Uniq模型时,如何保证一批数据的有序性?**
 

