The Kafka partition assignment code looks like this:

    public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
        int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

        // here, the assumption is that the id of Kafka partitions are always ascending
        // starting from 0, and therefore can be used directly as the offset clockwise from the start index
        return (startIndex + partition.getPartition()) % numParallelSubtasks;
    }

It seems this will spread the partitions of a topic across multiple subtasks. How does Flink ensure that some subtasks simply remain idle?
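
To make the question concrete, here is a minimal standalone sketch of the same arithmetic. It does not use Flink classes: `AssignSketch`, `distribute`, and the topic name `my-topic` are hypothetical, and the topic and partition id are passed in directly instead of through `KafkaTopicPartition`. It also assumes that each subtask keeps only the partitions whose `assign` result equals its own index, which the snippet above does not show.

```java
import java.util.ArrayList;
import java.util.List;

public class AssignSketch {

    // Same arithmetic as the quoted assign(), with the topic name and
    // partition id passed directly (hypothetical signature, for illustration).
    static int assign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }

    // Groups partition ids 0..numPartitions-1 by the subtask index they map to.
    // Assumption: a subtask reads exactly the partitions that map to its index.
    static List<List<Integer>> distribute(String topic, int numPartitions, int numParallelSubtasks) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int i = 0; i < numParallelSubtasks; i++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            owned.get(assign(topic, p, numParallelSubtasks)).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        // 3 partitions, 5 subtasks: fewer partitions than subtasks.
        List<List<Integer>> owned = distribute("my-topic", 3, 5);
        for (int i = 0; i < owned.size(); i++) {
            System.out.println("subtask " + i + " -> partitions " + owned.get(i));
        }
    }
}
```

Under that assumption, nothing has to force a subtask to be idle. With 3 partitions and 5 subtasks, the three partitions land on three consecutive indices (wrapping around), and the remaining two subtasks are never returned by `assign`, so they have nothing to read.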