GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/4357
    (release-1.3) [FLINK-7143, FLINK-7195] Collection of Kafka fixes for release-1.3

    This PR subsumes #4344 and #4301, with the changes from both PRs merged
    and conflicts resolved.

    Apparently, some new tests added in one of the PRs also rely on the fix
    in the other PR, so I am opening this one to give a better overall view
    of the status of the fixes.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink kafka-13-fixes

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4357.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4357

----
commit 919b23a6e1c650a3d08f5418a53e712e86d51506
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Date:   2017-07-11T17:03:01Z

    [FLINK-7143] [kafka] Fix indeterminate partition assignment in FlinkKafkaConsumer

    Apart from fixing the previously incorrect, non-deterministic assignment
    logic, this commit also adds an explicitly defined method that states a
    strict contract for the assignment, instead of relying on a hashCode
    implementation, which conveys neither this contract nor the importance
    of the assignment being deterministic.

commit 00bcdbf24c177276f203063f905886becfe23db5
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Date:   2017-07-14T11:51:03Z

    [FLINK-7195] [kafka] Remove partition list querying when restoring state in FlinkKafkaConsumer

    Previously, querying the partition list and using it to filter out
    restored partition states was problematic, since the queried partition
    list may be missing partitions due to temporary downtime of Kafka
    brokers. Effectively, this could cause state to be dropped on restore.

    This commit fixes this by completely removing partition querying if we're
    restoring state (as notified by FunctionInitializationContext.isRestored()).
    The subscribed partitions will always be exactly what the restored state contains.

commit a4ca2f559b1d530e68ce3516035964f569ff7c7f
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2017-07-17T17:06:09Z

    [FLINK-7143] [kafka] Fix detection of restored bit in Kafka Consumer

    Previously, empty state was taken to mean that the source had not been
    restored. However, a source can have empty restored state in one of two
    cases:

    1. The source was not restored.
    2. The overall job was restored but the source simply didn't get any
       operator state assigned.

commit faf957209220d2779062321d7ab58c9356906ad8
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2017-07-18T08:35:54Z

    [hotfix] [kafka] Make checkpoint methods final in KafkaConsumerBase

    This prevents concrete Kafka Source implementations from accidentally
    overriding the checkpointing methods. Such an override would be
    problematic if it came without tests: we test the checkpoint methods of
    the ConsumerBase, but overriding methods in derived classes would not be
    tested.

commit 5180f898c48ce2e416547dcdf76caef72c5a8dee
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2017-07-18T09:57:46Z

    [FLINK-7143] [kafka] Add test for Kafka Consumer rescaling

    This verifies that the consumer always correctly knows whether it is
    restored or not and is not affected by changes in the partitions as
    reported by Kafka.

    Previously, operator state reshuffling could lead to partitions being
    subscribed to multiple times.

----
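
For readers skimming the commit list, here is a rough Java sketch of the behaviour the messages above describe. It is not the code from this PR: the class PartitionAssignmentSketch, its nested TopicPartition type, and the methods assign and partitionsToSubscribe are all hypothetical names chosen for illustration. It assumes an assignment that depends only on the topic name, the partition number and the parallelism, and a subscription step that trusts the restored state (even when empty) whenever the restored flag is set.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; names are illustrative and not taken from the Flink code base.
final class PartitionAssignmentSketch {

    /** Minimal stand-in for a Kafka topic partition. */
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    /**
     * Contract: the result depends only on (topic, partition, numSubtasks),
     * never on the order or completeness of a fetched partition list.
     */
    static int assign(TopicPartition tp, int numSubtasks) {
        // Mask the sign bit so the start index is never negative.
        int startIndex = (tp.topic.hashCode() & 0x7FFFFFFF) % numSubtasks;
        return (startIndex + tp.partition) % numSubtasks;
    }

    /**
     * Picks the partitions a subtask should read.
     * On restore, the restored state is used as-is, even when it is empty;
     * the discovered list is consulted only on a fresh start.
     */
    static List<TopicPartition> partitionsToSubscribe(
            boolean isRestored,
            List<TopicPartition> restoredState,
            List<TopicPartition> discovered,
            int subtaskIndex,
            int numSubtasks) {
        if (isRestored) {
            // Do not filter against 'discovered': brokers may be temporarily down.
            return new ArrayList<>(restoredState);
        }
        List<TopicPartition> mine = new ArrayList<>();
        for (TopicPartition tp : discovered) {
            if (assign(tp, numSubtasks) == subtaskIndex) {
                mine.add(tp);
            }
        }
        return mine;
    }
}
```

The point of passing isRestored explicitly, rather than checking whether restoredState is empty, is that a subtask in a restored job may legitimately have been handed no operator state; in that case it should subscribe to nothing instead of falling back to a fresh assignment.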