[ 
https://issues.apache.org/jira/browse/FLINK-39321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

鲨鱼辣椒 updated FLINK-39321:
-------------------------
    Description: 
*Problem Background*
When using Flink CDC 3.5.0 to write to a Paimon dynamic bucket table with 
dynamic-bucket.initial-buckets=1 and Flink parallelism > 1, the job fails with 
an assign id validation error.
*Problem Analysis*
When {{minAssigners=1}}:
 * PaimonHashFunction (via RowAssignerChannelComputer) calculates {{recordAssignId = (partitionHash % 1 + keyHash % 1) % numChannels = 0}}.
 * BucketAssignOperator expects all data to be routed to assigner 0.
 * However, BucketAssignOperator uses {{numChannels=totalTasksNumber}}, so data is routed to all subtasks (e.g. 0, 1, 2, 3), while {{recordAssignId=0}} for every record.
 * This causes validation failures whenever data arrives at a subtask other than 0.
 
{code:java}
// BucketAssigner.java
static boolean isMyBucket(int bucket, int numAssigners, int assignId) {
    return bucket % numAssigners == assignId % numAssigners;
}{code}
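The mismatch described above can be sketched as a standalone snippet. Class and method names here are illustrative (only the {{isMyBucket}} body is copied from the report); {{recordAssignId}} is a simplified form of the channel computation, not the actual Paimon/Flink CDC code:

```java
// Hypothetical demo of the assign-id mismatch when minAssigners = 1.
public class AssignIdMismatchDemo {

    // Copied from BucketAssigner.java as quoted in this report.
    static boolean isMyBucket(int bucket, int numAssigners, int assignId) {
        return bucket % numAssigners == assignId % numAssigners;
    }

    // Simplified form of the computation described above:
    // (partitionHash % numAssigners + keyHash % numAssigners) % numChannels.
    static int recordAssignId(int partitionHash, int keyHash,
                              int numAssigners, int numChannels) {
        return (partitionHash % numAssigners + keyHash % numAssigners) % numChannels;
    }

    public static void main(String[] args) {
        int numAssigners = 1; // dynamic-bucket.initial-buckets = 1
        int numChannels = 4;  // Flink parallelism = 4

        // With numAssigners = 1 both modulo terms are 0, so every record
        // computes recordAssignId = 0, regardless of its hashes.
        for (int keyHash : new int[] {7, 13, 42}) {
            System.out.println("recordAssignId = "
                    + recordAssignId(5, keyHash, numAssigners, numChannels)); // always 0
        }

        // ...yet the operator runs on subtasks 0..3, so subtasks 1-3 receive
        // records whose recordAssignId (0) differs from their own subtask
        // index, which is what trips the validation.
        for (int subtask = 0; subtask < numChannels; subtask++) {
            System.out.println("subtask " + subtask
                    + " sees matching assign id: " + (subtask == 0));
        }
    }
}
```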

The following exception is thrown:
{code:java}
java.lang.IllegalArgumentException: This is a bug, record assign id ? should 
equal to assign id ?.{code}

I have attempted to fix this bug; by comparison, the latest version still 
appears to contain it.
 

  was:
When using Flink CDC 3.5.0 to write to a Paimon dynamic bucket table with 
dynamic-bucket.initial-buckets=1 and Flink parallelism > 1, the following 
exception is thrown:
{code:java}
java.lang.IllegalArgumentException: This is a bug, record assign id ? should 
equal to assign id ?.{code}
 
I have attempted to fix this bug; by comparison, the latest version still 
appears to contain it.
 


> Writing data from Flink CDC to a Paimon dynamic bucket table fails with an 
> assigner ID error
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-39321
>                 URL: https://issues.apache.org/jira/browse/FLINK-39321
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>            Reporter: 鲨鱼辣椒
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
