ableegoldman commented on a change in pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#discussion_r435647575



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -763,18 +778,36 @@ private boolean populateClientStatesMap(final Map<UUID, ClientState> clientState
                     .flatMap(Collection::stream)
                     .collect(Collectors.toList());
 
-            final Collection<TopicPartition> allPreexistingChangelogPartitions = new ArrayList<>(allChangelogPartitions);
-            allPreexistingChangelogPartitions.removeIf(partition -> newlyCreatedChangelogs.contains(partition.topic()));
+            final Set<TopicPartition> preexistingChangelogPartitions = new HashSet<>();
+            final Set<TopicPartition> preexistingSourceChangelogPartitions = new HashSet<>();
+            final Set<TopicPartition> newlyCreatedChangelogPartitions = new HashSet<>();
+            for (final TopicPartition changelog : allChangelogPartitions) {
+                if (newlyCreatedChangelogs.contains(changelog.topic())) {
+                    newlyCreatedChangelogPartitions.add(changelog);
+                } else if (optimizedSourceChangelogs.contains(changelog.topic())) {
+                    preexistingSourceChangelogPartitions.add(changelog);
+                } else {
+                    preexistingChangelogPartitions.add(changelog);
+                }
+            }
+
+            // Make the listOffsets request first so it can fetch the offsets for non-source changelogs
+            // asynchronously while we use the blocking Consumer#committed call to fetch source-changelog offsets
+            final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> endOffsetsFuture =
+                fetchEndOffsetsFuture(preexistingChangelogPartitions, adminClient);
 
-            final Collection<TopicPartition> allNewlyCreatedChangelogPartitions = new ArrayList<>(allChangelogPartitions);
-            allNewlyCreatedChangelogPartitions.removeAll(allPreexistingChangelogPartitions);
+            final Map<TopicPartition, Long> sourceChangelogEndOffsets =
+                fetchCommittedOffsets(preexistingSourceChangelogPartitions, taskManager.mainConsumer());
 
-            final Map<TopicPartition, ListOffsetsResultInfo> endOffsets =
-                fetchEndOffsets(allPreexistingChangelogPartitions, adminClient);
+            final Map<TopicPartition, ListOffsetsResultInfo> endOffsets = ClientUtils.getEndOffsets(endOffsetsFuture);
 
-            allTaskEndOffsetSums = computeEndOffsetSumsByTask(endOffsets, changelogsByStatefulTask, allNewlyCreatedChangelogPartitions);
+            allTaskEndOffsetSums = computeEndOffsetSumsByTask(
+                changelogsByStatefulTask,
+                endOffsets,
+                sourceChangelogEndOffsets,
+                newlyCreatedChangelogPartitions);
             fetchEndOffsetsSuccessful = true;
-        } catch (final StreamsException e) {
+        } catch (final StreamsException | TimeoutException e) {

Review comment:
@vvcephei I've been wondering whether we should _only_ catch the TimeoutException and treat a StreamsException as fatal (like an IllegalStateException, for example). This is how we were using `Consumer#committed` in the StoreChangelogReader, and AFAICT that only throws KafkaException on "unrecoverable errors" (quoted from the javadocs).

But I can't tell whether the Admin's `listOffsets` might throw on transient errors, so I'm leaning towards catching both just to be safe. WDYT?
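
To make the two options concrete, here is a minimal, self-contained sketch of the trade-off. It does not use the Kafka classes: `SketchTimeoutException` and `SketchStreamsException` are hypothetical stand-ins for `TimeoutException` and `StreamsException`, and the boolean return value is an assumption standing in for the `fetchEndOffsetsSuccessful` flag in the diff.

```java
public class CatchPolicySketch {

    // Hypothetical stand-in for Kafka's TimeoutException (assumed transient).
    public static class SketchTimeoutException extends RuntimeException {
        public SketchTimeoutException(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for StreamsException (possibly fatal, possibly transient).
    public static class SketchStreamsException extends RuntimeException {
        public SketchStreamsException(String message) {
            super(message);
        }
    }

    // Option A: only a timeout is treated as recoverable.
    // Any other exception propagates to the caller and is effectively fatal.
    public static boolean fetchCatchingTimeoutOnly(Runnable fetch) {
        try {
            fetch.run();
            return true;
        } catch (SketchTimeoutException e) {
            return false; // report failure so the caller can fall back
        }
    }

    // Option B: both exception types are treated as recoverable,
    // mirroring the multi-catch in the diff above.
    public static boolean fetchCatchingBoth(Runnable fetch) {
        try {
            fetch.run();
            return true;
        } catch (SketchStreamsException | SketchTimeoutException e) {
            return false; // report failure so the caller can fall back
        }
    }

    public static void main(String[] args) {
        Runnable timesOut = () -> {
            throw new SketchTimeoutException("request timed out");
        };
        Runnable fails = () -> {
            throw new SketchStreamsException("listOffsets failed");
        };

        System.out.println("both / timeout: " + fetchCatchingBoth(timesOut));
        System.out.println("both / streams: " + fetchCatchingBoth(fails));
        System.out.println("timeout-only / timeout: " + fetchCatchingTimeoutOnly(timesOut));
        try {
            fetchCatchingTimeoutOnly(fails);
        } catch (SketchStreamsException e) {
            System.out.println("timeout-only / streams: propagated as fatal");
        }
    }
}
```

With option B a transient failure surfaced as a StreamsException degrades to the fallback path; with option A it would propagate. Which is preferable depends on whether `listOffsets` can actually fail transiently, which is the open question above.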





