[ https://issues.apache.org/jira/browse/FLINK-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16089969#comment-16089969 ]
ASF GitHub Bot commented on FLINK-7143:
---------------------------------------

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4301

    Yes, I don't think we can get around this when restoring from "old" state.

    I also have another suspicion: I don't think that `KafkaConsumerTestBase.runMultipleSourcesOnePartitionExactlyOnceTest()` accurately catches some cases, and I think there is a problem in that we cannot reliably detect whether we are restoring or starting from scratch. Consider this case: 5 partitions, 5 parallel source instances. Now we rescale to 10 parallel source instances. Some sources don't get state, so they think that we are starting from scratch and they will run partition discovery. Doesn't this mean that they could possibly read from a partition that another source is already reading from, because that source got the state for it? (Note: this doesn't occur on master because all sources get all state.)

> Partition assignment for Kafka consumer is not stable
> ------------------------------------------------------
>
>                 Key: FLINK-7143
>                 URL: https://issues.apache.org/jira/browse/FLINK-7143
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Steven Zhen Wu
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.2
>
>
> While deploying the Flink 1.3 release to hundreds of routing jobs, we found some issues with partition assignment for the Kafka consumer: some partitions weren't assigned and some partitions got assigned more than once.
> Here is the bug introduced in Flink 1.3:
> {code}
> protected static void initializeSubscribedPartitionsToStartOffsets(...) {
>     ...
>     for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
>         if (i % numParallelSubtasks == indexOfThisSubtask) {
>             if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
>                 subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
>             }
>             ...
>         }
>     }
> }
> {code}
> The bug is that the array index {{i}} is used to mod against {{numParallelSubtasks}}. If {{kafkaTopicPartitions}} has a different order among different subtasks, the assignment is not stable across subtasks, which creates the assignment issue mentioned earlier.
> The fix is also very simple: we should use the partition id to do the mod, i.e. {{if (kafkaTopicPartitions.get\(i\).getPartition() % numParallelSubtasks == indexOfThisSubtask)}}. That would result in a stable assignment across subtasks that is independent of the ordering in the array (see the standalone sketch after the quoted description).
> Marking it as a blocker because of its impact.
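For illustration, here is a minimal, self-contained sketch of the assignment behaviour described above. This is not the actual Flink source or the patch under review; the {{TopicPartition}} class, the method names, and the "routing" topic name are stand-ins invented for the example, with {{getPartition()}} playing the role of Flink's {{KafkaTopicPartition#getPartition()}}. It shows why modding the array index is unstable when subtasks see the partitions in different orders, while modding the partition id gives every subtask the same answer:

{code}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class StablePartitionAssignment {

    // Stand-in for Flink's KafkaTopicPartition, reduced to what the example needs.
    static final class TopicPartition {
        final String topic;
        final int partition;
        TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
        int getPartition() { return partition; }
        @Override public String toString() { return topic + "-" + partition; }
    }

    // Index-based assignment (the buggy behaviour): the position in the list decides
    // ownership, so two subtasks that see the partitions in a different order can both
    // claim the same partition while another partition is claimed by nobody.
    static List<TopicPartition> assignByIndex(List<TopicPartition> partitions,
                                              int numParallelSubtasks, int indexOfThisSubtask) {
        List<TopicPartition> mine = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            if (i % numParallelSubtasks == indexOfThisSubtask) {
                mine.add(partitions.get(i));
            }
        }
        return mine;
    }

    // Partition-id-based assignment (the proposed fix): the partition id is identical on
    // every subtask, so the ownership decision is independent of list ordering.
    static List<TopicPartition> assignByPartitionId(List<TopicPartition> partitions,
                                                    int numParallelSubtasks, int indexOfThisSubtask) {
        List<TopicPartition> mine = new ArrayList<>();
        for (TopicPartition tp : partitions) {
            if (tp.getPartition() % numParallelSubtasks == indexOfThisSubtask) {
                mine.add(tp);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<TopicPartition> ordered = new ArrayList<>();
        for (int p = 0; p < 4; p++) {
            ordered.add(new TopicPartition("routing", p));
        }
        // The same partitions, but discovered in a different order on another subtask.
        List<TopicPartition> shuffled = new ArrayList<>(ordered);
        Collections.reverse(shuffled);

        // Index-based: subtask 0 claims {routing-0, routing-2} under one ordering and
        // {routing-3, routing-1} under the other -> duplicates and gaps across the job.
        System.out.println("index-based, subtask 0, ordered:  " + assignByIndex(ordered, 2, 0));
        System.out.println("index-based, subtask 0, shuffled: " + assignByIndex(shuffled, 2, 0));

        // Id-based: subtask 0 claims the same set {routing-0, routing-2} in both cases.
        System.out.println("id-based, subtask 0, ordered:     " + assignByPartitionId(ordered, 2, 0));
        System.out.println("id-based, subtask 0, shuffled:    " + assignByPartitionId(shuffled, 2, 0));
    }
}
{code}

Running this prints different partition sets for subtask 0 under the two orderings with the index-based variant, and the same set under both orderings with the id-based variant, which is exactly the stability property the proposed fix relies on.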