[https://issues.apache.org/jira/browse/FLINK-38833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18047357#comment-18047357]

Yanquan Lv commented on FLINK-38833:
------------------------------------

I would like to take it.

> Partitioned fixed-bucket Paimon table write parallelism optimization
> --------------------------------------------------------------------
>
>                 Key: FLINK-38833
>                 URL: https://issues.apache.org/jira/browse/FLINK-38833
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: cdc-3.5.0
>            Reporter: peiyu
>            Priority: Major
>
> {code:sql}
> CREATE TABLE `paimon_catalog`.`ods`.`oms_db_tf_b_order` (
>   `ORDER_ID` BIGINT NOT NULL,
>   `OLD_ORDER_NUMBER` VARCHAR(20),
>   `YARD_ID` BIGINT,
>   `CONSIGNOR_STATE` SMALLINT,
>   `SOURCE` SMALLINT,
>   `OPERATION` VARCHAR(20),
>   `dt` VARCHAR(2147483647) NOT NULL,
>   PRIMARY KEY (`ORDER_ID`, `dt`) NOT ENFORCED
> ) PARTITIONED BY (`dt`)
> WITH (
>   'tag.automatic-creation' = 'process-time',
>   'changelog-producer' = 'input',
>   'tag.creation-period' = 'hourly',
>   'write-buffer-spillable' = 'true',
>   'bucket' = '2',
>   'changelog.time-retained' = '1 d',
>   'snapshot.time-retained' = '1 d',
>   'tag.num-retained-max' = '72',
>   'precommit-compact' = 'true',
>   'tag.creation-delay' = '10 m',
>   'deletion-vectors.enabled' = 'true'
> ); {code}
> Flink CDC job config:
> {code:yaml}
> route:
>   - source-table: oms_db.tf_b_order_delist(_\d{4})?
>     sink-table: ods.oms_db_tf_b_order_delist
>   - source-table: oms_db.tf_b_order_receipt_check(_\d{4})?
>     sink-table: ods.oms_db_tf_b_order_receipt_check
>   - source-table: oms_db.tf_b_order_recept_audit(_\d{4})?
>     sink-table: ods.oms_db_tf_b_order_recept_audit
>   - source-table: oms_db.tf_b_order(_\d{4})?
>     sink-table: ods.oms_db_tf_b_order {code}
> The *FlushEventAlignment* operator's parallelism is matched to the number
> of buckets, but it does not take partitioning into account. As a result, in
> high-parallelism scenarios it cannot make full use of the available parallelism.
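To make the utilization problem concrete, here is a minimal Python sketch. It assumes a simple hash-mod channel selector; the function names, the hourly `dt` values, and the job parallelism of 8 are hypothetical and do not reflect Flink CDC's actual partitioner or the eventual fix. Routing by bucket alone can reach at most `bucket` subtasks, whereas keying by (partition, bucket) lets records from different partitions spread over more subtasks.

```python
import zlib


def bucket_only_channel(partition: str, bucket: int, parallelism: int) -> int:
    # Route by bucket alone: at most NUM_BUCKETS distinct subtasks ever get data,
    # no matter how many partitions are being written.
    return bucket % parallelism


def partition_aware_channel(partition: str, bucket: int, parallelism: int) -> int:
    # Hypothetical alternative: route by (partition, bucket).
    # crc32 is a stable hash, unlike Python's per-process salted str hash().
    key = f"{partition}|{bucket}".encode("utf-8")
    return zlib.crc32(key) % parallelism


def busy_subtasks(records, parallelism, channel_fn):
    """Return the indices of subtasks that receive at least one record."""
    return {channel_fn(p, b, parallelism) for p, b in records}


NUM_BUCKETS = 2   # matches 'bucket' = '2' in the issue's DDL
PARALLELISM = 8   # hypothetical job parallelism
# Hypothetical hourly values for the `dt` partition column.
PARTITIONS = [f"20250101{h:02d}" for h in range(24)]

# One (partition, bucket) pair per destination the writers must handle.
records = [(dt, b) for dt in PARTITIONS for b in range(NUM_BUCKETS)]

by_bucket = busy_subtasks(records, PARALLELISM, bucket_only_channel)
by_partition_bucket = busy_subtasks(records, PARALLELISM, partition_aware_channel)

print(f"bucket-only routing uses {len(by_bucket)} of {PARALLELISM} subtasks")
print(f"(partition, bucket) routing uses {len(by_partition_bucket)} of {PARALLELISM} subtasks")
```

With bucket-only routing the first line always reports 2 of 8 subtasks, which is the under-utilization described in the issue. Keying by (partition, bucket) keeps every record for a given bucket of a given partition on one subtask, so per-bucket write ordering within a partition is preserved while different partitions can land on different subtasks.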



--
This message was sent by Atlassian Jira
(v8.20.10#820010)