ShivsundarR commented on code in PR #19295:
URL: https://github.com/apache/kafka/pull/19295#discussion_r2016129362
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##########
@@ -644,8 +645,12 @@ private ShareFetch<K, V> collect(Map<TopicIdPartition, NodeAcknowledgements> ack
         if (currentFetch.isEmpty()) {
             final ShareFetch<K, V> fetch = fetchCollector.collect(fetchBuffer);
             if (fetch.isEmpty()) {
+                // Check for any acknowledgements which could have come from control records (GAP) and include them.
+                Map<TopicIdPartition, NodeAcknowledgements> combinedAcknowledgements = new LinkedHashMap<>(acknowledgementsMap);
+                combinedAcknowledgements.putAll(fetch.takeAcknowledgedRecords());
+
                 // Fetch more records and send any waiting acknowledgements
-                applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap));
+                applicationEventHandler.add(new ShareFetchEvent(combinedAcknowledgements));

Review Comment:
   Yes :)) turns out it can.
   - Some integ tests in this PR - https://github.com/apache/kafka/pull/19261 - revealed a problem with transactions. When the client receives only a control record (e.g. an abort marker) in the `ShareFetchResponse`, with no non-control records, the control records in the `ShareCompletedFetch` are never acknowledged and are never presented to the consumer application. Ideally they would be acknowledged with GAP, indicating that the client is ignoring them.
   - It is expected that control records are skipped and not presented to the application, but the client should still acknowledge them with GAP (https://github.com/apache/kafka/blob/84b8fec089682486aa9827a3baffa2513118ce6d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java#L33).
   - Normally these control records are auto-acknowledged with `GAP`, and the acknowledgements are sent on the next `ShareFetch`/`ShareAcknowledge` request. But here, because `fetch.isEmpty()` only checks for `numRecords() == 0`, we ignore the fetch entirely when it is empty, which means we never acknowledge these control records - https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java#L598
   - With this PR, any acknowledgements that came in with the empty fetch (from control records) are added to the `ShareFetchEvent` so that they can be sent on the next poll().
   - I agree it looks a bit odd for readability, though. But yes, there is a case where this can happen.
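   To illustrate the case described above, here is a minimal, self-contained sketch of the combining step. It does not use the real Kafka classes: `CombineAcksSketch`, `EmptyFetchWithGaps`, and the `String`/`List<String>` types are hypothetical stand-ins for `ShareFetch`, `TopicIdPartition`, and `NodeAcknowledgements`, and the clearing behaviour of `takeAcknowledgedRecords()` here is an assumption made for the example.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CombineAcksSketch {

    // Hypothetical stand-in for a fetch that carried only control records:
    // nothing to hand to the application, but GAP acknowledgements are pending.
    static final class EmptyFetchWithGaps {
        private Map<String, List<String>> pendingAcks;

        EmptyFetchWithGaps(Map<String, List<String>> pendingAcks) {
            this.pendingAcks = new LinkedHashMap<>(pendingAcks);
        }

        // Mirrors a check that looks at the record count only.
        boolean isEmpty() {
            return true;
        }

        // Assumption for this sketch: hands over the pending acknowledgements
        // and leaves the fetch with none.
        Map<String, List<String>> takeAcknowledgedRecords() {
            Map<String, List<String>> taken = pendingAcks;
            pendingAcks = new LinkedHashMap<>();
            return taken;
        }
    }

    // Copies the waiting acknowledgements, then adds those from the empty fetch.
    // Map.putAll replaces the value for a key present in both maps.
    static Map<String, List<String>> combine(Map<String, List<String>> acknowledgementsMap,
                                             EmptyFetchWithGaps fetch) {
        Map<String, List<String>> combined = new LinkedHashMap<>(acknowledgementsMap);
        combined.putAll(fetch.takeAcknowledgedRecords());
        return combined;
    }

    public static void main(String[] args) {
        Map<String, List<String>> waiting = new LinkedHashMap<>();
        waiting.put("topicA-0", List.of("ACCEPT"));

        Map<String, List<String>> gaps = new LinkedHashMap<>();
        gaps.put("topicB-0", List.of("GAP"));

        EmptyFetchWithGaps fetch = new EmptyFetchWithGaps(gaps);
        if (fetch.isEmpty()) {
            // Without the combine step, the GAP entry for topicB-0 would be dropped here.
            System.out.println(combine(waiting, fetch));
        }
    }
}
```

   One thing the sketch makes visible: because `putAll` replaces the value for a key that exists in both maps, this approach fits best when the two maps cover different partitions. Whether they can overlap in the real client is not something this sketch establishes.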