e-mhui opened a new pull request, #6580: URL: https://github.com/apache/inlong/pull/6580
### Fix the problem of write failure after Doris sets the 'sink.properties.columns' parameter

- Fixes #6578

### Motivation

To write bitmap data to Doris, we need to set the `sink.properties.columns` option.

```sql
CREATE TABLE `table_2`(
    `dt` INT,
    `page` STRING,
    `user_id` INT)
WITH (
    'inlong.metric.labels' = 'groupId=1&streamId=1&nodeId=2',
    'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)',
    'connector' = 'doris-inlong',
    'fenodes' = 'xxxx:8030',
    'username' = 'root',
    'password' = 'xxx',
    'sink.multiple.enable' = 'false',
    'table.identifier' = 'db.table'
)
```

However, when batch delete is enabled, the connector appends a hidden column called `__DORIS_DELETE_SIGN__` to every record it writes to Doris. For more details, see [batch-delete-manual](https://doris.apache.org/docs/data-operate/update-delete/batch-delete-manual/). The source code is as follows:

```java
// add doris delete sign
if (enableBatchDelete()) {
    if (jsonFormat) {
        valueMap.put(DORIS_DELETE_SIGN, parseDeleteSign(rowData.getRowKind()));
    } else {
        value.add(parseDeleteSign(rowData.getRowKind()));
    }
}
```

So `__DORIS_DELETE_SIGN__` must also be listed at the end of the `sink.properties.columns` parameter, which identifies the newly added value in each record as the `__DORIS_DELETE_SIGN__` field. For example:

```sql
'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id),__DORIS_DELETE_SIGN__'
```

### Modifications

If the user has set the `sink.properties.columns` option, we check whether batch delete is enabled. If it is, we append the hidden column `__DORIS_DELETE_SIGN__` to the columns parameter.
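The rule described above can be tried in isolation with a minimal, standalone sketch. It is not the connector's code: the class name `ColumnsDeleteSignSketch` and the helper `withDeleteSign` are hypothetical, the key `columns` is an assumption about what `sink.properties.columns` becomes once the `sink.properties.` prefix is stripped, and the batch-delete decision is passed in as a plain boolean instead of being derived from the table schema.

```java
import java.util.Properties;

public class ColumnsDeleteSignSketch {
    // Assumed stream load key for 'sink.properties.columns' (prefix stripped).
    static final String COLUMNS_KEY = "columns";
    static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";

    /**
     * Hypothetical helper: returns a copy of the stream load properties in which
     * the delete sign is appended to the columns list, but only when a columns
     * list exists, batch delete is enabled, and the sign is not already listed.
     */
    static Properties withDeleteSign(Properties loadProperties, boolean batchDeleteEnabled) {
        Properties result = new Properties();
        result.putAll(loadProperties);
        String columns = result.getProperty(COLUMNS_KEY);
        if (columns != null && batchDeleteEnabled && !columns.contains(DORIS_DELETE_SIGN)) {
            result.setProperty(COLUMNS_KEY, columns + "," + DORIS_DELETE_SIGN);
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(COLUMNS_KEY, "dt,page,user_id,user_id=to_bitmap(user_id)");
        // prints dt,page,user_id,user_id=to_bitmap(user_id),__DORIS_DELETE_SIGN__
        System.out.println(withDeleteSign(props, true).getProperty(COLUMNS_KEY));
    }
}
```

Checking for an existing `__DORIS_DELETE_SIGN__` entry keeps the operation idempotent, so a user who already listed the column by hand does not end up with it twice.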
The code change in this PR is as follows:

```java
Properties loadProperties = executionOptions.getStreamLoadProp();
// if enable batch delete, the columns must add tag '__DORIS_DELETE_SIGN__'
String columns = (String) loadProperties.get(COLUMNS_KEY);
if (loadProperties.containsKey(COLUMNS_KEY)
        && !columns.contains(DORIS_DELETE_SIGN)
        && enableBatchDelete()) {
    loadProperties.put(COLUMNS_KEY, String.format("%s,%s", columns, DORIS_DELETE_SIGN));
}

private boolean enableBatchDelete() {
    try {
        Schema schema = RestService.getSchema(options, readOptions, LOG);
        return executionOptions.getEnableDelete() || UNIQUE_KEYS_TYPE.equals(schema.getKeysType());
    } catch (DorisException e) {
        throw new RuntimeException("Failed fetch doris table schema: " + options.getTableIdentifier());
    }
}
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org