ableegoldman commented on a change in pull request #8677:
URL: https://github.com/apache/kafka/pull/8677#discussion_r426083712



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -171,7 +173,7 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
                 "This can happen if the Kafka cluster is temporary not available. " +
                 "You can increase admin client config `retries` to be resilient against this error.", retries);
             log.error(timeoutAndRetryError);
-            throw new StreamsException(timeoutAndRetryError);
+            throw new TaskMigratedException("Time out for creating internal topics", new TimeoutException(timeoutAndRetryError));

Review comment:
       For 441 we added a `nextScheduledRebalance` field to the assignment in 
order to signal when a follow-up rebalance is needed. Can we leverage that here 
as well so we don't have to go through the whole ordeal of `onPartitionsLost`?
   Check out the call to `fetchEndOffsets` in 
`StreamsPartitionAssignor#populateClientStatesMap`, where we schedule a 
follow-up rebalance on the leader if the `listOffsets` request fails. I think we 
can reuse the same logic/code path and keep track of a general flag like 
`adminClientRequestSuccessful` so the assignor can still finish the assignment.
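
   A rough sketch of the flag-based idea, in plain Java with no Kafka 
dependency. Everything here is a hypothetical stand-in rather than the actual 
`StreamsPartitionAssignor` code: `Assignor`, `Assignment`, `tryAdminRequest`, 
and `finishAssignment` are made-up names, and encoding "rebalance as soon as 
possible" as the current timestamp is an assumption. Only the names 
`adminClientRequestSuccessful` and `nextScheduledRebalance` come from the 
comment above.

```java
import java.util.function.Supplier;

// Hypothetical sketch: none of these types are real Kafka classes.
final class FollowupRebalanceSketch {

    // Stand-in for the assignment that carries the follow-up rebalance signal.
    static final class Assignment {
        // Long.MAX_VALUE is used here to mean "no follow-up rebalance scheduled".
        final long nextScheduledRebalanceMs;

        Assignment(final long nextScheduledRebalanceMs) {
            this.nextScheduledRebalanceMs = nextScheduledRebalanceMs;
        }
    }

    // Stand-in for the assignor running on the group leader.
    static final class Assignor {
        private boolean adminClientRequestSuccessful = true;

        // Runs an admin-client style request. On failure, records the failure
        // in the flag and returns the fallback instead of throwing.
        <T> T tryAdminRequest(final Supplier<T> request, final T fallback) {
            try {
                return request.get();
            } catch (final RuntimeException e) {
                adminClientRequestSuccessful = false;
                return fallback;
            }
        }

        // Always completes the assignment. If any admin request failed,
        // asks for a follow-up rebalance right away (assumed encoding: nowMs).
        Assignment finishAssignment(final long nowMs) {
            final long next = adminClientRequestSuccessful ? Long.MAX_VALUE : nowMs;
            return new Assignment(next);
        }
    }
}
```

   With something like this, a timeout while creating internal topics would 
flip the flag instead of throwing, and the members would retry on the 
follow-up rebalance rather than going through `onPartitionsLost`.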




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

