[
https://issues.apache.org/jira/browse/FLINK-39321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
鲨鱼辣椒 updated FLINK-39321:
-------------------------
Description:
h2. Bug Description
When using Flink CDC to write to a Paimon dynamic bucket table with
dynamic-bucket.initial-buckets=1 and Flink parallelism > 1, the following
exception is thrown:
{code:java}
java.lang.IllegalArgumentException: This is a bug, record assign id 3 should equal to assign id 0.
    at org.apache.paimon.index.HashBucketAssigner.assign(HashBucketAssigner.java:85)
{code}
h2. Reproduction Steps
# Create a Paimon table with dynamic bucket mode (bucket=-1) and set dynamic-bucket.initial-buckets=1
# Configure Flink CDC pipeline with parallelism > 1 (e.g., parallelism=4)
# Start the Flink CDC job to write data to the Paimon table
# Observe the exception in the task manager logs
h2. Detailed Root Cause Analysis
h3. Step 1: PaimonHashFunction Calculates Routing
Location: org.apache.flink.cdc.connectors.paimon.sink.PaimonHashFunction
PaimonHashFunction is responsible for calculating which subtask each record
should be routed to. It uses RowAssignerChannelComputer from Paimon:
{code:java}
// PaimonHashFunction.java
public PaimonHashFunction(
        Options options, TableId tableId, Schema schema, ZoneId zoneId, int parallelism) {
    this.parallelism = parallelism;
    Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options);
    FileStoreTable table =
            (FileStoreTable) catalog.getTable(Identifier.fromString(tableId.toString()));
    if (table instanceof AppendOnlyFileStore) {
        this.fieldGetters = null;
        channelComputer = null;
    } else {
        this.fieldGetters = PaimonWriterHelper.createFieldGetters(schema, zoneId);
        channelComputer = new RowAssignerChannelComputer(table.schema(), parallelism);
        channelComputer.setup(parallelism);
    }
}

@Override
public int hashcode(DataChangeEvent event) {
    if (channelComputer != null) {
        GenericRow genericRow =
                PaimonWriterHelper.convertEventToGenericRow(event, fieldGetters);
        return channelComputer.channel(genericRow);
    } else {
        return ThreadLocalRandom.current().nextInt(parallelism);
    }
}
{code}
Key point: channelComputer.setup(parallelism) is called with parallelism=4.
h3. Step 2: RowAssignerChannelComputer Computes recordAssignId
Location: org.apache.paimon.flink.sink.RowAssignerChannelComputer
RowAssignerChannelComputer is responsible for computing the assigner ID for
each record:
{code:java}
// RowAssignerChannelComputer.java
@Override
public void setup(int numChannels) {
    this.numChannels = numChannels;
    this.numAssigners = MathUtils.min(numAssigners, numChannels);
    this.extractor = new RowPartitionKeyExtractor(schema);
}

@Override
public int channel(InternalRow record) {
    int partitionHash = extractor.partition(record).hashCode();
    int keyHash = extractor.trimmedPrimaryKey(record).hashCode();
    return computeAssigner(partitionHash, keyHash, numChannels, numAssigners);
}
{code}
Calculation (when initial-buckets=1 and parallelism=4):
* numChannels = 4 (from setup(parallelism))
* numAssigners = MathUtils.min(1, 4) = 1
* recordAssignId = computeAssigner(partitionHash, keyHash, numChannels=4, numAssigners=1)
* recordAssignId = (partitionHash % 1 + keyHash % 1) % 4 = 0
Result: all records have recordAssignId = 0, meaning they should all route to subtask 0.
This is correct behavior: when numAssigners=1, all data should route to assigner 0.
h3. Step 3: BucketAssignOperator Creates HashBucketAssigner
Location: org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator
BucketAssignOperator is responsible for creating HashBucketAssigner instances
for each subtask:
{code:java}
// BucketAssignOperator.java (before fix)
public void open() throws Exception {
    super.open();
    int numAssigners = table.coreOptions().dynamicBucketInitialBuckets();
    int totalTasksNumber = getRuntimeContext().getNumberOfParallelSubtasks();
    int currentTaskNumber = getRuntimeContext().getIndexOfThisSubtask();
    int minAssigners = MathUtils.min(numAssigners, totalTasksNumber);
    assigner =
            new HashBucketAssigner(
                    minAssigners,
                    totalTasksNumber, // numChannels
                    currentTaskNumber // assignId
            );
}
{code}
Parameters (when initial-buckets=1 and parallelism=4):
* numAssigners = 1
* totalTasksNumber = 4
* currentTaskNumber = 0, 1, 2, 3 (varies by subtask)
* minAssigners = MathUtils.min(1, 4) = 1
HashBucketAssigner creation:
* Subtask 0: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=0)
* Subtask 1: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=1)
* Subtask 2: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=2)
* Subtask 3: HashBucketAssigner(numAssigners=1, numChannels=4, assignId=3)
h3. Step 4: HashBucketAssigner Validates recordAssignId
Location: org.apache.paimon.index.HashBucketAssigner
HashBucketAssigner validates that the recordAssignId matches the assignId:
{code:java}
// HashBucketAssigner.java
@Override
public int assign(BinaryRow partition, int hash) {
    int partitionHash = partition.hashCode();
    int recordAssignId = computeAssignId(partitionHash, hash);
    checkArgument(
            recordAssignId == assignId,
            "This is a bug, record assign id %s should equal to assign id %s.",
            recordAssignId,
            assignId);
    // ... (bucket computation continues after this check)
}

