[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302167#comment-17302167
 ] 

A. Sophie Blee-Goldman edited comment on KAFKA-12463 at 3/16/21, 2:51 AM:
--------------------------------------------------------------------------

Isn't the upgrade path for Connect identical to the one for the Consumer? 
There's no mention of Connect in KIP-429 because that KIP focuses on 
cooperative rebalancing for the plain Consumer and Kafka Streams, while Connect 
had its own separate KIP and protocol for incremental cooperative rebalancing. 
But if Connect relies on the same underlying consumer group protocol, then it 
should apply as well.

-That said, I just re-read the Consumer upgrade section and it's got some 
misinformation. Seems to be implying that if you have mixed assignor protocols, 
you can end up with some consumers choosing an EAGER assignor while others 
choose the COOPERATIVE one, which is just...not true. Fake News!-

-Anyways thanks for raising this, I'll go clean it up. That's also a good point 
about one vs two rolling bounces: technically you don't _need_ the second 
rolling bounce if you listed the CooperativeStickyAssignor first in the 
preferred list on the initial rolling bounce. I guess the intention was to 
leave the consumer group in a stable state, so if for example you deployed a 
new consumer without explicitly configuring it to use the 
CooperativeStickyAssignor then it would fail rather than silently switch back 
to EAGER rebalancing. But it's not required. I'll go clarify that in the KIP as 
well.-


was (Author: ableegoldman):
Isn't the upgrade path for Connect identical to the one for the Consumer? 
There's no mention of Connect in KIP-429 because that KIP focuses on 
cooperative rebalancing for the plain Consumer and Kafka Streams, while Connect 
had its own separate KIP and protocol for incremental cooperative rebalancing. 
But if Connect relies on the same underlying consumer group protocol, then it 
should apply as well.

That said, I just re-read the Consumer upgrade section and it's got some 
misinformation. Seems to be implying that if you have mixed assignor protocols, 
you can end up with some consumers choosing an EAGER assignor while others 
choose the COOPERATIVE one, which is just...not true. Fake News!

Anyways thanks for raising this, I'll go clean it up. That's also a good point 
about one vs two rolling bounces: technically you don't _need_ the second 
rolling bounce if you listed the CooperativeStickyAssignor first in the 
preferred list on the initial rolling bounce. I guess the intention was to 
leave the consumer group in a stable state, so if for example you deployed a 
new consumer without explicitly configuring it to use the 
CooperativeStickyAssignor then it would fail rather than silently switch back 
to EAGER rebalancing. But it's not required. I'll go clarify that in the KIP as 
well.

> Update default consumer partition assignor for sink tasks
> ---------------------------------------------------------
>
>                 Key: KAFKA-12463
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12463
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>            Reporter: Chris Egerton
>            Assignee: Chris Egerton
>            Priority: Major
>
> Kafka consumers have a pluggable [partition assignment 
> interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
>  that comes with several out-of-the-box implementations including the 
> [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
>  
> [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
>  
> [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
>  and 
> [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].
> If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
> is used by default. Although there are some benefits to this assignor 
> including stability of assignment across generations and simplicity of 
> design, it comes with a major drawback: the number of active consumers in a 
> group is limited to the number of partitions in the topic(s) with the most 
> partitions. For an example of the worst case, in a consumer group where every 
> member is subscribed to ten topics that each have one partition, only one 
> member of that group will be assigned any topic partitions.
> This can end up producing counterintuitive and even frustrating behavior when 
> a sink connector is brought up with N tasks to read from some collection of 
> topics with a total of N topic partitions, but some tasks end up idling and 
> not processing any data.
>  
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  introduced the {{CooperativeStickyAssignor}}, which seeks to provide a 
> stable assignment across generations wherever possible, provide the most even 
> assignment possible (taking into account possible differences in 
> subscriptions across consumers in the group), and allow consumers to continue 
> processing data during rebalance. The documentation for the assignor states 
> that "Users should prefer this assignor for newer clusters."
> We should alter the default consumer configuration for sink tasks to use the 
> new {{CooperativeStickyAssignor}}. In order to do this in a 
> backwards-compatible fashion that also enables rolling upgrades, this should 
> be implemented by setting the {{partition.assignment.strategy}} property of 
> sink task consumers to the list 
> {{org.apache.kafka.clients.consumer.CooperativeStickyAssignor, 
> org.apache.kafka.clients.consumer.RangeAssignor}} when no worker-level or 
> connector-level override is present.
> This way, consumer groups for sink connectors on Connect clusters in the 
> process of being upgraded will continue to use the {{RangeAssignor}} until 
> all workers in the cluster have been upgraded, and then will switch over to 
> the new {{CooperativeStickyAssignor}} automatically.
>  
> This improvement is viable as far back as -2.3- 2.4, when the 
> {{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
> fix, should only be applied to the Connect framework in an upcoming minor 
> release. This does not preclude users from following the steps outlined here 
> to improve sink connector behavior on existing clusters by modifying their 
> worker configs to use {{consumer.partition.assignment.strategy =}} 
> {{org.apache.kafka.clients.consumer.CooperativeStickyAssignor, 
> org.apache.kafka.clients.consumer.RangeAssignor}}, or doing the same on a 
> per-connector basis using the 
> {{consumer.override.partition.assignment.strategy}} property.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to