ijuma commented on a change in pull request #7222:
URL: https://github.com/apache/kafka/pull/7222#discussion_r469307691
##########
File path:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -639,26 +643,34 @@ public synchronized OffsetResetStrategy resetStrategy(TopicPartition partition)
}
public synchronized boolean hasAllFetchPositions() {
- return assignment.stream().allMatch(state -> state.value().hasValidPosition());
+ // Since this is in the hot-path for fetching, we do this instead of using java.util.stream API
+ Iterator<TopicPartitionState> it = assignment.stateIterator();
+ while (it.hasNext()) {
+ if (!it.next().hasValidPosition()) {
+ return false;
+ }
+ }
+ return true;
Review comment:
@mumrah I now understand why the previous version was slower: it was
allocating a PartitionState instance per element. But we only use the value
here, so we could still use `allMatch` without the performance penalty by
streaming over the values directly.
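
To illustrate the point, here is a minimal sketch using hypothetical stand-in types rather than the actual Kafka classes. `State` stands in for `TopicPartitionState`, `Wrapper` for the per-element `PartitionState` holder, and a plain `LinkedHashMap` for the assignment. None of these names or method shapes are taken from the PR. The sketch only contrasts wrapping each element before the check with streaming over the values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AllMatchSketch {
    // Hypothetical stand-in for TopicPartitionState; only the flag matters here.
    static final class State {
        private final boolean validPosition;
        State(boolean validPosition) { this.validPosition = validPosition; }
        boolean hasValidPosition() { return validPosition; }
    }

    // Hypothetical stand-in for the per-element wrapper that pairs a key with its state.
    static final class Wrapper {
        private final String partition;
        private final State value;
        Wrapper(String partition, State value) {
            this.partition = partition;
            this.value = value;
        }
        State value() { return value; }
    }

    // Shape of the slower version: one Wrapper is allocated per element tested,
    // even though only value() is ever read.
    static boolean viaWrappers(Map<String, State> assignment) {
        return assignment.entrySet().stream()
                .map(e -> new Wrapper(e.getKey(), e.getValue()))
                .allMatch(w -> w.value().hasValidPosition());
    }

    // Shape of the suggestion: stream the values directly, so allMatch
    // is kept but no wrapper objects are created.
    static boolean viaValues(Map<String, State> assignment) {
        return assignment.values().stream().allMatch(State::hasValidPosition);
    }

    public static void main(String[] args) {
        Map<String, State> assignment = new LinkedHashMap<>();
        assignment.put("topic-0", new State(true));
        assignment.put("topic-1", new State(false));
        // Both variants agree on the result; they differ only in allocation.
        System.out.println(viaWrappers(assignment));
        System.out.println(viaValues(assignment));
    }
}
```

Both methods short-circuit on the first element without a valid position. Whether the values-only stream is fast enough for the fetch hot path compared with the explicit iterator would still need to be measured.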