The `KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks)` method 
returns the index of the target subtask for a given Kafka partition.
The implementation is deterministic: for the same partition and parallelism, 
it always returns the same subtask index.

Each consumer subtask will locally invoke this assignment method for each Kafka 
partition.
If the returned subtask index doesn’t equal the subtask’s index, that partition 
will be filtered out and not be read by the subtask.
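
To illustrate the filtering described above, here is a minimal, self-contained sketch. It is not the actual Flink consumer code: the class `AssignmentSketch` and the helper `partitionsForSubtask` are hypothetical names, and the topic is passed as a plain `String` instead of a `KafkaTopicPartition` so the example runs without Flink on the classpath. The `assign` formula mirrors the one quoted below.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for illustration only, not a Flink class.
class AssignmentSketch {

    // Same formula as the assign() method quoted in this thread, with the
    // topic name and partition id passed directly (an assumption made to
    // keep the sketch free of Flink dependencies).
    static int assign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }

    // What each subtask does locally: run assign() for every partition and
    // keep only the partitions whose target index equals its own index.
    static List<Integer> partitionsForSubtask(
            String topic, int numPartitions, int subtaskIndex, int numParallelSubtasks) {
        List<Integer> owned = new ArrayList<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            if (assign(topic, partition, numParallelSubtasks) == subtaskIndex) {
                owned.add(partition);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        int numPartitions = 2;
        int numParallelSubtasks = 4;
        for (int subtask = 0; subtask < numParallelSubtasks; subtask++) {
            System.out.println("subtask " + subtask + " reads partitions "
                    + partitionsForSubtask("my-topic", numPartitions, subtask, numParallelSubtasks));
        }
    }
}
```

With 2 partitions and 4 subtasks, the two partitions land on two consecutive subtask indices (wrapping around), and the remaining two subtasks end up with an empty list, which is why they stay idle.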

On 8 November 2017 at 6:38:54 PM, yunfan123 (yunfanfight...@foxmail.com) wrote:

The Kafka partition assignment code looks like this:

public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

    // here, the assumption is that the id of Kafka partitions are always ascending
    // starting from 0, and therefore can be used directly as the offset clockwise from the start index
    return (startIndex + partition.getPartition()) % numParallelSubtasks;
}

It seems this will assign partitions to multiple subtasks.
I wonder how Flink ensures that some subtasks simply remain idle.



--  
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
 
