ShivsundarR commented on code in PR #19295:
URL: https://github.com/apache/kafka/pull/19295#discussion_r2016129362


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##########
@@ -644,8 +645,12 @@ private ShareFetch<K, V> collect(Map<TopicIdPartition, NodeAcknowledgements> ack
         if (currentFetch.isEmpty()) {
             final ShareFetch<K, V> fetch = fetchCollector.collect(fetchBuffer);
             if (fetch.isEmpty()) {
+                // Check for any acknowledgements which could have come from control records (GAP) and include them.
+                Map<TopicIdPartition, NodeAcknowledgements> combinedAcknowledgements = new LinkedHashMap<>(acknowledgementsMap);
+                combinedAcknowledgements.putAll(fetch.takeAcknowledgedRecords());
+
                 // Fetch more records and send any waiting acknowledgements
-                applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap));
+                applicationEventHandler.add(new ShareFetchEvent(combinedAcknowledgements));

Review Comment:
   Yes :)) turns out it can. 
   
   - Some integration tests in this PR - https://github.com/apache/kafka/pull/19261 
revealed that, with transactions, when the client receives only a control record 
(e.g. an abort marker) in the `ShareFetchResponse` (without any non-control record), 
then in the `ShareCompletedFetch` these control records are never acknowledged 
(ideally they would be acknowledged with GAP, indicating that the client is 
ignoring them) and are never presented to the consumer application. 
   
   - It is expected that control records are skipped and not presented to 
the application, but the client should still acknowledge them with GAP 
(https://github.com/apache/kafka/blob/84b8fec089682486aa9827a3baffa2513118ce6d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java#L33)
   
   - Normally these control records are auto-acknowledged with `GAP`, and the 
acknowledgements are sent on the next `ShareFetch`/`ShareAcknowledge` request. 
But here, since `fetch.isEmpty()` only checks for `numRecords() == 0`, a fetch 
that contains only control records counts as empty and we ignore it (meaning 
we never acknowledge these control records) - 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java#L598
   
   - In this PR, we add any acknowledgements that came in with the empty 
fetch (from control records) to the `ShareFetchEvent` so that they can be 
sent on the next `poll()`.
   
   - I agree it looks a bit odd for readability, though. But yes, there is a 
case where this can happen. 
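
The situation described in the points above can be sketched in isolation. This is only an illustrative model, not the client code: `EmptyFetchAckSketch`, `SketchFetch`, and `acksWhenFetchIsEmpty` are hypothetical names, and plain `String` keys and values stand in for `TopicIdPartition` and `NodeAcknowledgements`. It assumes, as described above, that `isEmpty()` looks only at the record count while GAP acknowledgements for control records are queued separately.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EmptyFetchAckSketch {

    // Hypothetical stand-in for a collected fetch: user-visible records plus
    // acknowledgements queued by the client (e.g. GAP for control records).
    static final class SketchFetch {
        private final List<String> records;
        private final Map<String, String> pendingAcks;

        SketchFetch(List<String> records, Map<String, String> pendingAcks) {
            this.records = records;
            this.pendingAcks = new LinkedHashMap<>(pendingAcks);
        }

        // Only the record count is checked, so a fetch holding nothing but
        // control-record acknowledgements still reports itself as empty.
        boolean isEmpty() {
            return records.isEmpty();
        }

        // Hands over the queued acknowledgements and clears them.
        Map<String, String> takeAcknowledgedRecords() {
            Map<String, String> taken = new LinkedHashMap<>(pendingAcks);
            pendingAcks.clear();
            return taken;
        }
    }

    // Models the empty-fetch branch: the acknowledgements already waiting are
    // combined with any that arrived with the empty fetch itself.
    static Map<String, String> acksWhenFetchIsEmpty(Map<String, String> waiting,
                                                    SketchFetch emptyFetch) {
        Map<String, String> combined = new LinkedHashMap<>(waiting);
        combined.putAll(emptyFetch.takeAcknowledgedRecords());
        return combined;
    }

    public static void main(String[] args) {
        // A fetch with no records but one GAP acknowledgement (assumed key format).
        SketchFetch fetch = new SketchFetch(List.of(), Map.of("topicA-0", "GAP"));
        System.out.println(fetch.isEmpty()); // prints true
        System.out.println(acksWhenFetchIsEmpty(Map.of(), fetch)); // prints {topicA-0=GAP}
    }
}
```

Without the merge step, the `GAP` entry in this sketch would be dropped together with the empty fetch, which is the behaviour the integration tests surfaced.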
   


