[ https://issues.apache.org/jira/browse/KAFKA-4405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15705691#comment-15705691 ]
Ismael Juma edited comment on KAFKA-4405 at 11/29/16 4:04 PM:
--------------------------------------------------------------

[~ysysberserk], are you really seeing the behaviour you have described here? Fetcher should only try to prefetch a given partition if we don't have records for it; see the code below (`fetchablePartitions` in particular):

{code}
private List<TopicPartition> fetchablePartitions() {
    List<TopicPartition> fetchable = subscriptions.fetchablePartitions();
    if (nextInLineRecords != null && !nextInLineRecords.isEmpty())
        fetchable.remove(nextInLineRecords.partition);
    for (CompletedFetch completedFetch : completedFetches)
        fetchable.remove(completedFetch.partition);
    return fetchable;
}

/**
 * Create fetch requests for all nodes for which we have assigned partitions
 * that have no existing requests in flight.
 */
private Map<Node, FetchRequest> createFetchRequests() {
    // create the fetch info
    Cluster cluster = metadata.fetch();
    Map<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> fetchable = new LinkedHashMap<>();
    for (TopicPartition partition : fetchablePartitions()) {
        Node node = cluster.leaderFor(partition);
        if (node == null) {
            metadata.requestUpdate();
        } else if (this.client.pendingRequestCount(node) == 0) {
            // if there is a leader and no in-flight requests, issue a new fetch
            LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
            if (fetch == null) {
                fetch = new LinkedHashMap<>();
                fetchable.put(node, fetch);
            }

            long position = this.subscriptions.position(partition);
            fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));
            log.trace("Added fetch request for partition {} at offset {}", partition, position);
        } else {
            log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);
        }
    }

    // create the fetches
    Map<Node, FetchRequest> requests = new HashMap<>();
    for (Map.Entry<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
        Node node = entry.getKey();
        FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, this.maxBytes, entry.getValue());
        requests.put(node, fetch);
    }
    return requests;
}
{code}

> Kafka consumer improperly send prefetch request
> -----------------------------------------------
>
>                 Key: KAFKA-4405
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4405
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.0.1
>            Reporter: ysysberserk
>
> The Kafka consumer now has max.poll.records to limit the number of messages
> returned by poll().
> According to KIP-41, to implement max.poll.records, a prefetch request
> should only be sent when the total number of retained records is less than
> max.poll.records.
> But in the 0.10.0.1 code, the consumer sends a prefetch request if it has
> retained any records, and never checks whether the total number of retained
> records is less than max.poll.records.
> If max.poll.records is set much lower than the number of messages fetched,
> the poll() loop will send many more requests than expected, and more and
> more records will be fetched and held in memory before they can be consumed.
> So, before sending a prefetch request, the consumer must check whether the
> total number of retained records is less than max.poll.records.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
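The disagreement in this thread comes down to two different prefetch conditions. The Java sketch below is a hypothetical model, not Kafka code: the class `PrefetchSketch`, its methods, and the partition names `t-0`/`t-1` are invented for illustration, and each partition is reduced to a count of records fetched but not yet returned by `poll()`. `fetchablePerPartition` mimics the filtering in the quoted `fetchablePartitions()` (skip any partition that still has buffered records), while `mayPrefetchByTotal` models the total-count check that the issue description attributes to KIP-41.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model, not Kafka's Fetcher: each partition name maps to the
// number of records already fetched but not yet returned by poll().
public class PrefetchSketch {

    // Per-partition rule, mirroring the quoted fetchablePartitions():
    // a partition is fetchable only if nothing is buffered for it.
    static List<String> fetchablePerPartition(List<String> assigned, Map<String, Integer> retained) {
        List<String> fetchable = new ArrayList<>();
        for (String partition : assigned) {
            if (retained.getOrDefault(partition, 0) == 0) {
                fetchable.add(partition);
            }
        }
        return fetchable;
    }

    // Total-count rule, as the issue description reads KIP-41:
    // prefetch only while the sum of buffered records is below maxPollRecords.
    static boolean mayPrefetchByTotal(Map<String, Integer> retained, int maxPollRecords) {
        int total = 0;
        for (int count : retained.values()) {
            total += count;
        }
        return total < maxPollRecords;
    }

    public static void main(String[] args) {
        List<String> assigned = List.of("t-0", "t-1");
        // t-0 has 500 buffered records; t-1 has none.
        Map<String, Integer> retained = Map.of("t-0", 500, "t-1", 0);
        int maxPollRecords = 10;

        // Per-partition rule: t-1 is still fetched, t-0 is not.
        System.out.println(fetchablePerPartition(assigned, retained));    // prints [t-1]
        // Total-count rule: 500 is not below 10, so nothing is prefetched.
        System.out.println(mayPrefetchByTotal(retained, maxPollRecords)); // prints false
    }
}
```

In this model, the per-partition rule still fetches `t-1` while 500 records sit buffered for `t-0`, but it never fetches `t-0` again until those records are drained, so buffered data stays bounded by roughly one fetch response per partition rather than growing without limit. The total-count rule would instead suppress every fetch until fewer than `maxPollRecords` records remain buffered.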