[ 
https://issues.apache.org/jira/browse/FLINK-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16092528#comment-16092528
 ] 

ASF GitHub Bot commented on FLINK-7143:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4357#discussion_r128147294
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -517,16 +519,13 @@ public void initializeState(FunctionInitializationContext context) throws Except
                                        LOG.debug("Using the following offsets: {}", restoredState);
                                }
                        }
    -                   if (restoredState != null && restoredState.isEmpty()) {
    -                           restoredState = null;
    -                   }
                } else {
                        LOG.info("No restore state for FlinkKafkaConsumer.");
                }
        }
     
        @Override
    -   public void snapshotState(FunctionSnapshotContext context) throws Exception {
    +   public final void snapshotState(FunctionSnapshotContext context) throws Exception {
    --- End diff --
    
    @stevenzwu the `snapshotState` method was never intended to be
    overridden, so it is made final here to state that clearly. For example,
    the version-specific subclasses of `FlinkKafkaConsumerBase` could override
    it with an incorrect implementation, and our tests would never catch it.


> Partition assignment for Kafka consumer is not stable
> -----------------------------------------------------
>
>                 Key: FLINK-7143
>                 URL: https://issues.apache.org/jira/browse/FLINK-7143
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Steven Zhen Wu
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.2
>
>
> While deploying the Flink 1.3 release to hundreds of routing jobs, we found
> issues with partition assignment for the Kafka consumer: some partitions
> weren't assigned, and some partitions were assigned more than once.
> Here is the bug introduced in Flink 1.3.
> {code}
>       protected static void initializeSubscribedPartitionsToStartOffsets(...) {
>                 ...
>               for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
>                       if (i % numParallelSubtasks == indexOfThisSubtask) {
>                               if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
>                                       subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
>                               }
>                 ...
>          }
> {code}
> The bug is using the array index {{i}} to mod against {{numParallelSubtasks}}.
> If {{kafkaTopicPartitions}} is ordered differently on different subtasks, the
> assignment is not stable across subtasks, which creates the assignment issue
> mentioned earlier.
> The fix is also very simple: we should use the partition id for the mod, {{if
> (kafkaTopicPartitions.get\(i\).getPartition() % numParallelSubtasks ==
> indexOfThisSubtask)}}. That would result in a stable assignment across subtasks
> that is independent of the ordering in the array.
> Marking it as a blocker because of its impact.
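
The effect described in the quoted report can be reproduced with a minimal sketch. This is not the connector code: plain `int` partition ids stand in for the `KafkaTopicPartition` objects, and the class and method names (`PartitionAssignmentSketch`, `assignByIndex`, `assignByPartitionId`) are hypothetical. It only illustrates why modding the list index is order-dependent while modding the partition id, as proposed in the report, is not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; ints stand in for KafkaTopicPartition objects.
public class PartitionAssignmentSketch {

    // Index-based selection, as in the snippet quoted in the report:
    // the outcome depends on the order of the partition list.
    static List<Integer> assignByIndex(
            List<Integer> partitionIds, int numParallelSubtasks, int indexOfThisSubtask) {
        List<Integer> assigned = new ArrayList<>();
        for (int i = 0; i < partitionIds.size(); i++) {
            if (i % numParallelSubtasks == indexOfThisSubtask) {
                assigned.add(partitionIds.get(i));
            }
        }
        return assigned;
    }

    // Partition-id-based selection, as proposed in the report:
    // the outcome depends only on the ids, not on the list order.
    static List<Integer> assignByPartitionId(
            List<Integer> partitionIds, int numParallelSubtasks, int indexOfThisSubtask) {
        List<Integer> assigned = new ArrayList<>();
        for (int partitionId : partitionIds) {
            if (partitionId % numParallelSubtasks == indexOfThisSubtask) {
                assigned.add(partitionId);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Two subtasks see the same four partitions, but in different orders.
        List<Integer> seenBySubtask0 = Arrays.asList(0, 1, 2, 3);
        List<Integer> seenBySubtask1 = Arrays.asList(1, 0, 2, 3);

        // Index-based: partition 0 is taken by both subtasks, partition 1 by neither.
        System.out.println(assignByIndex(seenBySubtask0, 2, 0)); // [0, 2]
        System.out.println(assignByIndex(seenBySubtask1, 2, 1)); // [0, 3]

        // Id-based: every partition is taken exactly once.
        System.out.println(assignByPartitionId(seenBySubtask0, 2, 0)); // [0, 2]
        System.out.println(assignByPartitionId(seenBySubtask1, 2, 1)); // [1, 3]
    }
}
```

With two subtasks and the orderings above, the index-based variant assigns partition 0 twice and never assigns partition 1, which matches the symptoms in the report. The sketch deliberately ignores topics; how partitions of several topics should be spread is outside what it shows.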



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
