ShivsundarR commented on code in PR #19295:
URL: https://github.com/apache/kafka/pull/19295#discussion_r2016129362


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##########
@@ -644,8 +645,12 @@ private ShareFetch<K, V> collect(Map<TopicIdPartition, NodeAcknowledgements> ack
         if (currentFetch.isEmpty()) {
             final ShareFetch<K, V> fetch = fetchCollector.collect(fetchBuffer);
             if (fetch.isEmpty()) {
+                // Check for any acknowledgements which could have come from control records (GAP) and include them.
+                Map<TopicIdPartition, NodeAcknowledgements> combinedAcknowledgements = new LinkedHashMap<>(acknowledgementsMap);
+                combinedAcknowledgements.putAll(fetch.takeAcknowledgedRecords());
+
                 // Fetch more records and send any waiting acknowledgements
-                applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap));
+                applicationEventHandler.add(new ShareFetchEvent(combinedAcknowledgements));

Review Comment:
   Yes :)) turns out it can. 
   
   - Some integration tests in https://github.com/apache/kafka/pull/19261
     revealed that, with transactions, when the client receives only a control
     record (e.g. an abort marker) in the `ShareFetchResponse`, with no
     non-control records, `ShareCompletedFetch` acknowledges these control
     records with `GAP` (indicating that the client is ignoring them), and they
     are never presented to the consumer application.
   
   - These control records are auto-acknowledged with `GAP`, and the
     acknowledgements are meant to go out on the next
     `ShareFetch`/`ShareAcknowledge` request. But `fetch.isEmpty()` only checks
     `numRecords() == 0`, so a fetch that carries nothing except these
     acknowledgements counts as empty and we ignore it here, meaning the
     control records are never acknowledged:
     https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java#L598
   
   - In this PR, any acknowledgements that came in with the empty fetch (from
     control records) are added to the `ShareFetchEvent`, so that they can be
     sent on the next `poll()`.
   
   - I agree it looks a bit odd for readability, but there is a real case
     where this can happen.
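
To make the case above concrete, here is a minimal, self-contained sketch of the idea, not the actual client code. `SimpleFetch`, `combine`, the string partition keys, and the offset lists are hypothetical stand-ins for `ShareFetch`, the new lines in `collect()`, `TopicIdPartition`, and `NodeAcknowledgements`. It also assumes that `takeAcknowledgedRecords()` hands over the pending acknowledgements and clears them.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EmptyFetchAckSketch {

    /** Hypothetical stand-in for a fetch: application records plus pending acknowledgements. */
    static final class SimpleFetch {
        private final List<String> records;
        private Map<String, List<Long>> pendingAcks;

        SimpleFetch(List<String> records, Map<String, List<Long>> pendingAcks) {
            this.records = records;
            this.pendingAcks = new LinkedHashMap<>(pendingAcks);
        }

        // As described in the comment: emptiness only looks at the record count,
        // so a fetch holding nothing but GAP acknowledgements still reports empty.
        boolean isEmpty() {
            return records.isEmpty();
        }

        // Assumption: "take" hands over the pending acknowledgements and clears them.
        Map<String, List<Long>> takeAcknowledgedRecords() {
            Map<String, List<Long>> taken = pendingAcks;
            pendingAcks = new LinkedHashMap<>();
            return taken;
        }
    }

    /** Merges acknowledgements already waiting with those carried by the (possibly empty) fetch. */
    static Map<String, List<Long>> combine(Map<String, List<Long>> waiting, SimpleFetch fetch) {
        Map<String, List<Long>> combined = new LinkedHashMap<>(waiting);
        // Note: putAll replaces the value for any key present in both maps.
        combined.putAll(fetch.takeAcknowledgedRecords());
        return combined;
    }

    public static void main(String[] args) {
        // Acknowledgements the application had already queued for partition "topicB-1".
        Map<String, List<Long>> waiting = new LinkedHashMap<>();
        waiting.put("topicB-1", List.of(1L, 2L));

        // A fetch that returned only a control record at offset 5 of "topicA-0":
        // no records for the application, one GAP acknowledgement pending.
        SimpleFetch fetch = new SimpleFetch(List.of(), Map.of("topicA-0", List.of(5L)));

        System.out.println("fetch.isEmpty() = " + fetch.isEmpty());
        System.out.println("combined = " + combine(waiting, fetch));
    }
}
```

In this sketch, passing only `waiting` to the next request would drop the acknowledgement for `topicA-0`, which is the situation the change is meant to avoid.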
   



