[ 
https://issues.apache.org/jira/browse/KAFKA-9048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Bradstreet updated KAFKA-9048:
------------------------------------
    Description: 
https://issues.apache.org/jira/browse/KAFKA-9039 
([https://github.com/apache/kafka/pull/7443]) improves the performance of the 
replica fetcher (at both small and large numbers of partitions), but it does 
not improve its complexity or scalability in the number of partitions.

I took a profile using async-profiler for the 1000 partition JMH replica 
fetcher benchmark. The big remaining culprits are:
 * ~18% looking up logStartOffset
 * ~45% FetchSessionHandler$Builder.add
 * ~19% FetchSessionHandler$Builder.build

*Suggestions*
 # The logStartOffset is looked up for every partition on each doWork pass. 
This requires a hashmap lookup even though the logStartOffset changes rarely. 
If the replica fetcher could be notified of updates to the logStartOffset, then 
we could reduce the overhead to a function of the number of updates to the 
logStartOffset instead of O(n) on each pass (see the first sketch after this list).
 # The use of FetchSessionHandler means that we maintain a partitionStates 
hashmap in the replica fetcher and a sessionPartitions hashmap in the 
FetchSessionHandler. On each incremental fetch session pass, we need to 
reconcile these two hashmaps to determine which partitions were added/updated 
and which partitions were removed. This reconciliation is especially 
expensive, requiring multiple passes over the fetching partitions and hashmap 
removes and puts for most partitions. The replica fetcher could be smarter by 
maintaining a fetch session *updated* hashmap containing 
FetchRequest.PartitionData(s) directly, as well as a *removed* partitions list, 
so that these do not need to be generated by reconciliation on each fetch pass 
(see the second sketch after this list).
 # maybeTruncate requires an O(n) pass over the elements in partitionStates 
even if there are no partitions in a truncating state. If we maintained some 
additional state about whether truncating partitions exist in partitionStates, 
or separated those partitions into their own data structure, we would not need 
to iterate across all partitions on every doWork pass (see the third sketch 
after this list). I’ve seen clusters where this work takes about 0.5%-1% of 
CPU, which is minor but will become more substantial as the number of 
partitions increases.
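
A minimal sketch of what suggestion 1 could look like, assuming the log layer 
can notify the fetcher when a partition's logStartOffset changes. Class and 
method names below are illustrative, not existing Kafka APIs:

{code:java}
// Hypothetical sketch: cache logStartOffset per partition and update it only
// when the log layer signals a change, instead of doing a per-partition
// hashmap lookup on every doWork() pass.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LogStartOffsetCache {
    private final Map<String, Long> logStartOffsets = new ConcurrentHashMap<>();

    // Called by the log layer whenever a partition's logStartOffset changes
    // (e.g. retention cleanup or a DeleteRecords request). Total cost becomes
    // a function of the number of updates rather than O(n) per fetch pass.
    public void onLogStartOffsetUpdated(String topicPartition, long newLogStartOffset) {
        logStartOffsets.put(topicPartition, newLogStartOffset);
    }

    // Read when building the fetch request: a single map read, with no call
    // back into the log layer for every partition on every pass.
    public long logStartOffset(String topicPartition) {
        return logStartOffsets.getOrDefault(topicPartition, 0L);
    }

    public void remove(String topicPartition) {
        logStartOffsets.remove(topicPartition);
    }
}
{code}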
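
A sketch of the bookkeeping in suggestion 2: keep the *updated* map and 
*removed* list current as partition state changes, so building the next 
incremental fetch request only touches partitions that actually changed. The 
types and names are illustrative stand-ins, not the real FetchSessionHandler 
or FetchRequest.PartitionData internals:

{code:java}
// Hypothetical sketch: maintain the incremental fetch session deltas directly
// instead of reconciling partitionStates against sessionPartitions each pass.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IncrementalFetchState {
    // Stand-in for FetchRequest.PartitionData.
    public static class PartitionData {
        public final long fetchOffset;
        public final long logStartOffset;
        public final int maxBytes;
        public PartitionData(long fetchOffset, long logStartOffset, int maxBytes) {
            this.fetchOffset = fetchOffset;
            this.logStartOffset = logStartOffset;
            this.maxBytes = maxBytes;
        }
    }

    // Partitions whose fetch state changed since the last request was built.
    private final Map<String, PartitionData> updated = new HashMap<>();
    // Partitions removed since the last request was built.
    private final List<String> removed = new ArrayList<>();

    // Called whenever the fetcher advances a partition's fetch state.
    public void recordUpdate(String topicPartition, PartitionData data) {
        updated.put(topicPartition, data);
    }

    // Called when a partition is removed from the fetcher.
    public void recordRemoval(String topicPartition) {
        updated.remove(topicPartition);
        removed.add(topicPartition);
    }

    // Building the next incremental request drains the two collections; the
    // cost is proportional to the number of changes, not the session size.
    public Map<String, PartitionData> drainUpdated() {
        Map<String, PartitionData> result = new HashMap<>(updated);
        updated.clear();
        return result;
    }

    public List<String> drainRemoved() {
        List<String> result = new ArrayList<>(removed);
        removed.clear();
        return result;
    }
}
{code}

Draining at request-build time keeps the per-pass cost proportional to the 
number of changed partitions rather than the total number of partitions in 
the session.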
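
And a sketch for suggestion 3, tracking truncating partitions in a separate 
set so doWork can skip maybeTruncate entirely when nothing is truncating 
(again, names are illustrative, not the actual fetcher internals):

{code:java}
// Hypothetical sketch: avoid the O(n) scan in maybeTruncate() by tracking
// which partitions are currently truncating.
import java.util.HashSet;
import java.util.Set;

public class TruncationTracker {
    private final Set<String> truncating = new HashSet<>();

    // Called when a partition transitions into the truncating state.
    public void markTruncating(String topicPartition) {
        truncating.add(topicPartition);
    }

    // Called once truncation for the partition has completed.
    public void markFetching(String topicPartition) {
        truncating.remove(topicPartition);
    }

    // doWork() can consult this cheap check and skip maybeTruncate() entirely
    // when no partitions are truncating, instead of scanning every partition.
    public boolean hasTruncatingPartitions() {
        return !truncating.isEmpty();
    }

    public Set<String> truncatingPartitions() {
        return new HashSet<>(truncating);
    }
}
{code}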


> Improve scalability in number of partitions in replica fetcher
> --------------------------------------------------------------
>
>                 Key: KAFKA-9048
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9048
>             Project: Kafka
>          Issue Type: Task
>          Components: core
>            Reporter: Lucas Bradstreet
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
