Hi Gordon,

Yes, I just re-read the code in the assignTopicPartitions method, and it does indeed subscribe each subtask only to the partitions that subtask should subscribe to. Earlier I had not carefully read the for loop in assignTopicPartitions that builds subscribedPartitions for each subtask:

    for (int i = getRuntimeContext().getIndexOfThisSubtask();
            i < kafkaTopicPartitions.size();
            i += getRuntimeContext().getNumberOfParallelSubtasks()) {
        subscribedPartitions.add(kafkaTopicPartitions.get(i));
    }

You are right: "the partitions are still filtered out to only be the partitions for each local subtask, using the `assignTopicPartitions` method"

Thanks,
aitozi
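
For anyone reading along, the round-robin assignment done by that loop can be tried out standalone. This is only a rough sketch of my understanding, not Flink's actual code: the class name PartitionAssignmentSketch and the assign helper are hypothetical, and the subtask index and parallelism are passed in as plain ints instead of coming from getRuntimeContext().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone sketch; not part of Flink.
class PartitionAssignmentSketch {

    // Returns the partitions a single subtask would take: starting at its own
    // index and stepping by the total number of parallel subtasks.
    static <T> List<T> assign(List<T> allPartitions, int subtaskIndex, int numSubtasks) {
        List<T> subscribed = new ArrayList<>();
        for (int i = subtaskIndex; i < allPartitions.size(); i += numSubtasks) {
            subscribed.add(allPartitions.get(i));
        }
        return subscribed;
    }

    public static void main(String[] args) {
        // Assumed example: 5 partitions (ids 0..4) and a parallelism of 2.
        List<Integer> partitions = Arrays.asList(0, 1, 2, 3, 4);
        System.out.println(assign(partitions, 0, 2)); // prints [0, 2, 4]
        System.out.println(assign(partitions, 1, 2)); // prints [1, 3]
    }
}
```

Each partition ends up with exactly one subtask, and a subtask whose index is not smaller than the number of partitions gets an empty list.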