[ https://issues.apache.org/jira/browse/FLINK-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098092#comment-16098092 ]
ASF GitHub Bot commented on FLINK-7143: --------------------------------------- GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/4387 [FLINK-7143] [kafka] Forward ports of new Kafka tests to master This PR forward ports all new tests added in #4357 to `master`, so that the behaviors is correctly guarded there also. ## Changes 1. Introduce `KafkaTopicPartitionAssigner` class to master branch, which strictly defines the partition assignment contract (discussed in #4301). 2. Port rescaling unit test. Note that some tested behaviors needed to be changed due to the differences between 1.3 and 1.4 for the Kafka consumer. 3. Make checkpoint methods final. 4. (new change, not a port) Remove invalid `checkRestoredNullCheckpointWhenFetcherNotReady` test, which was testing a legacy behavior of the consumer You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink kafka-forward-ports Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4387.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4387 ---- commit 7295777984084fc470edaee44e1bc32881409665 Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Date: 2017-07-24T07:00:16Z [FLINK-7143] [kafka] Introduce KafkaTopicPartitionAssigner with stricter assignment contracts This commit refactors the local partition assignment logic to be located in a strict contract-defining method, to make it explicit of the expected partition to subtask assignment without relying solely on hashCode's of kafka partitions. commit 72a8c42505ba791f51001129a08744b104a171d7 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Date: 2017-07-18T09:57:46Z [FLINK-7143] [kafka] Add test for Kafka Consumer rescaling This verifies that the consumer always correctly knows whether it is restored or not and is not affected by changes in the partitions as reported by Kafka. Previously, operator state reshuffling could lead to partitions being subscribed to multiple times. commit 100936c7ab0b7bca4dab10143aa184dc31e2fd46 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Date: 2017-07-18T08:35:54Z [hotfix] [kafka] Make checkpoint methods final in KafkaConsumerBase This prevents concrete Kafka Source implementations from accidentally overriding the checkpointing methods. This would be problematic when not providing tests. We test the checkpoint methods of the ConsumerBase but derived methods would not be tested. commit b0cf8779b76b5fe94beb4ffb6ba9adad16280be6 Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Date: 2017-07-24T08:34:37Z [FLINK-7248] [kafka, tests] Remove invalid checkRestoredNullCheckpointWhenFetcherNotReady test This test is an invalid remnant from recent major Kafka consumer refactorings. The actual behaviour is covered by checkRestoredCheckpointWhenFetcherNotReady. When the fetcher is not yet ready and exposed and a checkpoint happens, we fallback to using any restored state as the checkpoint. ---- > Partition assignment for Kafka consumer is not stable > ----------------------------------------------------- > > Key: FLINK-7143 > URL: https://issues.apache.org/jira/browse/FLINK-7143 > Project: Flink > Issue Type: Bug > Components: Kafka Connector > Affects Versions: 1.3.1 > Reporter: Steven Zhen Wu > Assignee: Tzu-Li (Gordon) Tai > Priority: Blocker > Fix For: 1.3.2 > > > while deploying Flink 1.3 release to hundreds of routing jobs, we found some > issues with partition assignment for Kafka consumer. some partitions weren't > assigned and some partitions got assigned more than once. > Here is the bug introduced in Flink 1.3. > {code} > protected static void initializeSubscribedPartitionsToStartOffsets(...) > { > ... > for (int i = 0; i < kafkaTopicPartitions.size(); i++) { > if (i % numParallelSubtasks == indexOfThisSubtask) { > if (startupMode != > StartupMode.SPECIFIC_OFFSETS) { > > subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), > startupMode.getStateSentinel()); > } > ... > } > {code} > The bug is using array index {{i}} to mod against {{numParallelSubtasks}}. if > the {{kafkaTopicPartitions}} has different order among different subtasks, > assignment is not stable cross subtasks and creates the assignment issue > mentioned earlier. > fix is also very simple, we should use partitionId to do the mod {{if > (kafkaTopicPartitions.get\(i\).getPartition() % numParallelSubtasks == > indexOfThisSubtask)}}. That would result in stable assignment cross subtasks > that is independent of ordering in the array. > marking it as blocker because of its impact. -- This message was sent by Atlassian JIRA (v6.4.14#64029)