[ https://issues.apache.org/jira/browse/FLINK-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16082912#comment-16082912 ]

ASF GitHub Bot commented on FLINK-7143:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4301
  
    I think that would fix the bug. There are two things I would like to 
improve, though:
    
      1. Relying on `hashCode()` makes very implicit assumptions about the
hash code implementation, and it does not document how critical the resulting
`int` value is. For example, by the Java specification, `hashCode()` may vary
between processes; it only needs to be stable within a single JVM. Our hash
code implementation happens to be stable at the moment, but only as long as
the JDK does not change the implementation of the String hash code method
(which it could in theory do in any minor release, although it has not done
so in a while).
    
      2. It is crucial that the distribution of partitions is uniform. That
is a bit harder to guarantee when all sources pick up their own set of
topics. At the least, the distribution should be uniform across the
partitions within a topic. For example, the topic defines "where to start"
among the parallel subtasks, and the partitions then go "round robin" from
there.
    As it happens, the current hash code function does behave exactly like
that, but it looks like it does so "coincidentally", rather than because we
have a strict contract for that behavior. For example, changing the hashCode
from `31 * topic + partition` to `31 * partition + topic` results in a
non-uniform distribution, yet it is an equally valid hashCode.
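    To illustrate the second point, here is a small, hypothetical demo (the
class name and the parallelism value of 31 are made up for illustration; only
the two hash formulas come from the discussion above). With parallelism 31,
`31 * partition + topic` sends every partition of a topic to the same
subtask, while `31 * topic + partition` round-robins them:

    ```java
    public class AssignmentSkewDemo {
        public static void main(String[] args) {
            int parallelism = 31;
            int topicHash = "my-topic".hashCode();
            for (int partition = 0; partition < 8; partition++) {
                // Current-style hash: consecutive partitions of one topic map to
                // consecutive ints, so "% parallelism" round-robins the subtasks.
                int goodIndex = Math.floorMod(31 * topicHash + partition, parallelism);
                // Equally valid hashCode, but "31 * partition + topicHash" mod 31
                // is constant: every partition lands on the same subtask index.
                int badIndex = Math.floorMod(31 * partition + topicHash, parallelism);
                System.out.printf("partition %d -> %d vs. %d%n",
                        partition, goodIndex, badIndex);
            }
        }
    }
    ```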
    
    I would suggest adding a function `int assignmentIndex()` or similar, for
which we define the above contract. We should also have tests that verify it
distributes the partitions within a single topic uniformly.
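    A minimal sketch of what that contract could look like (a hypothetical
method on `KafkaTopicPartition`, not existing Flink API; it deliberately
reuses the arithmetic the current hashCode happens to use):

    ```java
    /**
     * Index used to assign this partition to a parallel subtask.
     * Contract: for a fixed topic, consecutive partitions map to consecutive
     * indices, so that "assignmentIndex() % parallelism" distributes the
     * partitions of each topic round-robin across the subtasks. The value
     * must be stable across JVMs and Flink versions.
     */
    public int assignmentIndex() {
        // Same formula as the current hashCode(), but now as an explicit,
        // tested contract rather than an implementation accident. (A
        // hand-rolled stable string hash could replace topic.hashCode() to
        // also address point 1.)
        return 31 * topic.hashCode() + partition;
    }
    ```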


> Partition assignment for Kafka consumer is not stable
> -----------------------------------------------------
>
>                 Key: FLINK-7143
>                 URL: https://issues.apache.org/jira/browse/FLINK-7143
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Steven Zhen Wu
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.2
>
>
> While deploying the Flink 1.3 release to hundreds of routing jobs, we found
> issues with partition assignment in the Kafka consumer: some partitions
> weren't assigned, and some partitions were assigned more than once.
> Here is the bug introduced in Flink 1.3:
> {code}
> protected static void initializeSubscribedPartitionsToStartOffsets(...) {
>     ...
>     for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
>         if (i % numParallelSubtasks == indexOfThisSubtask) {
>             if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
>                 subscribedPartitionsToStartOffsets.put(
>                         kafkaTopicPartitions.get(i),
>                         startupMode.getStateSentinel());
>             }
>     ...
> }
> {code}
> The bug is using the array index {{i}} to mod against
> {{numParallelSubtasks}}. If {{kafkaTopicPartitions}} has a different order
> in different subtasks, the assignment is not stable across subtasks, which
> creates the assignment issues mentioned earlier.
> The fix is also very simple: we should use the partition id to do the mod,
> {{if (kafkaTopicPartitions.get\(i\).getPartition() % numParallelSubtasks ==
> indexOfThisSubtask)}}, as sketched below. That results in a stable
> assignment across subtasks that is independent of the ordering in the array.
> Marking this as a blocker because of its impact.
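> A minimal sketch of the described fix (same names and elided context as the
> snippet above; this shows the suggested condition, not merged code):
> {code}
> for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
>     // Mod on the stable Kafka partition id rather than the list index, so
>     // the result no longer depends on the ordering of kafkaTopicPartitions:
>     if (kafkaTopicPartitions.get(i).getPartition() % numParallelSubtasks
>             == indexOfThisSubtask) {
>         if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
>             subscribedPartitionsToStartOffsets.put(
>                     kafkaTopicPartitions.get(i),
>                     startupMode.getStateSentinel());
>         }
>     }
>     ...
> }
> {code}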


