[ 
https://issues.apache.org/jira/browse/FLINK-39321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

鲨鱼辣椒 updated FLINK-39321:
-------------------------
    Description: 
h2. Bug Description

When using Flink CDC to write to a Paimon dynamic bucket table with 
dynamic-bucket.initial-buckets=1 and Flink parallelism > 1, the following 
exception is thrown:
{code:java}
java.lang.IllegalArgumentException: This is a bug, record assign id 3 should equal to assign id 0.
	at org.apache.paimon.index.HashBucketAssigner.assign(HashBucketAssigner.java:85)
{code}
h2. Reproduction Steps
 # Create a Paimon table in dynamic bucket mode (bucket=-1) and set dynamic-bucket.initial-buckets=1
 # Configure Flink CDC pipeline with parallelism > 1 (e.g., parallelism=4)
 # Start the Flink CDC job to write data to the Paimon table
 # Observe the exception in the task manager logs

h2. Detailed Root Cause Analysis
h3. Step 1: PaimonHashFunction Calculates Routing

Location: org.apache.flink.cdc.connectors.paimon.sink.PaimonHashFunction

PaimonHashFunction is responsible for calculating which subtask each record 
should be routed to. It uses RowAssignerChannelComputer from Paimon:
{code:java}
// PaimonHashFunction.java
public PaimonHashFunction(
        Options options, TableId tableId, Schema schema, ZoneId zoneId, int parallelism) {
    this.parallelism = parallelism;
    Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options);
    FileStoreTable table =
            (FileStoreTable) catalog.getTable(Identifier.fromString(tableId.toString()));

    if (table instanceof AppendOnlyFileStore) {
        this.fieldGetters = null;
        channelComputer = null;
    } else {
        this.fieldGetters = PaimonWriterHelper.createFieldGetters(schema, zoneId);
        channelComputer = new RowAssignerChannelComputer(table.schema(), parallelism);
        channelComputer.setup(parallelism);
    }
}

@Override
public int hashcode(DataChangeEvent event) {
    if (channelComputer != null) {
        GenericRow genericRow = PaimonWriterHelper.convertEventToGenericRow(event, fieldGetters);
        return channelComputer.channel(genericRow);
    } else {
        return ThreadLocalRandom.current().nextInt(parallelism);
    }
}
{code}
Key point: channelComputer.setup(parallelism) is called with parallelism=4.
h3. Step 2: RowAssignerChannelComputer Computes recordAssignId

Location: org.apache.paimon.flink.sink.RowAssignerChannelComputer

RowAssignerChannelComputer is responsible for computing the assigner ID for 
each record:
{code:java}
// RowAssignerChannelComputer.java
@Override
public void setup(int numChannels) {
    this.numChannels = numChannels;
    this.numAssigners = MathUtils.min(numAssigners, numChannels);
    this.extractor = new RowPartitionKeyExtractor(schema);
}

@Override
public int channel(InternalRow record) {
    int partitionHash = extractor.partition(record).hashCode();
    int keyHash = extractor.trimmedPrimaryKey(record).hashCode();
    return computeAssigner(partitionHash, keyHash, numChannels, numAssigners);
}
{code}
Calculation (when initial-buckets=1 and parallelism=4):
 * numChannels = 4 (from setup(parallelism))
 * numAssigners = MathUtils.min(1, 4) = 1
 * recordAssignId = computeAssigner(partitionHash, keyHash, numChannels=4, numAssigners=1)
 * recordAssignId = (partitionHash % 1 + keyHash % 1) % 4 = 0

Result: all records have recordAssignId = 0, meaning they should all route to subtask 0.

This is correct behavior: when numAssigners=1, all data should route to assigner 0.
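The Step 2 arithmetic can be checked with a standalone sketch. The computeAssigner below implements the formula exactly as quoted in this issue, (partitionHash % numAssigners + keyHash % numAssigners) % numChannels; the actual Paimon implementation may differ in detail:

```java
// Standalone check of the Step 2 arithmetic. computeAssigner implements
// the formula as quoted in this issue; the real Paimon implementation
// may differ in detail.
public class AssignerMathDemo {
    static int computeAssigner(int partitionHash, int keyHash, int numChannels, int numAssigners) {
        return (Math.abs(partitionHash % numAssigners)
                + Math.abs(keyHash % numAssigners)) % numChannels;
    }

    public static void main(String[] args) {
        int numChannels = 4;                          // Flink parallelism
        int numAssigners = Math.min(1, numChannels);  // dynamic-bucket.initial-buckets=1
        // Whatever the hashes are, x % 1 == 0, so the result is always 0.
        for (int keyHash : new int[] {7, 42, 123, 9001}) {
            System.out.println(computeAssigner(17, keyHash, numChannels, numAssigners)); // prints 0
        }
    }
}
```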
h3. Step 3: BucketAssignOperator Creates HashBucketAssigner

Location: org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator

BucketAssignOperator is responsible for creating HashBucketAssigner instances 
for each subtask:
{code:java}
// BucketAssignOperator.java (before fix)
public void open() throws Exception {
    super.open();
    int numAssigners = table.coreOptions().dynamicBucketInitialBuckets();
    int totalTasksNumber = getRuntimeContext().getNumberOfParallelSubtasks();
    int currentTaskNumber = getRuntimeContext().getIndexOfThisSubtask();

    int minAssigners = MathUtils.min(numAssigners, totalTasksNumber);

    assigner = new HashBucketAssigner(
            minAssigners,
            totalTasksNumber,  // numChannels
            currentTaskNumber  // assignId
    );
}
{code}
Parameters (when initial-buckets=1 and parallelism=4):
 * numAssigners = 1
 * totalTasksNumber = 4
 * currentTaskNumber = 0, 1, 2, 3 (varies by subtask)
 * minAssigners = MathUtils.min(1, 4) = 1
HashBucketAssigner creation:
 * Subtask 0: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=0)
 * Subtask 1: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=1)
 * Subtask 2: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=2)
 * Subtask 3: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=3)

h3. Step 4: HashBucketAssigner Validates recordAssignId

Location: org.apache.paimon.index.HashBucketAssigner

HashBucketAssigner validates that the recordAssignId matches its own assignId:
{code:java}
// HashBucketAssigner.java
@Override
public int assign(BinaryRow partition, int hash) {
    int partitionHash = partition.hashCode();
    int recordAssignId = computeAssignId(partitionHash, hash);

    checkArgument(
            recordAssignId == assignId,
            "This is a bug, record assign id %s should equal to assign id %s.",
            recordAssignId,
            assignId);
    // ... bucket assignment continues ...
}

private int computeAssignId(int partitionHash, int hash) {
    return computeAssigner(partitionHash, hash, numChannels, numAssigners);
}
{code}
Validation (when initial-buckets=1 and parallelism=4):
 * recordAssignId = computeAssigner(partitionHash, hash, numChannels=4, numAssigners=1) = 0
 * Subtask 0: recordAssignId (0) == assignId (0) ✅ PASSES
 * Subtask 1: recordAssignId (0) == assignId (1) ❌ FAILS
 * Subtask 2: recordAssignId (0) == assignId (2) ❌ FAILS
 * Subtask 3: recordAssignId (0) == assignId (3) ❌ FAILS
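The per-subtask outcome can be simulated standalone. The passes helper below mirrors the checkArgument condition from HashBucketAssigner.assign, and computeAssigner again follows the formula quoted in this issue rather than the actual Paimon source:

```java
// Standalone simulation of the Step 4 validation on each subtask.
// computeAssigner follows the formula quoted in this issue; "passes"
// mirrors the checkArgument condition in HashBucketAssigner.assign.
public class ValidationDemo {
    static int computeAssigner(int partitionHash, int keyHash, int numChannels, int numAssigners) {
        return (Math.abs(partitionHash % numAssigners)
                + Math.abs(keyHash % numAssigners)) % numChannels;
    }

    // true means the record passes validation on the subtask with this assignId
    static boolean passes(int recordAssignId, int assignId) {
        return recordAssignId == assignId;
    }

    public static void main(String[] args) {
        int numChannels = 4;  // parallelism; each subtask gets assignId = its index
        int numAssigners = 1; // dynamic-bucket.initial-buckets
        int recordAssignId = computeAssigner(17, 42, numChannels, numAssigners); // = 0
        for (int assignId = 0; assignId < numChannels; assignId++) {
            System.out.printf("subtask %d: %s%n", assignId,
                    passes(recordAssignId, assignId)
                            ? "PASSES"
                            : "FAILS (IllegalArgumentException)");
        }
    }
}
```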

h3. The Conflict

The core conflict is between two components:
 # PaimonHashFunction (via RowAssignerChannelComputer):

 * Calculates recordAssignId = 0 for all records when numAssigners=1
 * This is correct: all data should route to assigner 0

 # BucketAssignOperator (creates HashBucketAssigner):

 * Creates HashBucketAssigner with assignId=currentTaskNumber (0, 1, 2, 3)
 * Expects each subtask to receive only records with a matching recordAssignId
 * But all records have recordAssignId=0, so only subtask 0 passes validation

The mismatch: when numAssigners=1, PaimonHashFunction correctly routes all records to subtask 0, but BucketAssignOperator creates HashBucketAssigner instances with a different assignId for each subtask, causing validation failures on every subtask other than 0.
h2. Solution

Modify BucketAssignOperator.java to align with the design principle that when numAssigners=1, all data should route to assigner 0.
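One possible direction, sketched below with plain ints (hypothetical, not the committed fix; the alignedAssignId helper is invented here for illustration): derive the assignId handed to HashBucketAssigner with the same modulo that governs record routing, instead of using the raw subtask index.

```java
// Hypothetical sketch of aligning assignId with record routing.
// alignedAssignId is an illustrative helper, not Flink CDC or Paimon API.
public class FixSketch {
    // assignId derived the same way records are routed: for
    // minAssigners=1 every subtask expects id 0.
    static int alignedAssignId(int currentTaskNumber, int minAssigners) {
        return currentTaskNumber % minAssigners;
    }

    public static void main(String[] args) {
        int totalTasksNumber = 4;  // Flink parallelism
        int numAssigners = 1;      // dynamic-bucket.initial-buckets
        int minAssigners = Math.min(numAssigners, totalTasksNumber);

        for (int task = 0; task < totalTasksNumber; task++) {
            // With recordAssignId = 0 for all records (Step 2), the Step 4
            // check recordAssignId == assignId now holds everywhere.
            System.out.printf("subtask %d -> assignId %d%n",
                    task, alignedAssignId(task, minAssigners));
        }
    }
}
```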

  was:
*Problem Background*
When using Flink CDC 3.5.0 to write to a Paimon dynamic bucket table with 
dynamic-bucket.initial-buckets=1 and Flink parallelism > 1, an assigner ID validation error occurs.
*Problem Analysis*
When {{minAssigners=1}}:
 * PaimonHashFunction (via RowAssignerChannelComputer) calculates {{recordAssignId = (partitionHash % 1 + keyHash % 1) % numChannels = 0}}.
 * BucketAssignOperator expects all data to be routed to assigner 0.
 * However, BucketAssignOperator uses {{numChannels=totalTasksNumber}}, so data is routed across all subtasks (0, 1, 2, 3), while {{recordAssignId=0}} for all records.
 * This causes validation failures when data reaches any subtask other than 0.
 
{code:java}
// BucketAssigner.java
static boolean isMyBucket(int bucket, int numAssigners, int assignId) {
    return bucket % numAssigners == assignId % numAssigners;
}{code}

The following exception is thrown:
{code:java}
java.lang.IllegalArgumentException: This is a bug, record assign id ? should 
equal to assign id ?.{code}

 


> Writing data from FlinkCDC to a Paimon dynamic bucket table. Assigner ID 
> error.
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-39321
>                 URL: https://issues.apache.org/jira/browse/FLINK-39321
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.5.0
>            Reporter: 鲨鱼辣椒
>            Assignee: 鲨鱼辣椒
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
