e-mhui opened a new pull request, #6580:
URL: https://github.com/apache/inlong/pull/6580

   ### Fix the problem of write failure after Doris sets the 'sink.properties.columns' parameter
   
   - Fixes #6578
   
   ### Motivation
   
   To write bitmap data to Doris, we need to set the `sink.properties.columns` option.
   
   ```sql
   CREATE TABLE `table_2`(
       `dt` INT,
       `page` STRING,
       `user_id` INT)
       WITH (
       'inlong.metric.labels' = 'groupId=1&streamId=1&nodeId=2',
       'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)',
       'connector' = 'doris-inlong',
       'fenodes' = 'xxxx:8030',
       'username' = 'root',
       'password' = 'xxx',
       'sink.multiple.enable' = 'false',
       'table.identifier' = 'db.table'
   )
   ```
   
   However, when writing data to Doris, the connector adds a hidden column called `__DORIS_DELETE_SIGN__`. For more details, refer to [batch-delete-manual](https://doris.apache.org/docs/data-operate/update-delete/batch-delete-manual/). The source code is as follows:
   
   ```java
   // add doris delete sign
   if (enableBatchDelete()) {
       if (jsonFormat) {
           valueMap.put(DORIS_DELETE_SIGN, parseDeleteSign(rowData.getRowKind()));
       } else {
           value.add(parseDeleteSign(rowData.getRowKind()));
       }
   }
   ```
   
   So we need to append a column called `__DORIS_DELETE_SIGN__` to the end of the parameter, which identifies the newly added value as the `__DORIS_DELETE_SIGN__` field. For example:
   
   ```sql
   'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id),__DORIS_DELETE_SIGN__'
   ```
   
   ### Modifications
   
   If the user has set the `sink.properties.columns` option, we check whether batch delete is enabled. If it is, we add the hidden column `__DORIS_DELETE_SIGN__` to the columns parameter. The code is as follows:
   
   ```java
   Properties loadProperties = executionOptions.getStreamLoadProp();
   // if enable batch delete, the columns must add tag '__DORIS_DELETE_SIGN__'
   String columns = (String) loadProperties.get(COLUMNS_KEY);
   if (loadProperties.containsKey(COLUMNS_KEY) && !columns.contains(DORIS_DELETE_SIGN)
           && enableBatchDelete()) {
       loadProperties.put(COLUMNS_KEY, String.format("%s,%s", columns, DORIS_DELETE_SIGN));
   }
   
   private boolean enableBatchDelete() {
       try {
           Schema schema = RestService.getSchema(options, readOptions, LOG);
           return executionOptions.getEnableDelete() || UNIQUE_KEYS_TYPE.equals(schema.getKeysType());
       } catch (DorisException e) {
           throw new RuntimeException("Failed fetch doris table schema: " + options.getTableIdentifier());
       }
   }
   ```
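
The append logic can be tried in isolation with a minimal, self-contained sketch. It is not the connector code: the class `DeleteSignColumns`, the method `withDeleteSign`, and the key value `"columns"` are hypothetical names chosen for illustration, and the `batchDeleteEnabled` flag stands in for the result of `enableBatchDelete()`, which in the real connector requires a schema lookup.

```java
import java.util.Properties;

public class DeleteSignColumns {
    // Assumed values for illustration; the connector defines its own constants.
    static final String COLUMNS_KEY = "columns";
    static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";

    /**
     * Returns a copy of the load properties in which the hidden delete-sign
     * column is appended to the columns list, but only when a columns list
     * is present, batch delete is enabled, and the sign is not already listed.
     */
    static Properties withDeleteSign(Properties loadProperties, boolean batchDeleteEnabled) {
        Properties result = new Properties();
        result.putAll(loadProperties);
        String columns = result.getProperty(COLUMNS_KEY);
        if (columns != null && batchDeleteEnabled && !columns.contains(DORIS_DELETE_SIGN)) {
            result.setProperty(COLUMNS_KEY, columns + "," + DORIS_DELETE_SIGN);
        }
        return result;
    }
}
```

Because the sketch checks whether the sign is already present, calling it twice on the same properties leaves the columns list unchanged after the first call, and properties without a columns entry pass through untouched.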

