ShivsundarR commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1934073652


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -172,73 +172,58 @@ public PollResult poll(long currentTimeMs) {
                         k -> sessionHandlers.computeIfAbsent(node.id(), n -> 
new ShareSessionHandler(logContext, n, memberId)));
 
                 TopicIdPartition tip = new TopicIdPartition(topicId, 
partition);
-                Acknowledgements acknowledgementsToSend = 
fetchAcknowledgementsToSend.remove(tip);
-                if (acknowledgementsToSend != null) {
-                    
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
-                    fetchAcknowledgementsInFlight.put(tip, 
acknowledgementsToSend);
+                Acknowledgements acknowledgementsToSend = null;
+                Map<TopicIdPartition, Acknowledgements> 
nodeAcknowledgementsMap = fetchAcknowledgementsToSend.get(node.id());
+                if (nodeAcknowledgementsMap != null) {
+                    acknowledgementsToSend = 
nodeAcknowledgementsMap.remove(tip);
+                    if (acknowledgementsToSend != null) {
+                        
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
+                        
fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new 
HashMap<>()).put(tip, acknowledgementsToSend);
+                    }
                 }
+
                 handler.addPartitionToFetch(tip, acknowledgementsToSend);
-                fetchedPartitions.add(tip);
                 topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), 
tip.partition()), tip.topic());
 
                 log.debug("Added fetch request for partition {} to node {}", 
tip, node.id());
             }
         }
 
-        // Map storing the list of partitions to forget in the upcoming 
request.
-        Map<Node, List<TopicIdPartition>> partitionsToForgetMap = new 
HashMap<>();
+
+        // Iterate over the session handlers to see if there are 
acknowledgements to be sent for partitions
+        // which are no longer part of the current subscription, or whose 
records were fetched from a
+        // previous leader.
         Cluster cluster = metadata.fetch();
