The `KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks)` method returns the index of the target subtask for a given Kafka partition. Its implementation ensures that, for a given parallelism, the same subtask index is always returned for the same partition.
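A minimal, self-contained sketch of that determinism is below. It copies the start-index-plus-offset formula from the `assign` method quoted at the end of this message, but `AssignSketch` and its nested `TopicPartition` record are hypothetical stand-ins for Flink's `KafkaTopicPartition`, not Flink API. Because `String.hashCode()` is fully specified by Java, the computed index depends only on the topic name, the partition id and the parallelism.

```java
public class AssignSketch {
    // Hypothetical stand-in for Flink's KafkaTopicPartition: a topic name plus a partition id.
    public record TopicPartition(String topic, int partition) {}

    // Same formula as the assign(...) method quoted in this thread:
    // a topic-dependent start index, then partition ids used as a clockwise offset.
    public static int assign(TopicPartition tp, int numParallelSubtasks) {
        // Masking with 0x7FFFFFFF clears the sign bit, so the modulo is never negative.
        int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + tp.partition()) % numParallelSubtasks;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (int p = 0; p < 6; p++) {
            TopicPartition tp = new TopicPartition("events", p);
            // Calling assign twice with the same inputs yields the same index,
            // because nothing in the formula depends on mutable or random state.
            System.out.printf("partition %d -> subtask %d (again: %d)%n",
                    p, assign(tp, parallelism), assign(tp, parallelism));
        }
    }
}
```

Partitions of one topic are therefore laid out round-robin from a topic-dependent starting point: partition `p` lands on `(startIndex + p) % numParallelSubtasks`.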
Each consumer subtask invokes this assignment method locally for every Kafka partition. If the returned subtask index does not equal the subtask's own index, that partition is filtered out and is not read by that subtask.

On 8 November 2017 at 6:38:54 PM, yunfan123 (yunfanfight...@foxmail.com) wrote:

> The Kafka partition assignment code looks like this:
>
>     public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
>         int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
>
>         // here, the assumption is that the id of Kafka partitions are always ascending
>         // starting from 0, and therefore can be used directly as the offset clockwise from the start index
>         return (startIndex + partition.getPartition()) % numParallelSubtasks;
>     }
>
> It seems this will assign a partition to multiple subtasks. I wonder how Flink ensures that some subtasks simply remain idle.
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
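The per-subtask filtering described in this reply can be sketched as follows. This is an illustration under stated assumptions: `FilterSketch`, `partitionsFor` and the `TopicPartition` record are hypothetical names, not Flink's consumer internals; only the `assign` formula comes from the code quoted in this thread.

```java
import java.util.ArrayList;
import java.util.List;

public class FilterSketch {
    // Hypothetical simplified partition descriptor (not Flink's KafkaTopicPartition).
    public record TopicPartition(String topic, int partition) {}

    // Same formula as the assign(...) method quoted in this thread.
    public static int assign(TopicPartition tp, int numParallelSubtasks) {
        int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + tp.partition()) % numParallelSubtasks;
    }

    // What one subtask does locally (hypothetical helper): evaluate assign(...) for every
    // partition and keep only those whose target index equals its own index.
    public static List<TopicPartition> partitionsFor(int subtaskIndex, int numParallelSubtasks,
                                                     List<TopicPartition> all) {
        List<TopicPartition> mine = new ArrayList<>();
        for (TopicPartition tp : all) {
            if (assign(tp, numParallelSubtasks) == subtaskIndex) {
                mine.add(tp);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        List<TopicPartition> all = List.of(
                new TopicPartition("events", 0),
                new TopicPartition("events", 1));
        // Every subtask sees the same partition list and runs the same pure function,
        // so each partition is kept by exactly one subtask.
        for (int i = 0; i < parallelism; i++) {
            List<TopicPartition> mine = partitionsFor(i, parallelism, all);
            System.out.println("subtask " + i + (mine.isEmpty() ? " is idle" : " reads " + mine));
        }
    }
}
```

With two partitions and four subtasks, the partitions land on `startIndex` and `(startIndex + 1) % 4`, so exactly two subtasks keep one partition each and the other two end up with an empty list and stay idle. In general, whenever there are fewer partitions than subtasks, some indices are never returned by `assign`, and those subtasks read nothing.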