ShivsundarR commented on code in PR #18672: URL: https://github.com/apache/kafka/pull/18672#discussion_r1934073652
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java: ########## @@ -172,73 +172,58 @@ public PollResult poll(long currentTimeMs) { k -> sessionHandlers.computeIfAbsent(node.id(), n -> new ShareSessionHandler(logContext, n, memberId))); TopicIdPartition tip = new TopicIdPartition(topicId, partition); - Acknowledgements acknowledgementsToSend = fetchAcknowledgementsToSend.remove(tip); - if (acknowledgementsToSend != null) { - metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size()); - fetchAcknowledgementsInFlight.put(tip, acknowledgementsToSend); + Acknowledgements acknowledgementsToSend = null; + Map<TopicIdPartition, Acknowledgements> nodeAcknowledgementsMap = fetchAcknowledgementsToSend.get(node.id()); + if (nodeAcknowledgementsMap != null) { + acknowledgementsToSend = nodeAcknowledgementsMap.remove(tip); + if (acknowledgementsToSend != null) { + metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size()); + fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new HashMap<>()).put(tip, acknowledgementsToSend); + } } + handler.addPartitionToFetch(tip, acknowledgementsToSend); - fetchedPartitions.add(tip); topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic()); log.debug("Added fetch request for partition {} to node {}", tip, node.id()); } } - // Map storing the list of partitions to forget in the upcoming request. - Map<Node, List<TopicIdPartition>> partitionsToForgetMap = new HashMap<>(); + + // Iterate over the session handlers to see if there are acknowledgements to be sent for partitions + // which are no longer part of the current subscription, or whose records were fetched from a + // previous leader. Cluster cluster = metadata.fetch(); - // Iterating over the session handlers to see if there are acknowledgements to be sent for partitions - // which are no longer part of the current subscription. sessionHandlers.forEach((nodeId, sessionHandler) -> { Node node = cluster.nodeById(nodeId); if (node != null) { if (nodesWithPendingRequests.contains(node.id())) { - log.trace("Skipping fetch because previous fetch request to {} has not been processed", node.id()); + log.trace("Skipping fetch because previous fetch request to {} has not been processed", nodeId); } else { - for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { - if (!fetchedPartitions.contains(tip)) { - Acknowledgements acknowledgementsToSend = fetchAcknowledgementsToSend.remove(tip); - - if (acknowledgementsToSend != null) { - metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size()); - fetchAcknowledgementsInFlight.put(tip, acknowledgementsToSend); - - sessionHandler.addPartitionToFetch(tip, acknowledgementsToSend); - handlerMap.put(node, sessionHandler); - - partitionsToForgetMap.putIfAbsent(node, new ArrayList<>()); - partitionsToForgetMap.get(node).add(tip); - - topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic()); - fetchedPartitions.add(tip); - log.debug("Added fetch request for previously subscribed partition {} to node {}", tip, node.id()); - } - } + Map<TopicIdPartition, Acknowledgements> nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId); + if (nodeAcksFromFetchMap != null) { + nodeAcksFromFetchMap.forEach((tip, acks) -> { + metricsManager.recordAcknowledgementSent(acks.size()); Review Comment: Should we remove the mapping (tip, acks) from `nodeAcksFromFetchMap`(effectively removing them from `fetchAcknowledgementsToSend`) so that we do not process these acknowledgements again in the next iteration. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java: ########## @@ -172,73 +172,58 @@ public PollResult poll(long currentTimeMs) { k -> sessionHandlers.computeIfAbsent(node.id(), n -> new ShareSessionHandler(logContext, n, memberId))); TopicIdPartition tip = new TopicIdPartition(topicId, partition); - Acknowledgements acknowledgementsToSend = fetchAcknowledgementsToSend.remove(tip); - if (acknowledgementsToSend != null) { - metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size()); - fetchAcknowledgementsInFlight.put(tip, acknowledgementsToSend); + Acknowledgements acknowledgementsToSend = null; + Map<TopicIdPartition, Acknowledgements> nodeAcknowledgementsMap = fetchAcknowledgementsToSend.get(node.id()); + if (nodeAcknowledgementsMap != null) { + acknowledgementsToSend = nodeAcknowledgementsMap.remove(tip); + if (acknowledgementsToSend != null) { + metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size()); + fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new HashMap<>()).put(tip, acknowledgementsToSend); + } } + handler.addPartitionToFetch(tip, acknowledgementsToSend); - fetchedPartitions.add(tip); topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic()); log.debug("Added fetch request for partition {} to node {}", tip, node.id()); } } - // Map storing the list of partitions to forget in the upcoming request. - Map<Node, List<TopicIdPartition>> partitionsToForgetMap = new HashMap<>(); + + // Iterate over the session handlers to see if there are acknowledgements to be sent for partitions + // which are no longer part of the current subscription, or whose records were fetched from a + // previous leader. Cluster cluster = metadata.fetch(); - // Iterating over the session handlers to see if there are acknowledgements to be sent for partitions - // which are no longer part of the current subscription. sessionHandlers.forEach((nodeId, sessionHandler) -> { Node node = cluster.nodeById(nodeId); if (node != null) { if (nodesWithPendingRequests.contains(node.id())) { - log.trace("Skipping fetch because previous fetch request to {} has not been processed", node.id()); + log.trace("Skipping fetch because previous fetch request to {} has not been processed", nodeId); } else { - for (TopicIdPartition tip : sessionHandler.sessionPartitions()) { - if (!fetchedPartitions.contains(tip)) { - Acknowledgements acknowledgementsToSend = fetchAcknowledgementsToSend.remove(tip); - - if (acknowledgementsToSend != null) { - metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size()); - fetchAcknowledgementsInFlight.put(tip, acknowledgementsToSend); - - sessionHandler.addPartitionToFetch(tip, acknowledgementsToSend); - handlerMap.put(node, sessionHandler); - - partitionsToForgetMap.putIfAbsent(node, new ArrayList<>()); - partitionsToForgetMap.get(node).add(tip); - - topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic()); - fetchedPartitions.add(tip); - log.debug("Added fetch request for previously subscribed partition {} to node {}", tip, node.id()); - } - } + Map<TopicIdPartition, Acknowledgements> nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId); + if (nodeAcksFromFetchMap != null) { + nodeAcksFromFetchMap.forEach((tip, acks) -> { + metricsManager.recordAcknowledgementSent(acks.size()); Review Comment: Should we remove the mapping `(tip, acks)` from `nodeAcksFromFetchMap`(effectively removing them from `fetchAcknowledgementsToSend`) so that we do not process these acknowledgements again in the next iteration. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org