[ https://issues.apache.org/jira/browse/KAFKA-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15547089#comment-15547089 ]
Guozhang Wang commented on KAFKA-3559:
--------------------------------------

Here are some more thoughts on this issue and how we can improve the situation:

Currently with Kafka Streams every rebalance is expensive, even a "partial" one (i.e. only a few of the non-leader members in the consumer group have decided to re-join, which does not trigger a full rebalance and only causes the coordinator to send back the assignment again), because {{onPartitionRevoked}} and {{onPartitionAssigned}} are triggered either way, closing and (re-)constructing the tasks. For example, on my (very small) local laptop, with a complex topology containing 10+ stores and 15+ internal topics and 3 threads, a rebalance could take up to 20 seconds.

On the other hand, the only reason we close the tasks in {{onPartitionRevoked}}, before the synchronization barrier, is that threads may hold file locks related to these tasks. And since tasks are all committed right before closing, I think it is safe to delay the destruction of tasks, which may save the time spent closing and reconstructing them. More specifically:

1. In {{onPartitionRevoked}}, instead of closing the tasks, we only need to commit them and "pause" them by calling their topology processors' newly added {{flush}} function and releasing the tasks' corresponding file locks. In fact, the pausing itself happens automatically, since we will not process any messages during the rebalance anyway.
2. Then in {{onPartitionAssigned}}, we can check whether any tasks have really been migrated out of the thread. Those tasks are closed (note that since they were already committed in {{onPartitionRevoked}}, closing them only involves calling the topology processor's {{close}} function and closing the state stores); all other tasks simply "resume" processing.
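A minimal sketch of the two steps above, written as a self-contained Python simulation rather than against Kafka Streams itself. The Task and StreamThreadSketch classes, their methods and the task ids are all hypothetical; in the Java client the real hooks are ConsumerRebalanceListener's onPartitionsRevoked and onPartitionsAssigned. Re-acquiring file locks on resume is an assumption, not something stated in the proposal.

```python
from dataclasses import dataclass, field


@dataclass
class Task:
    """Hypothetical stand-in for a stream task; only records what happens to it."""

    task_id: str
    state: str = "running"  # running -> suspended -> running (resumed) or closed
    log: list = field(default_factory=list)

    def commit(self):
        self.log.append("commit")

    def flush(self):
        # Stand-in for flushing the topology's processors and state stores.
        self.log.append("flush")

    def release_locks(self):
        self.log.append("release_locks")

    def acquire_locks(self):
        self.log.append("acquire_locks")

    def close(self):
        # Already committed during revocation, so only the processor close and
        # the state store close remain to be done here.
        self.log.append("close")
        self.state = "closed"


class StreamThreadSketch:
    """Hypothetical thread that defers task destruction until the new
    assignment is known, instead of closing every task on revocation."""

    def __init__(self, task_ids):
        self.active = {t: Task(t) for t in task_ids}
        self.suspended = {}
        self.created = []  # ids of tasks built from scratch after a rebalance

    def on_partitions_revoked(self):
        # Step 1: commit, flush and release locks, but keep the task objects.
        for task_id, task in self.active.items():
            task.commit()
            task.flush()
            task.release_locks()
            task.state = "suspended"
            self.suspended[task_id] = task
        self.active = {}

    def on_partitions_assigned(self, assigned_ids):
        assigned = set(assigned_ids)
        # Step 2: close only the tasks that really migrated away; resume the rest.
        for task_id, task in self.suspended.items():
            if task_id in assigned:
                task.acquire_locks()  # assumption: locks are re-taken on resume
                task.state = "running"
                self.active[task_id] = task
            else:
                task.close()
        self.suspended = {}
        # Only genuinely new tasks pay the full construction cost.
        for task_id in sorted(assigned - set(self.active)):
            self.active[task_id] = Task(task_id)
            self.created.append(task_id)
```

For example, a thread owning tasks 0_0, 0_1 and 1_0 that is then assigned 0_0, 1_0 and 2_0 closes only 0_1 and constructs only 2_0; 0_0 and 1_0 resume with their state intact, which is where the saving would come from.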
We need to think through some minor issues, such as the above-mentioned file locks for persistent state stores and how clean-up will work without introducing deadlocks. But in general I think this solution should work.

> Task creation time taking too long in rebalance callback
> --------------------------------------------------------
>
> Key: KAFKA-3559
> URL: https://issues.apache.org/jira/browse/KAFKA-3559
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Eno Thereska
> Labels: architecture
> Fix For: 0.10.2.0
>
>
> Currently in Kafka Streams, we create stream tasks upon getting newly
> assigned partitions in the rebalance callback function {{onPartitionAssigned}},
> which also involves initializing the processor state stores (including
> opening RocksDB, restoring the store from its changelog, etc., which takes time).
> With a large number of state stores, the initialization itself could
> take tens of seconds, which is usually longer than the consumer session
> timeout. As a result, by the time the callback completes, the coordinator
> has already treated the consumer as failed and triggers another rebalance.
> We need to consider whether we can optimize the initialization process, or move it
> out of the callback function and, while initializing the stores one by one,
> use poll calls to send heartbeats to avoid being kicked out by the coordinator.
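The second option in the description (leave the callback cheap, then initialize stores one by one while polling) can be illustrated with a small Python simulation on a fake clock. SimulatedConsumer, spend, init_inside_callback, on_assigned_deferred and init_from_poll_loop are hypothetical names; the sketch only assumes, as the issue does, that calling poll() is what keeps the member alive. The 10 stores, 5 s per store and 10 s session timeout are illustrative numbers, not measurements.

```python
from collections import deque


class SimulatedConsumer:
    """Hypothetical consumer on a simulated clock: the member is considered
    failed if poll() is not called within session_timeout_s."""

    def __init__(self, session_timeout_s):
        self.session_timeout_s = session_timeout_s
        self.now = 0.0
        self.last_poll = 0.0
        self.polls = 0
        self.kicked_out = False

    def poll(self):
        # Assumption (as in the issue): calling poll() is what sends heartbeats.
        self.polls += 1
        self.last_poll = self.now

    def spend(self, seconds):
        # Simulate blocking work done outside poll(), e.g. restoring a store.
        self.now += seconds
        if self.now - self.last_poll > self.session_timeout_s:
            self.kicked_out = True


def init_inside_callback(consumer, stores, cost):
    """Current behaviour: every store is initialized inside the callback."""
    for store in stores:
        consumer.spend(cost[store])
    consumer.poll()


def on_assigned_deferred(pending, stores):
    """Proposed behaviour: the callback only records the pending work."""
    pending.extend(stores)


def init_from_poll_loop(consumer, pending, cost):
    """Initialize one store per loop iteration, polling in between."""
    done = []
    while pending:
        consumer.poll()
        store = pending.popleft()
        consumer.spend(cost[store])
        done.append(store)
    consumer.poll()
    return done
```

With these numbers, doing all the work inside the callback blocks for 50 s without a poll and the member is dropped, while the deferred loop never goes more than 5 s between polls. A single store whose restoration alone exceeds the session timeout would still need finer-grained polling (for instance between restore batches).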