Steven Zhen Wu created FLINK-7143:
-------------------------------------
Summary: Partition assignment for Kafka consumer is not stable
Key: FLINK-7143
URL: https://issues.apache.org/jira/browse/FLINK-7143
Project: Flink
Issue Type: Bug
Components: Kafka Connector
Affects Versions: 1.3.1
Reporter: Steven Zhen Wu
Priority: Blocker
While deploying the Flink 1.3 release to hundreds of routing jobs, we found some
issues with partition assignment for the Kafka consumer. Some partitions weren't
assigned and some partitions got assigned more than once.
Here is the code containing the bug, which was introduced in Flink 1.3.
{code}
protected static void initializeSubscribedPartitionsToStartOffsets(...) {
    ...
    for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
        if (i % numParallelSubtasks == indexOfThisSubtask) {
            if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
                subscribedPartitionsToStartOffsets.put(
                    kafkaTopicPartitions.get(i),
                    startupMode.getStateSentinel());
            }
            ...
}
{code}
The bug is using the array index {code}i{code} to mod against
{code}numParallelSubtasks{code}. If {code}kafkaTopicPartitions{code} is
ordered differently on different subtasks, the assignment is not stable across
subtasks, which causes the assignment issue mentioned earlier. The fix is also
very simple: we should use the partition id to do the mod, {code}if
(kafkaTopicPartitions.get(i).getPartition() % numParallelSubtasks ==
indexOfThisSubtask){code}. That would result in a stable assignment across
subtasks that is independent of the ordering in the array.
Marking it as a blocker because of its impact.
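To illustrate the failure mode described above, here is a minimal standalone sketch. It is not the Flink code: the class name, the method names, and the use of plain integers in place of {code}KafkaTopicPartition{code} objects are assumptions made for illustration, and it assumes a single topic. Two subtasks see the same four partitions in different orders; the index-based rule then assigns partition 0 twice and partition 1 never, while the partition-id rule gives disjoint, complete assignments.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; names are not taken from the Flink code base.
class PartitionAssignmentSketch {

    // Index-based rule from the report: the result depends on the order
    // in which this subtask happens to see the partition list.
    static List<Integer> assignByIndex(List<Integer> partitionIds,
                                       int numParallelSubtasks,
                                       int indexOfThisSubtask) {
        List<Integer> assigned = new ArrayList<>();
        for (int i = 0; i < partitionIds.size(); i++) {
            if (i % numParallelSubtasks == indexOfThisSubtask) {
                assigned.add(partitionIds.get(i));
            }
        }
        return assigned;
    }

    // Proposed rule: the result depends only on the partition id,
    // so every subtask reaches the same decision for a given partition.
    static List<Integer> assignByPartitionId(List<Integer> partitionIds,
                                             int numParallelSubtasks,
                                             int indexOfThisSubtask) {
        List<Integer> assigned = new ArrayList<>();
        for (int partitionId : partitionIds) {
            if (partitionId % numParallelSubtasks == indexOfThisSubtask) {
                assigned.add(partitionId);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        int numParallelSubtasks = 2;
        // Same four partitions, but each subtask sees a different ordering.
        List<Integer> seenBySubtask0 = Arrays.asList(0, 1, 2, 3);
        List<Integer> seenBySubtask1 = Arrays.asList(1, 0, 2, 3);

        // Index-based: partition 0 is claimed by both subtasks, partition 1 by neither.
        System.out.println(assignByIndex(seenBySubtask0, numParallelSubtasks, 0)); // [0, 2]
        System.out.println(assignByIndex(seenBySubtask1, numParallelSubtasks, 1)); // [0, 3]

        // Id-based: every partition is claimed by exactly one subtask.
        System.out.println(assignByPartitionId(seenBySubtask0, numParallelSubtasks, 0)); // [0, 2]
        System.out.println(assignByPartitionId(seenBySubtask1, numParallelSubtasks, 1)); // [1, 3]
    }
}
```

The sketch only covers the ordering problem raised in this report; how partitions from several topics should be spread across subtasks is outside its scope.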
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)