vvcephei commented on a change in pull request #8677:
URL: https://github.com/apache/kafka/pull/8677#discussion_r428152062



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -171,7 +173,7 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
                 "This can happen if the Kafka cluster is temporary not available. " +
                 "You can increase admin client config `retries` to be resilient against this error.", retries);
             log.error(timeoutAndRetryError);
-            throw new StreamsException(timeoutAndRetryError);
+            throw new TaskMigratedException("Time out for creating internal topics", new TimeoutException(timeoutAndRetryError));

Review comment:
       I was tempted to say we should just return an empty assignment, which 
would prompt everyone to rejoin again immediately, but I think the 
FallbackPriorTaskAssignor is a preferable alternative.
   
   IIUC, we should be able to rely on the precondition that any previously 
assigned tasks were correctly initialized _before_ they were first assigned, 
right? So we know they are all safe to keep working (if possible) while we wait 
out a suitable backoff period before trying to create these topics again.
   
   I could see the idea of just removing any tasks we couldn't initialize 
instead of calling the FallbackPriorTaskAssignor, but if I'm reading this code 
right, the failure might be that we couldn't verify that the topics exist, not 
only that we couldn't create topics we know didn't exist. So, we might actually 
remove tasks that were previously assigned if we do this.
   
   It's not clear which strategy is better, since it would depend on the exact 
nature of the failure, but maybe at a very high level, it's better to continue 
processing existing work and delay starting new work than to start new work 
at the risk of delaying existing work.
   
   Or we could try for the "best of both worlds", where we assign the union of 
all previously assigned tasks and any new tasks we _were_ able to set up.
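
   To make the union idea concrete, here is a rough sketch. Everything in it is 
hypothetical: `AssignableTasks` and its methods are made-up names, and task ids 
are modeled as plain strings rather than the real task id type. It only shows the 
set arithmetic, not how the assignor is actually structured.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration only; not part of Kafka Streams.
public final class AssignableTasks {
    private AssignableTasks() {}

    /**
     * Tasks that are safe to hand out in this rebalance: everything that was
     * assigned before (assumed to have been initialized already) plus any new
     * task whose internal topics were set up successfully this time.
     */
    public static Set<String> assignable(final Set<String> previouslyAssigned,
                                         final Set<String> newlyInitialized) {
        final Set<String> result = new TreeSet<>(previouslyAssigned);
        result.addAll(newlyInitialized);
        return result;
    }

    /** Tasks left out of this assignment that need a later setup attempt. */
    public static Set<String> deferred(final Set<String> allTasks,
                                       final Set<String> assignable) {
        final Set<String> result = new TreeSet<>(allTasks);
        result.removeAll(assignable);
        return result;
    }
}
```

   With previous tasks `0_0` and `0_1`, a newly initialized `1_0`, and a failed 
`1_1`, the first method yields the three working tasks and the second yields only 
`1_1`, so no previously assigned task is dropped.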
   
   Finally, even if we re-assign previously assigned tasks, I'm not sure if we 
actually need/want to use the FallbackPriorTaskAssignor in particular. There 
doesn't seem to be anything wrong with just computing a new assignment for a 
subset of the tasks while we also schedule a re-attempt to set up the rest of 
the tasks after a back-off period.
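
   For the "schedule a re-attempt after a back-off period" part, something like 
the following bookkeeping might be enough. This is a sketch under assumptions: 
`TopicSetupBackoff` is a hypothetical class, the doubling policy is my choice 
(nothing in this PR prescribes it), and the caller passes in the current time so 
the logic stays easy to test.

```java
// Hypothetical helper for illustration only; not part of Kafka Streams.
public final class TopicSetupBackoff {
    private final long initialBackoffMs;
    private final long maxBackoffMs;
    private int consecutiveFailures = 0;
    private long nextAttemptMs = 0L;

    public TopicSetupBackoff(final long initialBackoffMs, final long maxBackoffMs) {
        if (initialBackoffMs <= 0 || maxBackoffMs < initialBackoffMs) {
            throw new IllegalArgumentException("need 0 < initialBackoffMs <= maxBackoffMs");
        }
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    /** True once the back-off period since the last failure has elapsed. */
    public boolean shouldAttempt(final long nowMs) {
        return nowMs >= nextAttemptMs;
    }

    /** Record a failed setup attempt and push the next attempt further out. */
    public void recordFailure(final long nowMs) {
        long backoffMs = initialBackoffMs;
        // Double once per earlier consecutive failure, stopping at the cap.
        for (int i = 0; i < consecutiveFailures && backoffMs < maxBackoffMs; i++) {
            backoffMs *= 2;
        }
        backoffMs = Math.min(backoffMs, maxBackoffMs);
        consecutiveFailures++;
        nextAttemptMs = nowMs + backoffMs;
    }

    /** Record a successful setup so the next failure starts from the initial back-off. */
    public void recordSuccess() {
        consecutiveFailures = 0;
        nextAttemptMs = 0L;
    }
}
```

   The assignor would check `shouldAttempt(now)` before trying to set up the 
deferred tasks again, and in the meantime keep computing assignments for the 
subset of tasks that are known to be initialized.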





