[ 
https://issues.apache.org/jira/browse/KAFKA-3559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15547089#comment-15547089
 ] 

Guozhang Wang commented on KAFKA-3559:
--------------------------------------

Here are some more thoughts on this issue and how we can improve the situation:

Currently with Kafka Streams each rebalance is expensive, even if it is only 
"partial" (i.e. only a few of the non-leader members in the consumer group have 
decided to re-join, which does not trigger a full rebalance but only causes 
the coordinator to send back the assignment again), because 
{{onPartitionsRevoked}} and {{onPartitionsAssigned}} are triggered anyway, 
closing and (re-)constructing the tasks. For example, on my (very small) local 
laptop, with a complex topology containing 10+ stores and 15+ internal topics 
and 3 threads, one rebalance could take up to 20 seconds.

On the other hand, the only reason we want to close the tasks in 
{{onPartitionsRevoked}} before the synchronization barrier is that threads may 
hold some file locks related to these tasks. And since tasks are all committed 
right before closing, I think it is safe to delay the destruction of tasks so 
that we can save the time spent closing / reconstructing them. More 
specifically:

1. In {{onPartitionsRevoked}}, instead of closing the tasks, we only need to 
commit the tasks and "pause" them by calling the newly added {{flush}} method 
of their topology processors, and release the corresponding file locks of the 
tasks. In fact, the pausing happens automatically, since we will not process 
any messages during the rebalance anyway.
2. Then in {{onPartitionsAssigned}}, we can check whether any tasks have really 
been migrated out of the thread. Those tasks are closed (note that since they 
were already committed in {{onPartitionsRevoked}}, closing them only involves 
calling the topology processor's {{close}} function and closing the state 
stores); all other tasks simply "resume" processing.
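
The two steps above can be sketched roughly as follows. This is only an 
illustration of the idea, not the actual Kafka Streams code: {{SketchTask}}, 
{{LazyCloseListener}}, the {{suspend}} / {{resume}} methods and the use of plain 
integers as partition ids are all hypothetical names chosen for the example, 
and file-lock handling is left out.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a stream task; not the real Kafka Streams class.
class SketchTask {
    final int partition;
    final List<String> log = new ArrayList<>();   // records lifecycle events
    boolean suspended = false;
    boolean closed = false;

    SketchTask(int partition) {
        this.partition = partition;
        log.add("create");
    }

    void commit()  { log.add("commit"); }
    void suspend() { suspended = true;  log.add("suspend"); }
    void resume()  { suspended = false; log.add("resume"); }
    void close()   { closed = true;     log.add("close"); }
}

// Hypothetical listener that mirrors the two rebalance callbacks.
class LazyCloseListener {
    final Map<Integer, SketchTask> tasks = new HashMap<>();

    // Step 1: commit and pause the tasks, but keep the task objects alive.
    void onPartitionsRevoked(Collection<Integer> revoked) {
        for (int partition : revoked) {
            SketchTask task = tasks.get(partition);
            if (task != null) {
                task.commit();
                task.suspend();
            }
        }
    }

    // Step 2: close only the tasks that really migrated away,
    // resume the ones that came back, and create the new ones.
    void onPartitionsAssigned(Collection<Integer> assigned) {
        Set<Integer> keep = new HashSet<>(assigned);
        tasks.entrySet().removeIf(entry -> {
            if (keep.contains(entry.getKey())) {
                return false;            // still ours, do not close
            }
            entry.getValue().close();    // migrated out of this thread
            return true;
        });
        for (int partition : keep) {
            SketchTask task = tasks.get(partition);
            if (task == null) {
                tasks.put(partition, new SketchTask(partition));
            } else if (task.suspended) {
                task.resume();
            }
        }
    }
}
```

In this sketch, a thread that owns partitions 0 and 1 and is then re-assigned 
0 and 2 would reuse the task for 0, close the task for 1, and construct a new 
task only for 2.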

We need to think through some minor issues such as the above mentioned file 
locks for persistent state stores, how clean-up will work without introducing 
deadlocks, etc. But I think in general this solution should work.

> Task creation time taking too long in rebalance callback
> --------------------------------------------------------
>
>                 Key: KAFKA-3559
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3559
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Eno Thereska
>              Labels: architecture
>             Fix For: 0.10.2.0
>
>
> Currently in Kafka Streams, we create stream tasks upon getting newly 
> assigned partitions in the rebalance callback function 
> {{onPartitionsAssigned}}, which involves initialization of the processor 
> state stores as well (including opening RocksDB, restoring the stores from 
> the changelog, etc., which takes time).
> With a large number of state stores, the initialization time itself could 
> take tens of seconds, which is usually larger than the consumer session 
> timeout. As a result, when the callback completes, the consumer has already 
> been treated as failed by the coordinator, which triggers another rebalance.
> We need to consider whether we can optimize the initialization process, or 
> move it out of the callback function and, while initializing the stores one 
> by one, use poll calls to send heartbeats to avoid being kicked out by the 
> coordinator.
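
The last idea in the description (initialize stores one by one and poll in 
between) could look roughly like the sketch below. It assumes, as the 
description implies, that a poll call is what sends the heartbeat. 
{{HeartbeatingConsumer}} and {{IncrementalInitializer}} are hypothetical names 
used only for illustration; with a real consumer the assigned partitions would 
presumably have to be paused first so that the poll returns no records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the consumer; only the heartbeat side effect matters here.
interface HeartbeatingConsumer {
    void poll(long timeoutMs);
}

class IncrementalInitializer {
    // Initializes the stores one at a time and polls after each one, so the
    // coordinator keeps seeing the member as alive during a long initialization.
    static List<String> initAll(List<String> storeNames, HeartbeatingConsumer consumer) {
        List<String> events = new ArrayList<>();
        for (String store : storeNames) {
            // Placeholder for the expensive part: opening RocksDB, restoring from the changelog.
            events.add("init:" + store);
            // Assumed to send a heartbeat without fetching records.
            consumer.poll(0);
            events.add("poll");
        }
        return events;
    }
}
```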



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
