vvcephei commented on a change in pull request #8677: URL: https://github.com/apache/kafka/pull/8677#discussion_r428152062
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ########## @@ -171,7 +173,7 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams "This can happen if the Kafka cluster is temporary not available. " + "You can increase admin client config `retries` to be resilient against this error.", retries); log.error(timeoutAndRetryError); - throw new StreamsException(timeoutAndRetryError); + throw new TaskMigratedException("Time out for creating internal topics", new TimeoutException(timeoutAndRetryError)); Review comment: I was tempted to say we should just return an empty assignment, which would prompt everyone to rejoin again immediately, but I think the FallbackPriorTaskAssignor is a preferable alternative. IIUC, we should be able to rely on the precondition that any previously assigned tasks we correctly initialized _before_ they were assigned initially, right? So we know they are all safe to keep working (if possible) while we wait a suitable backoff period before trying to create these topics again. I could see the idea to instead just remove any tasks we couldn't initialize instead of calling the FallbackPriorTaskAssignor, but if I'm reading this code right, we might just have failed to verify that the topics exist, not only fail to create topics we know didn't exist. So, we might actually remove tasks that were previously assigned if we do this. It's not clear which strategy is better, since it would depend on the exact nature of the failure, but maybe at a very high level, it's better to continue processing existing work and delay starting new work than potentially to start new work but delay processing existing work. Or we could try for the "best of both worlds", where we assign the union of all previously assigned tasks and any new tasks we _were_ able to set up. Finally, even if we re-assign previously assigned tasks, I'm not sure if we actually need/want to use the FallbackPriorTaskAssignor in particular. There doesn't seem to be anything wrong with just computing a new assignment for a subset of the tasks while we also schedule a re-attempt to set up the rest of the tasks after a back-off period. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org