apoorvmittal10 commented on code in PR #18672: URL: https://github.com/apache/kafka/pull/18672#discussion_r1933841736
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java: ########## @@ -172,73 +172,58 @@ public PollResult poll(long currentTimeMs) { k -> sessionHandlers.computeIfAbsent(node.id(), n -> new ShareSessionHandler(logContext, n, memberId))); TopicIdPartition tip = new TopicIdPartition(topicId, partition); - Acknowledgements acknowledgementsToSend = fetchAcknowledgementsToSend.remove(tip); - if (acknowledgementsToSend != null) { - metricsManager.recordAcknowledgementSent(acknowledgementsToSend.size()); - fetchAcknowledgementsInFlight.put(tip, acknowledgementsToSend); + Acknowledgements acknowledgementsToSend = null; + Map<TopicIdPartition, Acknowledgements> nodeAcknowledgementsMap = fetchAcknowledgementsToSend.get(node.id()); Review Comment: Query: Can there be ever some acknowledgments which can starve in the `fetchAcknowledgementsToSend` when some nodeId is never received from broker? ########## clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgementCommitCallback.java: ########## @@ -42,12 +44,16 @@ public interface AcknowledgementCommitCallback { * * @param exception The exception thrown during processing of the request, or null if the acknowledgement completed successfully. * <p><ul> + * <li> {@link AuthorizationException} if not authorized to the topic or group * <li> {@link InvalidRecordStateException} if the record state is invalid - * <li> {@link AuthorizationException} if not authorized to the topic of group + * <li> {@link NotLeaderOrFollowerException} if the leader had changed by the time the acknowledgements were sent + * <li> {@link DisconnectException} if the broker disconnected before the request could be completed * <li> {@link WakeupException} if {@link KafkaShareConsumer#wakeup()} is called before or while this function is called * <li> {@link InterruptException} if the calling thread is interrupted before or while this function is called * <li> {@link KafkaException} for any other unrecoverable errors * </ul> + * <p>Note that if the exception is a retriable exception, the acknowledgement could not be completed and the Review Comment: nit: For non-retribale exceptions it will fetch anyways again so I expect we mean that `even` for retriable exception, the fetch will happen again? ```suggestion * <p>Note that if the exception is even a retriable exception, the acknowledgement could not be completed and the ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org