[
https://issues.apache.org/jira/browse/KAFKA-12330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rajini Sivaram resolved KAFKA-12330.
------------------------------------
Fix Version/s: 3.0.0
Resolution: Fixed
> FetchSessionCache may cause starvation for partitions when FetchResponse is
> full
> --------------------------------------------------------------------------------
>
> Key: KAFKA-12330
> URL: https://issues.apache.org/jira/browse/KAFKA-12330
> Project: Kafka
> Issue Type: Bug
> Reporter: Lucas Bradstreet
> Assignee: David Jacot
> Priority: Major
> Fix For: 3.0.0
>
>
> Incremental fetch sessions in the FetchSessionCache deprioritize partitions
> for which a response is returned. This may happen if log metadata such as log
> start offset, hwm, etc. is returned, or if data for that partition is returned.
> When a fetch response fills to maxBytes, data may not be returned for
> partitions even if the fetch offset is lower than the fetch upper bound.
> However, the fetch response will still contain updates to metadata such as
> hwm if that metadata has changed. This can lead to degenerate behavior where
> an update to a partition's hwm or log start offset causes the next fetch to
> unnecessarily skip that partition. At first this appeared to be worse, since
> hwm updates occur frequently, but starvation should block hwm movement, which
> allows a fetch to go through and unsticks the partition. Even so, it still
> requires one more fetch request than necessary. Consumers may be affected
> more than replica fetchers; however, they often remove partitions with
> fetched data from the next fetch request, which may help prevent starvation.
> I believe we should only reorder the partition fetch priority if data is
> actually returned for a partition.
> {noformat}
> private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
>                                 val updateFetchContextAndRemoveUnselected: Boolean)
>   extends FetchSession.RESP_MAP_ITER {
>   var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = null
>
>   override def hasNext: Boolean = {
>     while ((nextElement == null) && iter.hasNext) {
>       val element = iter.next()
>       val topicPart = element.getKey
>       val respData = element.getValue
>       val cachedPart = session.partitionMap.find(new CachedPartition(topicPart))
>       val mustRespond = cachedPart.maybeUpdateResponseData(respData, updateFetchContextAndRemoveUnselected)
>       if (mustRespond) {
>         nextElement = element
>         // Example POC change:
>         // Don't move partition to end of queue if we didn't actually fetch data.
>         // This should help avoid starvation even when we are filling the fetch
>         // response fully while returning metadata for these partitions.
>         if (updateFetchContextAndRemoveUnselected && respData.records != null &&
>             respData.records.sizeInBytes > 0) {
>           session.partitionMap.remove(cachedPart)
>           session.partitionMap.mustAdd(cachedPart)
>         }
>       } else {
>         if (updateFetchContextAndRemoveUnselected) {
>           iter.remove()
>         }
>       }
>     }
>     nextElement != null
>   }{noformat}
>
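The reordering rule discussed in the issue can be illustrated with a small, self-contained Scala sketch. This is a toy model under stated assumptions, not Kafka's code: `Resp`, `FetchOrderSketch`, `reorder`, `movedOnAnyResponse` and `movedOnDataOnly` are hypothetical names, and a `LinkedHashSet` stands in for the session's ordered partition map.

```scala
import scala.collection.mutable

// Hypothetical, simplified response for one partition (not a Kafka class).
// bytes == 0 with metadataChanged == true models a metadata-only update (e.g. hwm).
final case class Resp(partition: String, bytes: Int, metadataChanged: Boolean)

object FetchOrderSketch {
  // Applies one round of responses to the fetch-priority queue.
  // A partition is moved to the end of the queue only when shouldMove
  // holds for its response; otherwise it keeps its position.
  def reorder(queue: mutable.LinkedHashSet[String],
              responses: Seq[Resp],
              shouldMove: Resp => Boolean): Unit =
    responses.foreach { r =>
      if (shouldMove(r)) {
        // Remove and re-add so the partition lands at the end (insertion order).
        queue.remove(r.partition)
        queue.add(r.partition)
      }
    }

  // Behavior described as problematic: any response deprioritizes the partition.
  val movedOnAnyResponse: Resp => Boolean = r => r.bytes > 0 || r.metadataChanged

  // Behavior proposed in the issue: deprioritize only when data was returned.
  val movedOnDataOnly: Resp => Boolean = r => r.bytes > 0
}
```

Take a queue of p0, p1, p2 where p0 returns data and fills the response while p1 returns only an hwm update. With `movedOnAnyResponse` the resulting order is p2, p0, p1, so p1 drops to the back without having returned any data. With `movedOnDataOnly` the order is p1, p2, p0, and p1 is first in line for the next fetch.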
--
This message was sent by Atlassian Jira
(v8.3.4#803005)