-        // Iterating over the session handlers to see if there are 
acknowledgements to be sent for partitions
-        // which are no longer part of the current subscription.
         sessionHandlers.forEach((nodeId, sessionHandler) -> {
             Node node = cluster.nodeById(nodeId);
             if (node != null) {
                 if (nodesWithPendingRequests.contains(node.id())) {
-                    log.trace("Skipping fetch because previous fetch request 
to {} has not been processed", node.id());
+                    log.trace("Skipping fetch because previous fetch request 
to {} has not been processed", nodeId);
                 } else {
-                    for (TopicIdPartition tip : 
sessionHandler.sessionPartitions()) {
-                        if (!fetchedPartitions.contains(tip)) {
-                            Acknowledgements acknowledgementsToSend = 
fetchAcknowledgementsToSend.remove(tip);
-
-                            if (acknowledgementsToSend != null) {
-                                
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
-                                fetchAcknowledgementsInFlight.put(tip, 
acknowledgementsToSend);
-
-                                sessionHandler.addPartitionToFetch(tip, 
acknowledgementsToSend);
-                                handlerMap.put(node, sessionHandler);
-
-                                partitionsToForgetMap.putIfAbsent(node, new 
ArrayList<>());
-                                partitionsToForgetMap.get(node).add(tip);
-
-                                topicNamesMap.putIfAbsent(new 
IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
-                                fetchedPartitions.add(tip);
-                                log.debug("Added fetch request for previously 
subscribed partition {} to node {}", tip, node.id());
-                            }
-                        }
+                    Map<TopicIdPartition, Acknowledgements> 
nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId);
+                    if (nodeAcksFromFetchMap != null) {
+                        nodeAcksFromFetchMap.forEach((tip, acks) -> {
+                            
metricsManager.recordAcknowledgementSent(acks.size());

Review Comment:
   Should we remove the mapping `(tip, acks)` from 
`nodeAcksFromFetchMap` (effectively removing them from 
`fetchAcknowledgementsToSend`) so that we do not process these acknowledgements 
again in the next iteration?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -172,73 +172,58 @@ public PollResult poll(long currentTimeMs) {
                         k -> sessionHandlers.computeIfAbsent(node.id(), n -> 
new ShareSessionHandler(logContext, n, memberId)));
 
                 TopicIdPartition tip = new TopicIdPartition(topicId, 
partition);
-                Acknowledgements acknowledgementsToSend = 
fetchAcknowledgementsToSend.remove(tip);
-                if (acknowledgementsToSend != null) {
-                    
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
-                    fetchAcknowledgementsInFlight.put(tip, 
acknowledgementsToSend);
+                Acknowledgements acknowledgementsToSend = null;
+                Map<TopicIdPartition, Acknowledgements> 
nodeAcknowledgementsMap = fetchAcknowledgementsToSend.get(node.id());
+                if (nodeAcknowledgementsMap != null) {
+                    acknowledgementsToSend = 
nodeAcknowledgementsMap.remove(tip);
+                    if (acknowledgementsToSend != null) {
+                        
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
+                        
fetchAcknowledgementsInFlight.computeIfAbsent(node.id(), k -> new 
HashMap<>()).put(tip, acknowledgementsToSend);
+                    }
                 }
+
                 handler.addPartitionToFetch(tip, acknowledgementsToSend);
-                fetchedPartitions.add(tip);
                 topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), 
tip.partition()), tip.topic());
 
                 log.debug("Added fetch request for partition {} to node {}", 
tip, node.id());
             }
         }
 
-        // Map storing the list of partitions to forget in the upcoming 
request.
-        Map<Node, List<TopicIdPartition>> partitionsToForgetMap = new 
HashMap<>();
+
+        // Iterate over the session handlers to see if there are 
acknowledgements to be sent for partitions
+        // which are no longer part of the current subscription, or whose 
records were fetched from a
+        // previous leader.
         Cluster cluster = metadata.fetch();
-        // Iterating over the session handlers to see if there are 
acknowledgements to be sent for partitions
-        // which are no longer part of the current subscription.
         sessionHandlers.forEach((nodeId, sessionHandler) -> {
             Node node = cluster.nodeById(nodeId);
             if (node != null) {
                 if (nodesWithPendingRequests.contains(node.id())) {
-                    log.trace("Skipping fetch because previous fetch request 
to {} has not been processed", node.id());
+                    log.trace("Skipping fetch because previous fetch request 
to {} has not been processed", nodeId);
                 } else {
-                    for (TopicIdPartition tip : 
sessionHandler.sessionPartitions()) {
-                        if (!fetchedPartitions.contains(tip)) {
-                            Acknowledgements acknowledgementsToSend = 
fetchAcknowledgementsToSend.remove(tip);
-
-                            if (acknowledgementsToSend != null) {
-                                
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
-                                fetchAcknowledgementsInFlight.put(tip, 
acknowledgementsToSend);
-
-                                sessionHandler.addPartitionToFetch(tip, 
acknowledgementsToSend);
-                                handlerMap.put(node, sessionHandler);
-
-                                partitionsToForgetMap.putIfAbsent(node, new 
ArrayList<>());
-                                partitionsToForgetMap.get(node).add(tip);
-
-                                topicNamesMap.putIfAbsent(new 
IdAndPartition(tip.topicId(), tip.partition()), tip.topic());
-                                fetchedPartitions.add(tip);
-                                log.debug("Added fetch request for previously 
subscribed partition {} to node {}", tip, node.id());
-                            }
-                        }
+                    Map<TopicIdPartition, Acknowledgements> 
nodeAcksFromFetchMap = fetchAcknowledgementsToSend.get(nodeId);
+                    if (nodeAcksFromFetchMap != null) {
+                        nodeAcksFromFetchMap.forEach((tip, acks) -> {
+                            
metricsManager.recordAcknowledgementSent(acks.size());

Review Comment:
   Should we remove the mapping `(tip, acks)` from 
`nodeAcksFromFetchMap` (effectively removing them from 
`fetchAcknowledgementsToSend`) so that we do not process these acknowledgements 
again in the next iteration?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to