apoorvmittal10 commented on code in PR #18672:
URL: https://github.com/apache/kafka/pull/18672#discussion_r1933841736


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -172,73 +172,58 @@ public PollResult poll(long currentTimeMs) {
                         k -> sessionHandlers.computeIfAbsent(node.id(), n -> 
new ShareSessionHandler(logContext, n, memberId)));
 
                 TopicIdPartition tip = new TopicIdPartition(topicId, 
partition);
-                Acknowledgements acknowledgementsToSend = 
fetchAcknowledgementsToSend.remove(tip);
-                if (acknowledgementsToSend != null) {
-                    
metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size());
-                    fetchAcknowledgementsInFlight.put(tip, 
acknowledgementsToSend);
+                Acknowledgements acknowledgementsToSend = null;
+                Map<TopicIdPartition, Acknowledgements> 
nodeAcknowledgementsMap = fetchAcknowledgementsToSend.get(node.id());

Review Comment:
   Query: Can there be ever some acknowledgments which can starve in the 
`fetchAcknowledgementsToSend` when some nodeId is never received from broker?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgementCommitCallback.java:
##########
@@ -42,12 +44,16 @@ public interface AcknowledgementCommitCallback {
      *
      * @param exception The exception thrown during processing of the request, 
or null if the acknowledgement completed successfully.
      * <p><ul>
+     * <li> {@link AuthorizationException} if not authorized to the topic or 
group
      * <li> {@link InvalidRecordStateException} if the record state is invalid
-     * <li> {@link AuthorizationException} if not authorized to the topic of 
group
+     * <li> {@link NotLeaderOrFollowerException} if the leader had changed by 
the time the acknowledgements were sent
+     * <li> {@link DisconnectException} if the broker disconnected before the 
request could be completed
      * <li> {@link WakeupException} if {@link KafkaShareConsumer#wakeup()} is 
called before or while this function is called
      * <li> {@link InterruptException} if the calling thread is interrupted 
before or while this function is called
      * <li> {@link KafkaException} for any other unrecoverable errors
      * </ul>
+     * <p>Note that if the exception is a retriable exception, the 
acknowledgement could not be completed and the

Review Comment:
   nit: For non-retribale exceptions it will fetch anyways again so I expect we 
mean that `even` for retriable exception, the fetch will happen again?
   ```suggestion
        * <p>Note that if the exception is even a retriable exception, the 
acknowledgement could not be completed and the
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to