Hi, I have a question. When we use KafkaConsumerBase, each parallel subtask has to fetch data from a different set of partitions, as in this method from KafkaConsumerBase.java (version 1.2.0):

    protected static List<KafkaTopicPartition> assignPartitions(
            List<KafkaTopicPartition> allPartitions,
            int numConsumers,
            int consumerIndex) {
        final List<KafkaTopicPartition> thisSubtaskPartitions =
                new ArrayList<>(allPartitions.size() / numConsumers + 1);

        for (int i = 0; i < allPartitions.size(); i++) {
            if (i % numConsumers == consumerIndex) {
                thisSubtaskPartitions.add(allPartitions.get(i));
            }
        }

        return thisSubtaskPartitions;
    }

But I have not found any place that invokes this method. KafkaConsumerThread.java instead calls

    consumerCallBridge.assignPartitions(consumer, convertKafkaPartitions(subscribedPartitions));

I think subscribedPartitions here is the full list of partitions, not the partitions of this subtask. Can anyone help me with this?
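
To make the expected behaviour concrete, here is a minimal standalone sketch of the same modulo-based assignment. It is an illustration only, not Flink code: the class name RoundRobinAssignmentSketch is hypothetical, and plain strings such as "topic-0" stand in for KafkaTopicPartition so the example runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone class; it only mirrors the logic of the quoted method.
public class RoundRobinAssignmentSketch {

    // Generic stand-in for the quoted assignPartitions:
    // the element at index i goes to the subtask whose index equals i % numConsumers.
    static <T> List<T> assignPartitions(List<T> allPartitions, int numConsumers, int consumerIndex) {
        final List<T> thisSubtaskPartitions =
                new ArrayList<>(allPartitions.size() / numConsumers + 1);
        for (int i = 0; i < allPartitions.size(); i++) {
            if (i % numConsumers == consumerIndex) {
                thisSubtaskPartitions.add(allPartitions.get(i));
            }
        }
        return thisSubtaskPartitions;
    }

    public static void main(String[] args) {
        // Assumed example data: five partitions of one topic, two parallel subtasks.
        List<String> allPartitions =
                Arrays.asList("topic-0", "topic-1", "topic-2", "topic-3", "topic-4");
        int numConsumers = 2;

        for (int consumerIndex = 0; consumerIndex < numConsumers; consumerIndex++) {
            System.out.println("subtask " + consumerIndex + " -> "
                    + assignPartitions(allPartitions, numConsumers, consumerIndex));
        }
        // prints:
        // subtask 0 -> [topic-0, topic-2, topic-4]
        // subtask 1 -> [topic-1, topic-3]
    }
}
```

If the list handed to the consumer thread had already been filtered this way, each subtask would see only its own share, and every partition would belong to exactly one subtask.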