vvcephei commented on a change in pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#discussion_r438477719



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -763,18 +778,36 @@ private boolean populateClientStatesMap(final Map<UUID, ClientState> clientState
                     .flatMap(Collection::stream)
                     .collect(Collectors.toList());
 
-            final Collection<TopicPartition> allPreexistingChangelogPartitions = new ArrayList<>(allChangelogPartitions);
-            allPreexistingChangelogPartitions.removeIf(partition -> newlyCreatedChangelogs.contains(partition.topic()));
+            final Set<TopicPartition> preexistingChangelogPartitions = new HashSet<>();
+            final Set<TopicPartition> preexistingSourceChangelogPartitions = new HashSet<>();
+            final Set<TopicPartition> newlyCreatedChangelogPartitions = new HashSet<>();
+            for (final TopicPartition changelog : allChangelogPartitions) {
+                if (newlyCreatedChangelogs.contains(changelog.topic())) {
+                    newlyCreatedChangelogPartitions.add(changelog);
+                } else if (optimizedSourceChangelogs.contains(changelog.topic())) {
+                    preexistingSourceChangelogPartitions.add(changelog);
+                } else {
+                    preexistingChangelogPartitions.add(changelog);
+                }
+            }
+
+            // Make the listOffsets request first so it can fetch the offsets for non-source changelogs
+            // asynchronously while we use the blocking Consumer#committed call to fetch source-changelog offsets
+            final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> endOffsetsFuture =
+                fetchEndOffsetsFuture(preexistingChangelogPartitions, adminClient);
 
-            final Collection<TopicPartition> allNewlyCreatedChangelogPartitions = new ArrayList<>(allChangelogPartitions);
-            allNewlyCreatedChangelogPartitions.removeAll(allPreexistingChangelogPartitions);
+            final Map<TopicPartition, Long> sourceChangelogEndOffsets =
+                fetchCommittedOffsets(preexistingSourceChangelogPartitions, taskManager.mainConsumer());
 
-            final Map<TopicPartition, ListOffsetsResultInfo> endOffsets =
-                fetchEndOffsets(allPreexistingChangelogPartitions, adminClient);
+            final Map<TopicPartition, ListOffsetsResultInfo> endOffsets = ClientUtils.getEndOffsets(endOffsetsFuture);
 
-            allTaskEndOffsetSums = computeEndOffsetSumsByTask(endOffsets, changelogsByStatefulTask, allNewlyCreatedChangelogPartitions);
+            allTaskEndOffsetSums = computeEndOffsetSumsByTask(
+                changelogsByStatefulTask,
+                endOffsets,
+                sourceChangelogEndOffsets,
+                newlyCreatedChangelogPartitions);
             fetchEndOffsetsSuccessful = true;
-        } catch (final StreamsException e) {
+        } catch (final StreamsException | TimeoutException e) {
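
The three-way split in the hunk above, and its ordering (start the non-blocking listOffsets request, then do the blocking `Consumer#committed` lookup, then wait on the future), can be sketched in plain Java. This is a minimal sketch under stated assumptions: `Partition`, `fetchEndOffsetsAsync`, `fetchCommittedOffsetsBlocking`, and `endOffsetSum` are hypothetical stand-ins for `TopicPartition`, the admin-client future, `Consumer#committed`, and `computeEndOffsetSumsByTask`; the offsets are made-up values; and a newly created changelog is assumed to contribute 0 to a task's sum, which passing `newlyCreatedChangelogPartitions` separately suggests but the hunk does not show.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class ChangelogOffsetSketch {

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    public record Partition(String topic, int partition) { }

    // Hypothetical stand-in for the non-blocking listOffsets request; runs on another thread.
    static CompletableFuture<Map<Partition, Long>> fetchEndOffsetsAsync(Set<Partition> partitions) {
        return CompletableFuture.supplyAsync(() -> {
            Map<Partition, Long> endOffsets = new HashMap<>();
            for (Partition p : partitions) {
                endOffsets.put(p, 100L); // made-up end offset
            }
            return endOffsets;
        });
    }

    // Hypothetical stand-in for the blocking Consumer#committed lookup.
    static Map<Partition, Long> fetchCommittedOffsetsBlocking(Set<Partition> partitions) {
        Map<Partition, Long> committed = new HashMap<>();
        for (Partition p : partitions) {
            committed.put(p, 40L); // made-up committed offset
        }
        return committed;
    }

    // Sums a task's changelog end offsets (assumption: newly created changelogs count as 0).
    public static long endOffsetSum(List<Partition> taskChangelogs,
                                    Map<Partition, Long> endOffsets,
                                    Map<Partition, Long> sourceChangelogEndOffsets,
                                    Set<Partition> newlyCreated) {
        long sum = 0L;
        for (Partition p : taskChangelogs) {
            if (newlyCreated.contains(p)) {
                continue; // assumed empty, contributes nothing
            }
            // Source changelogs use the committed offset; everything else uses the log end offset.
            sum += sourceChangelogEndOffsets.containsKey(p)
                ? sourceChangelogEndOffsets.get(p)
                : endOffsets.get(p);
        }
        return sum;
    }

    public static void main(String[] args) {
        Set<String> newlyCreatedTopics = Set.of("app-new-changelog");
        Set<String> optimizedSourceTopics = Set.of("input-topic");
        List<Partition> all = List.of(
            new Partition("app-store-changelog", 0),
            new Partition("input-topic", 0),
            new Partition("app-new-changelog", 0));

        // Three-way split, mirroring the loop in the hunk.
        Set<Partition> preexisting = new HashSet<>();
        Set<Partition> preexistingSource = new HashSet<>();
        Set<Partition> newlyCreated = new HashSet<>();
        for (Partition p : all) {
            if (newlyCreatedTopics.contains(p.topic())) {
                newlyCreated.add(p);
            } else if (optimizedSourceTopics.contains(p.topic())) {
                preexistingSource.add(p);
            } else {
                preexisting.add(p);
            }
        }

        // Start the non-blocking request first...
        CompletableFuture<Map<Partition, Long>> endOffsetsFuture = fetchEndOffsetsAsync(preexisting);
        // ...so it can make progress while this thread blocks on the committed-offset lookup.
        Map<Partition, Long> sourceEndOffsets = fetchCommittedOffsetsBlocking(preexistingSource);
        // Only now wait for the asynchronous result.
        Map<Partition, Long> endOffsets = endOffsetsFuture.join();

        System.out.println(endOffsetSum(all, endOffsets, sourceEndOffsets, newlyCreated)); // 140
    }
}
```

Because the future is created before the blocking call, the two lookups can overlap; calling `join()` only after the committed offsets are in hand is what buys that overlap.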

Review comment:
   That sounds reasonable, but I think if you throw an exception in the assignor, the assignor just gets called again in a tight loop, which seems worse than backing off and trying again later.

   If you want to propose this change, maybe you can verify exactly what happens if we throw.
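
The trade-off described in the comment can be modeled with a toy simulation. This is a hedged sketch, not Kafka's rebalance protocol: `FetchTimeout`, `fetchOffsets`, the tick-based clock, and `retryBackoffMs` are all hypothetical, and strategy 1 simply assumes the comment's premise that a thrown exception leads to an immediate re-invocation (the very behavior the comment suggests verifying).

```java
public class AssignorRetrySketch {

    // Hypothetical unchecked exception standing in for an offset-fetch timeout.
    static class FetchTimeout extends RuntimeException {
        FetchTimeout() {
            super("offset fetch timed out");
        }
    }

    private static int attempts = 0;

    // Hypothetical offset fetch that always times out, modeling a persistently slow broker.
    private static long fetchOffsets() {
        attempts++;
        throw new FetchTimeout();
    }

    // Strategy 1: the assignor lets the exception escape; modeled as its caller
    // catching it and re-invoking on every tick (the assumed tight loop).
    public static int attemptsWhenThrowing(int ticks) {
        attempts = 0;
        for (int tick = 0; tick < ticks; tick++) {
            try {
                fetchOffsets();
            } catch (FetchTimeout e) {
                // nothing changes, so the next tick simply tries again
            }
        }
        return attempts;
    }

    // Strategy 2: catch the failure, keep a fallback assignment, and retry only after a backoff.
    public static int attemptsWithBackoff(int ticks, long tickMs, long retryBackoffMs) {
        attempts = 0;
        long nextAttemptAtMs = 0L;
        for (int tick = 0; tick < ticks; tick++) {
            long nowMs = tick * tickMs;
            if (nowMs < nextAttemptAtMs) {
                continue; // fallback assignment stays in place until the backoff expires
            }
            try {
                fetchOffsets();
                nextAttemptAtMs = Long.MAX_VALUE; // success: no follow-up needed
            } catch (FetchTimeout e) {
                nextAttemptAtMs = nowMs + retryBackoffMs;
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        System.out.println(attemptsWhenThrowing(1_000));          // 1000
        System.out.println(attemptsWithBackoff(1_000, 1L, 100L)); // 10
    }
}
```

With a fetch that keeps timing out, 1,000 one-millisecond ticks produce 1,000 attempts when every failure is retried immediately, versus 10 with a 100 ms backoff. Catching the exception, as the hunk's `catch` clause does, presumably leaves `fetchEndOffsetsSuccessful` false instead of propagating the failure, which is what makes the fallback-and-retry shape possible.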




----------------------------------------------------------------
This is an automated message from the Apache Git Service.