[ https://issues.apache.org/jira/browse/KAFKA-15640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17921564#comment-17921564 ]
Jun Rao commented on KAFKA-15640:
---------------------------------
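First issue: this doesn't seem to be a real problem, since
AbstractFetch.fetchablePartitions() calls subscriptions.fetchablePartitions(),
which ignores paused partitions. The background thread therefore won't fetch a
paused partition, even during the window in which its CompletedFetch is in
neither nextInLineFetch nor completedFetches.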
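To illustrate the point about paused partitions, here is a minimal sketch under stated assumptions: {{PausedFilterSketch}} and its methods are hypothetical and heavily simplified, not Kafka's {{SubscriptionState}}. It only shows the filtering idea: a paused partition is never offered to the fetching thread, regardless of whether its data is currently buffered.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of per-partition subscription state.
// Names are illustrative only; they are not Kafka's internal classes.
final class PausedFilterSketch {
    // Insertion-ordered map: partition name -> paused flag.
    private final Map<String, Boolean> pausedByPartition = new LinkedHashMap<>();

    synchronized void assign(String partition) {
        pausedByPartition.put(partition, false);
    }

    synchronized void pause(String partition) {
        pausedByPartition.put(partition, true);
    }

    synchronized void resume(String partition) {
        pausedByPartition.put(partition, false);
    }

    // Returns the partitions a fetching thread may request: assigned,
    // not paused, and not already holding buffered data.
    synchronized List<String> fetchablePartitions(Set<String> buffered) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Boolean> e : pausedByPartition.entrySet()) {
            boolean paused = e.getValue();
            if (!paused && !buffered.contains(e.getKey())) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```

With partitions t-0 and t-1 assigned and t-0 paused, {{fetchablePartitions}} called with an empty buffered set returns only t-1, so the gap between clearing {{nextInLineFetch}} and re-adding the paused fetches does not make t-0 fetchable.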
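Second issue: this isn't a problem either, since
subscriptions.fetchablePartitions() checks nextInLineFetch.isConsumed() when
deciding whether a partition is buffered. Once nextInLineFetch has been fully
drained, the partition becomes fetchable again, so the consumer doesn't get stuck.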
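A minimal sketch of that buffered-partition check, assuming a hypothetical {{SketchFetch}} with a consumed flag and a hypothetical {{BufferedPartitionsSketch}} (neither is Kafka's {{CompletedFetch}} or {{FetchBuffer}}). A drained {{nextInLineFetch}} no longer counts as buffered, so its partition can be fetched again without waiting for {{setNextInLineFetch}} to be called.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Hypothetical stand-in for a CompletedFetch: a partition plus a "consumed" flag.
final class SketchFetch {
    final String partition;
    private boolean consumed;

    SketchFetch(String partition) {
        this.partition = partition;
    }

    boolean isConsumed() {
        return consumed;
    }

    // Marks all records in this fetch as handed to the application.
    void drain() {
        consumed = true;
    }
}

// Hypothetical, simplified buffer; not Kafka's FetchBuffer.
final class BufferedPartitionsSketch {
    final Queue<SketchFetch> completedFetches = new ArrayDeque<>();
    SketchFetch nextInLineFetch;

    // A partition counts as buffered if it has a pending fetch in the queue,
    // or if nextInLineFetch still holds unconsumed records for it.
    Set<String> bufferedPartitions() {
        Set<String> result = new HashSet<>();
        for (SketchFetch f : completedFetches) {
            result.add(f.partition);
        }
        if (nextInLineFetch != null && !nextInLineFetch.isConsumed()) {
            result.add(nextInLineFetch.partition);
        }
        return result;
    }
}
```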
> Refactor CompletedFetch initialization
> --------------------------------------
>
> Key: KAFKA-15640
> URL: https://issues.apache.org/jira/browse/KAFKA-15640
> Project: Kafka
> Issue Type: Improvement
> Components: clients, consumer
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Major
> Labels: fetcher
>
> The interaction between {{FetchBuffer}}, {{FetchCollector}}, and
> {{CompletedFetch}} is awkward, to say the least. Per [~junrao]'s comments
> [here|https://github.com/apache/kafka/pull/14406#discussion_r1349361459] and
> [here|https://github.com/apache/kafka/pull/14406#discussion_r1350773132],
> there are three issues...
>
> First:
> {quote}This is an existing issue. But the way we handle paused partitions in
> {{collectFetch}} seems problematic. The application thread first calls
> {{fetchBuffer.setNextInLineFetch(null)}} and then calls
> {{fetchBuffer.addAll(pausedCompletedFetches)}}. This could leave a brief
> window where the paused partition is included in neither
> {{nextInLineFetch}} nor {{completedFetches}}. If the background thread runs
> during that window, it could fetch another chunk for that partition and add
> the response back to FetchBuffer. This would violate the assumption that
> there is no more than one pending {{CompletedFetch}} per partition in
> FetchBuffer, and could cause records to be returned out of offset order or
> duplicates to be returned.
> {quote}
>
> Second:
> {quote}The second existing issue is the {{fetchBuffer.setNextInLineFetch}}
> call in {{collectFetch}}. After all records are drained from
> {{nextInLineFetch}}, we only call {{setNextInLineFetch}} when there is a new
> {{completedFetch}}. However, until the drained {{completedFetch}} is removed
> from {{nextInLineFetch}}, the background thread can't fetch the next chunk.
> So it seems that we would just be stuck here.
> {quote}
>
> Third:
> {quote}Currently, {{fetchBuffer.setNextInLineFetch}} and
> {{fetchBuffer.poll}} are separate operations, and we expect the caller to
> call them in the right order to avoid a partition going missing from
> FetchBuffer during the transition. This still leaves a situation where a
> partition could be in both completedFetches and nextInLineFetch at the same
> time. It's not a problem for now, but it may be in the future. Could we make
> them an atomic operation? If not, could we add a comment documenting the
> correct usage of the API and the impact of a partition being duplicated in
> completedFetches and nextInLineFetch?
> {quote}
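For the third point, one way to make the transition atomic is to guard both fields with a single lock and expose only combined operations. The sketch below uses hypothetical names ({{AtomicFetchBufferSketch}}, {{advance}}, {{clearAndRequeue}}) and models each fetch as a partition-name string; it is an illustration under those assumptions, not Kafka's actual {{FetchBuffer}} API.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Hypothetical, simplified buffer showing one way to make the
// "poll next fetch" and "set nextInLineFetch" steps a single atomic operation.
// Fetches are modeled as plain partition-name strings for brevity.
final class AtomicFetchBufferSketch {
    private final Object lock = new Object();
    private final Queue<String> completedFetches = new ArrayDeque<>();
    private String nextInLineFetch;

    // Called by the fetching thread when a response arrives.
    void add(String fetch) {
        synchronized (lock) {
            completedFetches.add(fetch);
        }
    }

    // Replaces nextInLineFetch with the head of completedFetches in one step,
    // so a reader holding the same lock never sees the fetch in both places
    // or in neither. Returns null when the queue is empty.
    String advance() {
        synchronized (lock) {
            nextInLineFetch = completedFetches.poll();
            return nextInLineFetch;
        }
    }

    // Clears nextInLineFetch and re-queues paused fetches in one step,
    // closing the window described in the first issue.
    void clearAndRequeue(Collection<String> pausedFetches) {
        synchronized (lock) {
            nextInLineFetch = null;
            completedFetches.addAll(pausedFetches);
        }
    }

    // Consistent snapshot of every partition held anywhere in the buffer.
    Set<String> bufferedPartitions() {
        synchronized (lock) {
            Set<String> result = new HashSet<>(completedFetches);
            if (nextInLineFetch != null) {
                result.add(nextInLineFetch);
            }
            return result;
        }
    }
}
```

Because {{clearAndRequeue}} clears {{nextInLineFetch}} and re-adds the paused fetches under the same lock that {{bufferedPartitions}} takes, a fetching thread that consults {{bufferedPartitions}} cannot observe the gap in which the paused partition is in neither place.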