Hi,
I have a question about KafkaConsumerBase. When we use it, each parallel
subtask has to fetch data from a different subset of the partitions, as in
this method from KafkaConsumerBase.java (version 1.2.0):
    protected static List<KafkaTopicPartition> assignPartitions(
            List<KafkaTopicPartition> allPartitions,
            int numConsumers, int consumerIndex) {
        final List<KafkaTopicPartition> thisSubtaskPartitions = new ArrayList<>(
                allPartitions.size() / numConsumers + 1);
        for (int i = 0; i < allPartitions.size(); i++) {
            if (i % numConsumers == consumerIndex) {
                thisSubtaskPartitions.add(allPartitions.get(i));
            }
        }
        return thisSubtaskPartitions;
    }
However, I have not found any place that invokes this method. In
KafkaConsumerThread.java, the code instead calls
    consumerCallBridge.assignPartitions(consumer,
            convertKafkaPartitions(subscribedPartitions));
I think subscribedPartitions here is the full list of partitions, not this
subtask's partitions. Can anyone clarify this for me?
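
To show what I expect each subtask to receive, here is a minimal standalone sketch of the same modulo logic. It is not Flink code: plain strings stand in for KafkaTopicPartition, and the class name AssignPartitionsSketch and the partition names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone class; String replaces KafkaTopicPartition.
class AssignPartitionsSketch {

    // Same round-robin (index modulo parallelism) logic as the quoted method.
    static List<String> assignPartitions(
            List<String> allPartitions, int numConsumers, int consumerIndex) {
        final List<String> thisSubtaskPartitions =
                new ArrayList<>(allPartitions.size() / numConsumers + 1);
        for (int i = 0; i < allPartitions.size(); i++) {
            if (i % numConsumers == consumerIndex) {
                thisSubtaskPartitions.add(allPartitions.get(i));
            }
        }
        return thisSubtaskPartitions;
    }

    public static void main(String[] args) {
        // Five made-up partitions split across two parallel subtasks.
        List<String> all = Arrays.asList(
                "topic-0", "topic-1", "topic-2", "topic-3", "topic-4");
        int numConsumers = 2;
        for (int idx = 0; idx < numConsumers; idx++) {
            System.out.println(
                    "subtask " + idx + " -> " + assignPartitions(all, numConsumers, idx));
        }
        // prints:
        // subtask 0 -> [topic-0, topic-2, topic-4]
        // subtask 1 -> [topic-1, topic-3]
    }
}
```

With this logic each subtask would see only its own slice, which is why I expected the list passed to consumerCallBridge.assignPartitions to be the per-subtask list rather than all partitions.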