private int computeAssignId(int partitionHash, int hash) {
    return computeAssigner(partitionHash, hash, numChannels, numAssigners);
}
{code}
Validation (when initial-buckets=1 and parallelism=4):
* recordAssignId = computeAssigner(partitionHash, hash, numChannels=4, numAssigners=1) = 0
* Subtask 0: recordAssignId (0) == assignId (0) ✅ PASSES
* Subtask 1: recordAssignId (0) == assignId (1) ❌ FAILS
* Subtask 2: recordAssignId (0) == assignId (2) ❌ FAILS
* Subtask 3: recordAssignId (0) == assignId (3) ❌ FAILS
h3. The Conflict
The core conflict is between two components:
# PaimonHashFunction (via RowAssignerChannelComputer):
#* Calculates recordAssignId = 0 for all records when numAssigners=1
#* This is correct: all data should route to assigner 0
# BucketAssignOperator (creates HashBucketAssigner):
#* Creates HashBucketAssigner with assignId=currentTaskNumber (0, 1, 2, 3)
#* Expects each subtask to receive records with a matching recordAssignId
#* But all records have recordAssignId=0, so only subtask 0 passes validation
The mismatch: when numAssigners=1, PaimonHashFunction correctly routes all records to subtask 0, but BucketAssignOperator creates HashBucketAssigner instances with a different assignId for each subtask, causing validation failures.
h2. Solution
Modify BucketAssignOperator.java to align with the design principle that when numAssigners=1, all data should route to assigner 0.
was:
*Problem Background*
When using Flink CDC 3.5.0 to write to a Paimon dynamic bucket table with dynamic-bucket.initial-buckets=1 and Flink parallelism > 1, an assigner ID validation error is thrown.
*Problem Analysis*
When {{minAssigners=1}}:
* PaimonHashFunction (via RowAssignerChannelComputer) calculates {{recordAssignId = (partitionHash % 1 + keyHash % 1) % numChannels = 0}}.
* BucketAssignOperator expects all data to be routed to assigner 0.
* However, BucketAssignOperator uses {{numChannels=totalTasksNumber}}, so data is routed to all subtasks (0, 1, 2, 3), while {{recordAssignId=0}} for all records.
* This causes validation failures when data reaches the other subtasks.
{code:java}
// BucketAssigner.java
static boolean isMyBucket(int bucket, int numAssigners, int assignId) {
    return bucket % numAssigners == assignId % numAssigners;
}{code}
The following exception is thrown:
{code:java}
java.lang.IllegalArgumentException: This is a bug, record assign id ? should equal to assign id ?.{code}
> Writing data from FlinkCDC to a Paimon dynamic bucket table. Assigner ID
> error.
> -------------------------------------------------------------------------------
>
> Key: FLINK-39321
> URL: https://issues.apache.org/jira/browse/FLINK-39321
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.5.0
> Reporter: 鲨鱼辣椒
> Assignee: 鲨鱼辣椒
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)