[ 
https://issues.apache.org/jira/browse/FLINK-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16091448#comment-16091448
 ] 

ASF GitHub Bot commented on FLINK-7143:
---------------------------------------

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/4357

    (release-1.3) [FLINK-7143, FLINK-7195] Collection of Kafka fixes for release-1.3

    This PR subsumes #4344 and #4301: it includes the changes from both PRs,
    merged and with conflicts resolved.
    Apparently, some new tests added in one of the PRs also rely on the fix
    in the other PR, so I am opening this one to give a better overall view
    of the status of the fixes.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink kafka-13-fixes

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4357.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4357
    
----
commit 919b23a6e1c650a3d08f5418a53e712e86d51506
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Date:   2017-07-11T17:03:01Z

    [FLINK-7143] [kafka] Fix indeterminate partition assignment in FlinkKafkaConsumer
    
    Apart from fixing the previously incorrect, non-deterministic assignment
    logic, this commit also adds an explicitly defined method that states a
    strict contract for the assignment, instead of relying on some hashCode
    implementation that conveys neither this contract nor the importance of
    the assignment being deterministic.

commit 00bcdbf24c177276f203063f905886becfe23db5
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Date:   2017-07-14T11:51:03Z

    [FLINK-7195] [kafka] Remove partition list querying when restoring state in FlinkKafkaConsumer
    
    Previously, querying the partition list and using it to filter out
    restored partition states was problematic, since the queried partition
    list may be missing partitions due to temporary downtime of Kafka
    brokers. Effectively, this could cause state to be dropped on
    restore.
    
    This commit fixes this by completely removing partition querying if
    we're restoring state (as notified by
    FunctionInitializationContext.isRestored()). The subscribed partitions
    will always be exactly what the restored state contains.

commit a4ca2f559b1d530e68ce3516035964f569ff7c7f
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2017-07-17T17:06:09Z

    [FLINK-7143] [kafka] Fix detection of restored bit in Kafka Consumer
    
    Previously, empty state was taken to mean that the source was not
    restored. However, a source can have empty state in one of two
    cases:
    
    1. The source was not restored.
    2. The overall job was restored but the source simply didn't get any
    operator state assigned.

commit faf957209220d2779062321d7ab58c9356906ad8
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2017-07-18T08:35:54Z

    [hotfix] [kafka] Make checkpoint methods final in KafkaConsumerBase
    
    This prevents concrete Kafka Source implementations from accidentally
    overriding the checkpointing methods. Such overrides would be
    problematic because they would be untested: we test the checkpoint
    methods of the ConsumerBase, but overriding methods in derived classes
    would not be covered by those tests.

commit 5180f898c48ce2e416547dcdf76caef72c5a8dee
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2017-07-18T09:57:46Z

    [FLINK-7143] [kafka] Add test for Kafka Consumer rescaling
    
    This verifies that the consumer always correctly knows whether it is
    restored or not and is not affected by changes in the partitions as
    reported by Kafka.
    
    Previously, operator state reshuffling could lead to partitions being
    subscribed to multiple times.

----
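
The restore behaviour described in the FLINK-7195 and "restored bit" commits above can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: the class `RestoreSketch`, the method `subscribedPartitions`, the `START_SENTINEL` value, and the use of plain integers for partitions are all hypothetical, and the per-subtask assignment filter that a fresh start would apply is omitted. The only detail taken from the commit message is that the decision hinges on an explicit restored flag (as reported by FunctionInitializationContext.isRestored()) rather than on whether the state is empty.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

class RestoreSketch {

    // Hypothetical placeholder offset meaning "start according to the startup mode".
    static final long START_SENTINEL = -1L;

    /**
     * Decides which partitions to subscribe to (hypothetical helper).
     *
     * @param isRestored      explicit restored flag, not inferred from empty state
     * @param restoredOffsets partition id -> offset taken from restored state (may be empty)
     * @param partitionQuery  queries the brokers; only invoked on a fresh start
     */
    static Map<Integer, Long> subscribedPartitions(
            boolean isRestored,
            Map<Integer, Long> restoredOffsets,
            Supplier<List<Integer>> partitionQuery) {

        Map<Integer, Long> subscribed = new HashMap<>();
        if (isRestored) {
            // Use exactly what the restored state contains, even if it is empty.
            // The brokers are not queried, so a temporarily missing partition
            // cannot cause restored state to be dropped.
            subscribed.putAll(restoredOffsets);
        } else {
            // Fresh start: discover partitions and begin from the sentinel offset.
            for (Integer partition : partitionQuery.get()) {
                subscribed.put(partition, START_SENTINEL);
            }
        }
        return subscribed;
    }
}
```

With this shape, a restored subtask that received no operator state ends up with an empty subscription instead of falling back to the fresh-start path, which is the distinction the "restored bit" commit describes.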


> Partition assignment for Kafka consumer is not stable
> -----------------------------------------------------
>
>                 Key: FLINK-7143
>                 URL: https://issues.apache.org/jira/browse/FLINK-7143
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Steven Zhen Wu
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.2
>
>
> While deploying the Flink 1.3 release to hundreds of routing jobs, we found
> some issues with partition assignment for the Kafka consumer: some partitions
> weren't assigned and some partitions were assigned more than once.
> Here is the bug introduced in Flink 1.3.
> {code}
> protected static void initializeSubscribedPartitionsToStartOffsets(...) {
>     ...
>     for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
>         if (i % numParallelSubtasks == indexOfThisSubtask) {
>             if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
>                 subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
>             }
>     ...
> }
> {code}
> The bug is the use of the array index {{i}} in the mod against
> {{numParallelSubtasks}}. If {{kafkaTopicPartitions}} is ordered differently
> on different subtasks, the assignment is not stable across subtasks, which
> creates the assignment issue mentioned earlier.
> The fix is also very simple: we should use the partition ID for the mod, i.e.
> {{if (kafkaTopicPartitions.get(i).getPartition() % numParallelSubtasks == indexOfThisSubtask)}}.
> That would result in a stable assignment across subtasks that is independent
> of the ordering in the array.
> Marking it as a blocker because of its impact.
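
To make the failure mode in the report concrete, here is a small self-contained sketch comparing index-based and partition-ID-based assignment when two subtasks see the same partitions in different orders. The class `StableAssignmentSketch`, its nested `Partition` type, and the helper names are hypothetical stand-ins, not Flink classes, and the sketch assumes a single topic. It illustrates the fix proposed in the report, which is not necessarily the logic that was finally merged.

```java
import java.util.ArrayList;
import java.util.List;

class StableAssignmentSketch {

    /** Hypothetical stand-in for a Kafka partition; not Flink's KafkaTopicPartition. */
    static final class Partition {
        final int id;
        Partition(int id) { this.id = id; }
    }

    /** Buggy variant: the result depends on the position of each partition in the list. */
    static List<Integer> assignByIndex(List<Partition> partitions, int numSubtasks, int subtask) {
        List<Integer> assigned = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            if (i % numSubtasks == subtask) {
                assigned.add(partitions.get(i).id);
            }
        }
        return assigned;
    }

    /** Proposed variant: the result depends only on the partition ID, not on list order. */
    static List<Integer> assignByPartitionId(List<Partition> partitions, int numSubtasks, int subtask) {
        List<Integer> assigned = new ArrayList<>();
        for (Partition p : partitions) {
            if (p.id % numSubtasks == subtask) {
                assigned.add(p.id);
            }
        }
        return assigned;
    }

    /** Builds a partition list in exactly the given order. */
    static List<Partition> partitions(int... ids) {
        List<Partition> result = new ArrayList<>();
        for (int id : ids) {
            result.add(new Partition(id));
        }
        return result;
    }

    public static void main(String[] args) {
        // Same four partitions, reported in a different order to each subtask.
        List<Partition> seenBySubtask0 = partitions(0, 1, 2, 3);
        List<Partition> seenBySubtask1 = partitions(1, 0, 3, 2);

        // Index-based: both subtasks claim partitions 0 and 2; 1 and 3 are never assigned.
        System.out.println(assignByIndex(seenBySubtask0, 2, 0)); // [0, 2]
        System.out.println(assignByIndex(seenBySubtask1, 2, 1)); // [0, 2]

        // ID-based: every partition is assigned to exactly one subtask.
        System.out.println(assignByPartitionId(seenBySubtask0, 2, 0)); // [0, 2]
        System.out.println(assignByPartitionId(seenBySubtask1, 2, 1)); // [1, 3]
    }
}
```

In the index-based run, partitions 0 and 2 are consumed twice while 1 and 3 are skipped, which matches the symptoms described in the report.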



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
