lianetm commented on code in PR #19917: URL: https://github.com/apache/kafka/pull/19917#discussion_r2141084110
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java: ########## @@ -470,7 +475,19 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() } } - return fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build())); + return convert(fetchable); + } + + private Map<Node, FetchSessionHandler.FetchRequestData> convert(Map<Node, FetchSessionHandler.Builder> fetchable) { + Map<Node, FetchSessionHandler.FetchRequestData> map = new HashMap<>(fetchable.size()); + + for (Map.Entry<Node, FetchSessionHandler.Builder> entry : fetchable.entrySet()) { + Node node = entry.getKey(); + FetchSessionHandler.FetchRequestData fetchRequestData = entry.getValue().build(); + map.put(node, fetchRequestData); Review Comment: ```suggestion map.put(entry.getKey(), entry.getValue().build()); ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java: ########## @@ -1135,7 +1134,9 @@ CompletableFuture<Void> revokePartitions(Set<TopicPartition> partitionsToRevoke) // Ensure the set of partitions to revoke are still assigned Set<TopicPartition> revokedPartitions = new HashSet<>(partitionsToRevoke); revokedPartitions.retainAll(subscriptions.assignedPartitions()); - log.info("Revoking previously assigned partitions {}", revokedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); + + if (log.isInfoEnabled()) + log.info("Revoking previously assigned partitions {}", Utils.topicPartitionString(revokedPartitions)); Review Comment: Here also I expect we can simplify and just print the set? Only difference is that it will include the brackets, which is ok I expect. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java: ########## @@ -166,9 +161,48 @@ public boolean contains(CompletableEvent<?> event) { } public List<CompletableEvent<?>> uncompletedEvents() { - return tracked.stream() - .filter(e -> !e.future().isDone()) - .collect(Collectors.toList()); + List<CompletableEvent<?>> events = new ArrayList<>(); + + for (CompletableEvent<?> event : tracked) { Review Comment: Should we add a comment saying that we're intentionally using for loop over the Streams API for perf, as this runs on every iteration of the background thread? (I'm afraid folks will be tempted to change this simple loops back to streams api) ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java: ########## @@ -77,11 +81,14 @@ public Exception invokePartitionsAssigned(final SortedSet<TopicPartition> assign } public Exception invokePartitionsRevoked(final SortedSet<TopicPartition> revokedPartitions) { - log.info("Revoke previously assigned partitions {}", revokedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); + if (log.isInfoEnabled()) + log.info("Revoke previously assigned partitions {}", Utils.topicPartitionString(revokedPartitions)); + Set<TopicPartition> revokePausedPartitions = subscriptions.pausedPartitions(); revokePausedPartitions.retainAll(revokedPartitions); - if (!revokePausedPartitions.isEmpty()) - log.info("The pause flag in partitions [{}] will be removed due to revocation.", revokePausedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); + + if (!revokePausedPartitions.isEmpty() && log.isInfoEnabled()) + log.info("The pause flag in partitions [{}] will be removed due to revocation.", Utils.topicPartitionString(revokePausedPartitions)); Review Comment: similar to above, if we are adding [] to the string of partitions, couldn't we just print the set? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ########## @@ -310,7 +311,9 @@ private void process(final AssignmentChangeEvent event) { manager.updateTimerAndMaybeCommit(event.currentTimeMs()); } - log.info("Assigned to partition(s): {}", event.partitions().stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); + if (log.isInfoEnabled()) + log.info("Assigned to partition(s): {}", Utils.topicPartitionString(event.partitions())); Review Comment: ok with the change, even though this one is not on any hot-path I expect, right? (just API calls to consumer.assign that do a single trip to the background, no requests). ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java: ########## @@ -166,9 +161,48 @@ public boolean contains(CompletableEvent<?> event) { } public List<CompletableEvent<?>> uncompletedEvents() { - return tracked.stream() - .filter(e -> !e.future().isDone()) - .collect(Collectors.toList()); + List<CompletableEvent<?>> events = new ArrayList<>(); + + for (CompletableEvent<?> event : tracked) { + if (!event.future().isDone()) + events.add(event); + } + + return events; + } + + /** + * For all the {@link CompletableEvent}s in the collection, if they're not already complete, invoke + * {@link CompletableFuture#completeExceptionally(Throwable)}. + * + * @param events Collection of objects, assumed to be subclasses of {@link ApplicationEvent} or + * {@link BackgroundEvent}, but will only perform completion for any + * unfinished {@link CompletableEvent}s + * + * @return Number of events closed + */ + private long completeEventsExceptionallyOnClose(Collection<?> events) { + long count = 0; + + for (Object o : events) { + if (!(o instanceof CompletableEvent)) + continue; + + CompletableEvent<?> event = (CompletableEvent<?>) o; + + if (event.future().isDone()) + continue; + + TimeoutException error = new TimeoutException(String.format("%s could not be completed before the consumer closed", event.getClass().getSimpleName())); + + if (event.future().completeExceptionally(error)) { + log.debug("Event {} completed exceptionally since the consumer is closing", event); + count++; Review Comment: incrementing here is not exactly the same that it used to happen before, right? Before the PR, the count inc would happen after the `peek(expireEvent)`, so it would count events that were expired but not completedExceptionally (the ones going through line 202, where now we don't increment the count) This count doesn't seem used anyways, but better to be accurate and consistent with what it's done in the `reap` func (there the count includes all expired events, no matter if we completed them exceptionally or not) https://github.com/apache/kafka/blob/2b2552da1fbcdb687c99a03a81034c8f78e86871/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java#L115 ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java: ########## @@ -414,8 +413,8 @@ public CompletableFuture<Void> signalPartitionsRevoked(Set<TopicPartition> parti private void logPausedPartitionsBeingRevoked(Set<TopicPartition> partitionsToRevoke) { Set<TopicPartition> revokePausedPartitions = subscriptions.pausedPartitions(); revokePausedPartitions.retainAll(partitionsToRevoke); - if (!revokePausedPartitions.isEmpty()) { - log.info("The pause flag in partitions [{}] will be removed due to revocation.", revokePausedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); + if (!revokePausedPartitions.isEmpty() && log.isInfoEnabled()) { + log.info("The pause flag in partitions [{}] will be removed due to revocation.", Utils.topicPartitionString(revokePausedPartitions)); Review Comment: Given that er end up printing the string in between [], can't we simply print the set directly? ```suggestion log.info("The pause flag in partitions {} will be removed due to revocation.", revokePausedPartitions); ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java: ########## @@ -103,11 +114,14 @@ public Exception invokePartitionsRevoked(final SortedSet<TopicPartition> revoked } public Exception invokePartitionsLost(final SortedSet<TopicPartition> lostPartitions) { - log.info("Lost previously assigned partitions {}", lostPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); + if (log.isInfoEnabled()) + log.info("Lost previously assigned partitions {}", Utils.topicPartitionString(lostPartitions)); + Set<TopicPartition> lostPausedPartitions = subscriptions.pausedPartitions(); lostPausedPartitions.retainAll(lostPartitions); - if (!lostPausedPartitions.isEmpty()) - log.info("The pause flag in partitions [{}] will be removed due to partition lost.", lostPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", "))); + + if (!lostPausedPartitions.isEmpty() && log.isInfoEnabled()) + log.info("The pause flag in partitions [{}] will be removed due to partition lost.", Utils.topicPartitionString(lostPartitions)); Review Comment: ditto ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ########## @@ -362,12 +368,18 @@ void cleanup() { * If there is a metadata error, complete all uncompleted events that require subscription metadata. */ private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) { - List<? extends CompletableApplicationEvent<?>> subscriptionMetadataEvent = events.stream() - .filter(e -> e instanceof CompletableApplicationEvent<?>) - .map(e -> (CompletableApplicationEvent<?>) e) - .filter(CompletableApplicationEvent::requireSubscriptionMetadata) - .collect(Collectors.toList()); - + List<CompletableApplicationEvent<?>> subscriptionMetadataEvent = new ArrayList<>(); + + for (CompletableEvent<?> ce : events) { + if (!(ce instanceof CompletableApplicationEvent)) + continue; + + CompletableApplicationEvent<?> cae = (CompletableApplicationEvent<?>) ce; + + if (cae.requireSubscriptionMetadata()) + subscriptionMetadataEvent.add(cae); Review Comment: maybe simplify ? ```suggestion if (ce instanceof CompletableApplicationEvent && ((CompletableApplicationEvent<?>) ce).requireSubscriptionMetadata()) { subscriptionMetadataEvent.add((CompletableApplicationEvent<?>) ce); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org