lianetm commented on code in PR #19917:
URL: https://github.com/apache/kafka/pull/19917#discussion_r2141084110


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -470,7 +475,19 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
             }
         }
 
-        return fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()));
+        return convert(fetchable);
+    }
+
+    private Map<Node, FetchSessionHandler.FetchRequestData> convert(Map<Node, FetchSessionHandler.Builder> fetchable) {
+        Map<Node, FetchSessionHandler.FetchRequestData> map = new HashMap<>(fetchable.size());
+
+        for (Map.Entry<Node, FetchSessionHandler.Builder> entry : fetchable.entrySet()) {
+            Node node = entry.getKey();
+            FetchSessionHandler.FetchRequestData fetchRequestData = entry.getValue().build();
+            map.put(node, fetchRequestData);

Review Comment:
   ```suggestion
               map.put(entry.getKey(), entry.getValue().build());
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -1135,7 +1134,9 @@ CompletableFuture<Void> revokePartitions(Set<TopicPartition> partitionsToRevoke)
         // Ensure the set of partitions to revoke are still assigned
         Set<TopicPartition> revokedPartitions = new HashSet<>(partitionsToRevoke);
         revokedPartitions.retainAll(subscriptions.assignedPartitions());
-        log.info("Revoking previously assigned partitions {}", revokedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
+
+        if (log.isInfoEnabled())
+            log.info("Revoking previously assigned partitions {}", Utils.topicPartitionString(revokedPartitions));

Review Comment:
   Here too, I expect we can simplify and just print the set? The only
difference is that the output will include the brackets, which I expect is OK.
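
To illustrate the difference under discussion, here is a minimal sketch comparing the joined string with the set's own `toString()`. It uses plain strings such as `orders-0` as stand-ins for `TopicPartition` values (an assumption made to keep the example self-contained), and the class and method names are hypothetical. It relies only on the standard `AbstractCollection.toString()` format.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

class PartitionLogFormat {
    // Joins the elements the way the removed stream-based code did: no brackets.
    static String joined(Set<String> partitions) {
        return partitions.stream().collect(Collectors.joining(", "));
    }

    // What a "{}" placeholder would render if handed the set itself:
    // the collection's toString(), which wraps the elements in brackets.
    static String direct(Set<String> partitions) {
        return String.valueOf(partitions);
    }

    public static void main(String[] args) {
        // Strings stand in for TopicPartition values (assumption for illustration).
        Set<String> partitions = new TreeSet<>(Arrays.asList("orders-1", "orders-0"));
        System.out.println(joined(partitions)); // orders-0, orders-1
        System.out.println(direct(partitions)); // [orders-0, orders-1]
    }
}
```

Passing the set directly also defers the string conversion to the logging framework, so it is only paid for when the message is actually emitted.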



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##########
@@ -166,9 +161,48 @@ public boolean contains(CompletableEvent<?> event) {
     }
 
     public List<CompletableEvent<?>> uncompletedEvents() {
-        return tracked.stream()
-                .filter(e -> !e.future().isDone())
-                .collect(Collectors.toList());
+        List<CompletableEvent<?>> events = new ArrayList<>();
+
+        for (CompletableEvent<?> event : tracked) {

Review Comment:
   Should we add a comment saying that we're intentionally using a for loop over
the Streams API for perf, as this runs on every iteration of the background
thread? (I'm afraid folks will be tempted to change these simple loops back to
the Streams API.)
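
A sketch of what such a comment could look like on a loop of this shape. The class name, method name, and comment wording are hypothetical, and plain `CompletableFuture`s stand in for `CompletableEvent`s so the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class UncompletedFilter {
    static List<CompletableFuture<?>> uncompleted(List<CompletableFuture<?>> tracked) {
        // NOTE: intentionally a plain for loop rather than the Streams API.
        // This is assumed to run on every iteration of the background thread,
        // so please do not convert it back to streams without measuring.
        List<CompletableFuture<?>> pending = new ArrayList<>();

        for (CompletableFuture<?> future : tracked) {
            if (!future.isDone())
                pending.add(future);
        }

        return pending;
    }

    public static void main(String[] args) {
        List<CompletableFuture<?>> tracked = new ArrayList<>();
        tracked.add(CompletableFuture.completedFuture("done"));
        tracked.add(new CompletableFuture<String>());
        System.out.println(uncompleted(tracked).size()); // 1
    }
}
```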



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java:
##########
@@ -77,11 +81,14 @@ public Exception invokePartitionsAssigned(final SortedSet<TopicPartition> assign
     }
 
     public Exception invokePartitionsRevoked(final SortedSet<TopicPartition> revokedPartitions) {
-        log.info("Revoke previously assigned partitions {}", revokedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
+        if (log.isInfoEnabled())
+            log.info("Revoke previously assigned partitions {}", Utils.topicPartitionString(revokedPartitions));
+
         Set<TopicPartition> revokePausedPartitions = subscriptions.pausedPartitions();
         revokePausedPartitions.retainAll(revokedPartitions);
-        if (!revokePausedPartitions.isEmpty())
-                log.info("The pause flag in partitions [{}] will be removed due to revocation.", revokePausedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
+
+        if (!revokePausedPartitions.isEmpty() && log.isInfoEnabled())
+            log.info("The pause flag in partitions [{}] will be removed due to revocation.", Utils.topicPartitionString(revokePausedPartitions));

Review Comment:
   Similar to above: if we are adding [] around the string of partitions,
couldn't we just print the set?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -310,7 +311,9 @@ private void process(final AssignmentChangeEvent event) {
             manager.updateTimerAndMaybeCommit(event.currentTimeMs());
         }
 
-        log.info("Assigned to partition(s): {}", event.partitions().stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
+        if (log.isInfoEnabled())
+            log.info("Assigned to partition(s): {}", Utils.topicPartitionString(event.partitions()));

Review Comment:
   OK with the change, even though I expect this one is not on any hot path,
right? (Just API calls to consumer.assign that do a single trip to the
background, no requests.)



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java:
##########
@@ -166,9 +161,48 @@ public boolean contains(CompletableEvent<?> event) {
     }
 
     public List<CompletableEvent<?>> uncompletedEvents() {
-        return tracked.stream()
-                .filter(e -> !e.future().isDone())
-                .collect(Collectors.toList());
+        List<CompletableEvent<?>> events = new ArrayList<>();
+
+        for (CompletableEvent<?> event : tracked) {
+            if (!event.future().isDone())
+                events.add(event);
+        }
+
+        return events;
+    }
+
+    /**
+     * For all the {@link CompletableEvent}s in the collection, if they're not already complete, invoke
+     * {@link CompletableFuture#completeExceptionally(Throwable)}.
+     *
+     * @param events Collection of objects, assumed to be subclasses of {@link ApplicationEvent} or
+     *               {@link BackgroundEvent}, but will only perform completion for any
+     *               unfinished {@link CompletableEvent}s
+     *
+     *  @return Number of events closed
+     */
+    private long completeEventsExceptionallyOnClose(Collection<?> events) {
+        long count = 0;
+
+        for (Object o : events) {
+            if (!(o instanceof CompletableEvent))
+                continue;
+
+            CompletableEvent<?> event = (CompletableEvent<?>) o;
+
+            if (event.future().isDone())
+                continue;
+
+            TimeoutException error = new TimeoutException(String.format("%s could not be completed before the consumer closed", event.getClass().getSimpleName()));
+
+            if (event.future().completeExceptionally(error)) {
+                log.debug("Event {} completed exceptionally since the consumer is closing", event);
+                count++;

Review Comment:
   Incrementing here is not exactly the same as what happened before, right?
   
   Before the PR, the count increment would happen after the
`peek(expireEvent)`, so it would count events that were expired but not
completed exceptionally (the ones going through line 202, where now we don't
increment the count).
   
   This count doesn't seem to be used anyway, but better to be accurate and
consistent with what's done in the `reap` func (there the count includes all
expired events, no matter if we completed them exceptionally or not):
https://github.com/apache/kafka/blob/2b2552da1fbcdb687c99a03a81034c8f78e86871/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java#L115
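
A minimal sketch of the two counting behaviours being contrasted, under the assumption that the only way they can diverge is a future being completed elsewhere between the `isDone()` check and the `completeExceptionally` call. The class names are hypothetical, `RacedFuture` merely simulates losing that race, and `java.util.concurrent.TimeoutException` stands in for Kafka's own `TimeoutException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

class CloseCount {
    // Simulates another thread completing the future just before we do.
    static class RacedFuture extends CompletableFuture<String> {
        @Override
        public boolean completeExceptionally(Throwable ex) {
            complete("completed elsewhere");
            return super.completeExceptionally(ex); // false: already completed
        }
    }

    // Counts only futures this call actually completed exceptionally.
    static long countOnlyWhenCompleted(List<CompletableFuture<?>> futures) {
        long count = 0;
        for (CompletableFuture<?> f : futures) {
            if (f.isDone())
                continue;
            if (f.completeExceptionally(new TimeoutException("closing")))
                count++;
        }
        return count;
    }

    // Counts every future that was unfinished when examined, whether or not
    // the completeExceptionally call itself took effect.
    static long countEveryAttempt(List<CompletableFuture<?>> futures) {
        long count = 0;
        for (CompletableFuture<?> f : futures) {
            if (f.isDone())
                continue;
            f.completeExceptionally(new TimeoutException("closing"));
            count++;
        }
        return count;
    }

    // One pending future, one that loses the simulated race, one already done.
    static List<CompletableFuture<?>> sample() {
        List<CompletableFuture<?>> futures = new ArrayList<>();
        futures.add(new CompletableFuture<String>());
        futures.add(new RacedFuture());
        futures.add(CompletableFuture.completedFuture("done"));
        return futures;
    }

    public static void main(String[] args) {
        System.out.println(countOnlyWhenCompleted(sample())); // 1
        System.out.println(countEveryAttempt(sample()));      // 2
    }
}
```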
  



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java:
##########
@@ -414,8 +413,8 @@ public CompletableFuture<Void> signalPartitionsRevoked(Set<TopicPartition> parti
     private void logPausedPartitionsBeingRevoked(Set<TopicPartition> partitionsToRevoke) {
         Set<TopicPartition> revokePausedPartitions = subscriptions.pausedPartitions();
         revokePausedPartitions.retainAll(partitionsToRevoke);
-        if (!revokePausedPartitions.isEmpty()) {
-            log.info("The pause flag in partitions [{}] will be removed due to revocation.", revokePausedPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
+        if (!revokePausedPartitions.isEmpty() && log.isInfoEnabled()) {
+            log.info("The pause flag in partitions [{}] will be removed due to revocation.", Utils.topicPartitionString(revokePausedPartitions));

Review Comment:
   Given that we end up printing the string in between [], can't we simply
print the set directly?
   ```suggestion
               log.info("The pause flag in partitions {} will be removed due to revocation.", revokePausedPartitions);
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java:
##########
@@ -103,11 +114,14 @@ public Exception invokePartitionsRevoked(final SortedSet<TopicPartition> revoked
     }
 
     public Exception invokePartitionsLost(final SortedSet<TopicPartition> lostPartitions) {
-        log.info("Lost previously assigned partitions {}", lostPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
+        if (log.isInfoEnabled())
+            log.info("Lost previously assigned partitions {}", Utils.topicPartitionString(lostPartitions));
+
         Set<TopicPartition> lostPausedPartitions = subscriptions.pausedPartitions();
         lostPausedPartitions.retainAll(lostPartitions);
-        if (!lostPausedPartitions.isEmpty())
-            log.info("The pause flag in partitions [{}] will be removed due to partition lost.", lostPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
+
+        if (!lostPausedPartitions.isEmpty() && log.isInfoEnabled())
+            log.info("The pause flag in partitions [{}] will be removed due to partition lost.", Utils.topicPartitionString(lostPartitions));

Review Comment:
   ditto



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -362,12 +368,18 @@ void cleanup() {
      * If there is a metadata error, complete all uncompleted events that require subscription metadata.
      */
     private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
-        List<? extends CompletableApplicationEvent<?>> subscriptionMetadataEvent = events.stream()
-                .filter(e -> e instanceof CompletableApplicationEvent<?>)
-                .map(e -> (CompletableApplicationEvent<?>) e)
-                .filter(CompletableApplicationEvent::requireSubscriptionMetadata)
-                .collect(Collectors.toList());
-        
+        List<CompletableApplicationEvent<?>> subscriptionMetadataEvent = new ArrayList<>();
+
+        for (CompletableEvent<?> ce : events) {
+            if (!(ce instanceof CompletableApplicationEvent))
+                continue;
+
+            CompletableApplicationEvent<?> cae = (CompletableApplicationEvent<?>) ce;
+
+            if (cae.requireSubscriptionMetadata())
+                subscriptionMetadataEvent.add(cae);

Review Comment:
   Maybe simplify?
   ```suggestion
               if (ce instanceof CompletableApplicationEvent && ((CompletableApplicationEvent<?>) ce).requireSubscriptionMetadata()) {
                   subscriptionMetadataEvent.add((CompletableApplicationEvent<?>) ce);
               }
   ```



