[ https://issues.apache.org/jira/browse/FLINK-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16090113#comment-16090113 ]
ASF GitHub Bot commented on FLINK-7143:
---------------------------------------

Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/4301

Oye, this is more complicated than I thought.

On `release-1.3` the assignment actually works if the Kafka brokers always return the partitions in the same order. The reason is that the assignment of partitions and the assignment of operator state (in `RoundRobinOperatorStateRepartitioner`) are aligned. This means it's not a problem when sources think they are "fresh" (not restored from state) because they didn't get any state: if they assign a partition to themselves, this also means they have the state for that partition (again, because partition assignment and operator state assignment are aligned).

This PR breaks the alignment because the `startIndex` is not necessarily `0`. However, this is not caught by any tests because the `StateAssignmentOperation` has an optimisation where it doesn't repartition operator state if the parallelism doesn't change. If we deactivate that optimisation by turning this line into `if (true)`:
https://github.com/apache/flink/blob/b1f762127234e323b947aa4a363935f87be1994f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L561-L561
then the test in Kafka09ITCase will fail.

The fix is to properly forward the information of whether we're restored in `initializeState()`; I did a commit for that: https://github.com/aljoscha/flink/tree/finish-pr-4301-kafka-13-fixings.

The problem is that it is not easy to change the tests to catch this bug. I think an ITCase that uses Kafka and does a savepoint and rescaling would do the trick.
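A minimal plain-Java sketch of the alignment argument above (this is not Flink code; the helper names and the reduction of partitions/state entries to integer indices are illustrative assumptions). On `release-1.3`, both partition assignment and `RoundRobinOperatorStateRepartitioner` distribute round-robin starting at index 0, so they land on the same subtask; a non-zero `startIndex` shifts the partitions but not the state:

```java
public class AlignmentSketch {
    // release-1.3 partition assignment: index-based round-robin starting at 0.
    static int partitionOwner13(int partitionIndex, int parallelism) {
        return partitionIndex % parallelism;
    }

    // RoundRobinOperatorStateRepartitioner-style state distribution: also
    // round-robin starting at 0 (simplified; real repartitioning is richer).
    static int stateOwner(int stateIndex, int parallelism) {
        return stateIndex % parallelism;
    }

    // The PR's assignment, shifted by a startIndex that need not be 0.
    static int partitionOwnerWithStart(int partitionIndex, int parallelism, int startIndex) {
        return (partitionIndex + startIndex) % parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 3, numPartitions = 6;

        // Aligned case: the subtask that picks partition i also receives
        // the checkpointed state for partition i.
        for (int i = 0; i < numPartitions; i++) {
            assert partitionOwner13(i, parallelism) == stateOwner(i, parallelism);
        }

        // Misaligned case: with startIndex = 1 the partition moves to a
        // different subtask than its state.
        int startIndex = 1;
        for (int i = 0; i < numPartitions; i++) {
            System.out.printf("partition %d -> subtask %d, its state -> subtask %d%n",
                    i,
                    partitionOwnerWithStart(i, parallelism, startIndex),
                    stateOwner(i, parallelism));
        }
    }
}
```

Running this prints a mismatch for every partition once `startIndex != 0`, which is exactly the situation the optimisation in `StateAssignmentOperation` was masking.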
> Partition assignment for Kafka consumer is not stable
> -----------------------------------------------------
>
>                 Key: FLINK-7143
>                 URL: https://issues.apache.org/jira/browse/FLINK-7143
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Steven Zhen Wu
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.2
>
>
> While deploying the Flink 1.3 release to hundreds of routing jobs, we found some issues with partition assignment for the Kafka consumer: some partitions weren't assigned and some partitions got assigned more than once.
> Here is the bug introduced in Flink 1.3.
> {code}
> protected static void initializeSubscribedPartitionsToStartOffsets(...) {
>     ...
>     for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
>         if (i % numParallelSubtasks == indexOfThisSubtask) {
>             if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
>                 subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
>             }
>             ...
>         }
>     }
> }
> {code}
> The bug is using the array index {{i}} to mod against {{numParallelSubtasks}}. If {{kafkaTopicPartitions}} has a different order among different subtasks, the assignment is not stable across subtasks, which creates the assignment issues mentioned earlier.
> The fix is also very simple: we should use the partition id to do the mod, i.e. {{if (kafkaTopicPartitions.get(i).getPartition() % numParallelSubtasks == indexOfThisSubtask)}}. That results in a stable assignment across subtasks that is independent of the ordering in the array.
> Marking it as blocker because of its impact.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
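A minimal plain-Java sketch of the instability described in the issue and of the proposed fix (this is not Flink code; partitions are reduced to their integer ids, and the helper names are illustrative assumptions). Two subtasks see the same four partitions but in different orders; index-based mod double-assigns some partitions and drops others, while partition-id-based mod is order-independent:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StableAssignmentSketch {
    // Buggy selection: keys the mod on the list index, so the result
    // depends on the order in which this subtask saw the partitions.
    static List<Integer> assignByIndex(List<Integer> partitions, int parallelism, int subtask) {
        List<Integer> mine = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            if (i % parallelism == subtask) mine.add(partitions.get(i));
        }
        return mine;
    }

    // Fixed selection: keys the mod on the partition id itself, so each
    // partition lands on exactly one subtask regardless of list order.
    static List<Integer> assignById(List<Integer> partitions, int parallelism, int subtask) {
        List<Integer> mine = new ArrayList<>();
        for (int p : partitions) {
            if (p % parallelism == subtask) mine.add(p);
        }
        return mine;
    }

    public static void main(String[] args) {
        // The same partitions, seen in different orders by two subtasks.
        List<Integer> seenBySubtask0 = Arrays.asList(0, 1, 2, 3);
        List<Integer> seenBySubtask1 = Arrays.asList(3, 2, 1, 0);
        int parallelism = 2;

        // Buggy: subtask 0 picks partitions {0, 2}, subtask 1 picks {2, 0}
        // from its reversed view -> 0 and 2 consumed twice, 1 and 3 never.
        System.out.println("index-based: subtask 0 -> " + assignByIndex(seenBySubtask0, parallelism, 0)
                + ", subtask 1 -> " + assignByIndex(seenBySubtask1, parallelism, 1));

        // Fixed: even ids to subtask 0, odd ids to subtask 1, in any order.
        System.out.println("id-based:    subtask 0 -> " + assignById(seenBySubtask0, parallelism, 0)
                + ", subtask 1 -> " + assignById(seenBySubtask1, parallelism, 1));
    }
}
```

Note that with the id-based variant the iteration order of a subtask's list may still differ, but the *set* of partitions each subtask owns is stable, which is what the fix requires.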