[ https://issues.apache.org/jira/browse/KAFKA-14639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
David Jacot resolved KAFKA-14639.
---------------------------------
Fix Version/s: 3.5.0
Reviewer: David Jacot
Resolution: Fixed
> Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance cycle
> --------------------------------------------------------------------------------
>
> Key: KAFKA-14639
> URL: https://issues.apache.org/jira/browse/KAFKA-14639
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.2.1
> Reporter: Bojan Blagojevic
> Assignee: Philip Nee
> Priority: Major
> Fix For: 3.5.0
>
> Attachments: consumers-jira.log
>
>
> I have an application that runs 6 consumers in parallel. I am getting some
> unexpected results when I use {{CooperativeStickyAssignor}}. If I
> understand the mechanism correctly, if a consumer loses a partition in one
> rebalance cycle, that partition should only be assigned to another consumer
> in the next rebalance cycle.
> This assumption is based on the
> [RebalanceProtocol|https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.RebalanceProtocol.html]
> documentation and a few blog posts that describe the protocol, like [this
> one|https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/]
> on the Confluent blog.
> {quote}The assignor should not reassign any owned partitions immediately, but
> instead may indicate consumers the need for partition revocation so that the
> revoked partitions can be reassigned to other consumers in the next rebalance
> event. This is designed for sticky assignment logic which attempts to
> minimize partition reassignment with cooperative adjustments.
> {quote}
> {quote}Any member that revoked partitions then rejoins the group, triggering
> a second rebalance so that its revoked partitions can be assigned. Until
> then, these partitions are unowned and unassigned.
> {quote}
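A minimal, stdlib-only sketch of the two-rebalance behavior the quotes above describe. It assumes a simplified model in which the assignor already knows a target assignment: a partition that is still owned by a different member is withheld (left unassigned) in the current generation and can only be granted once its old owner has revoked it and rejoined. The class and method names ({{CooperativeTwoPhaseSketch}}, {{cooperativeStep}}) are hypothetical; this is not the actual {{CooperativeStickyAssignor}} implementation.

{code:java}
import java.util.*;

public class CooperativeTwoPhaseSketch {
    /**
     * Returns the assignment for THIS generation under a simplified cooperative rule:
     * a target partition is granted only if it is unowned or already owned by the
     * same member. Partitions still owned by someone else are withheld, so they can
     * be assigned in the next rebalance after the old owner revokes them.
     */
    static Map<String, Set<Integer>> cooperativeStep(Map<String, Set<Integer>> owned,
                                                     Map<String, Set<Integer>> target) {
        // Index the current owner of every owned partition.
        Map<Integer, String> ownerOf = new HashMap<>();
        owned.forEach((member, parts) -> parts.forEach(p -> ownerOf.put(p, member)));

        Map<String, Set<Integer>> result = new TreeMap<>();
        target.forEach((member, parts) -> {
            Set<Integer> granted = new TreeSet<>();
            for (int p : parts) {
                String currentOwner = ownerOf.get(p);
                // Never move a partition directly from one member to another.
                if (currentOwner == null || currentOwner.equals(member)) {
                    granted.add(p);
                }
            }
            result.put(member, granted);
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> owned = new TreeMap<>();
        owned.put("consumer-3", new TreeSet<>(List.of(26, 74)));
        owned.put("consumer-4", new TreeSet<>(List.of(57)));

        // Hypothetical target: partition 74 should end up on consumer-4.
        Map<String, Set<Integer>> target = new TreeMap<>();
        target.put("consumer-3", new TreeSet<>(List.of(26)));
        target.put("consumer-4", new TreeSet<>(List.of(57, 74)));

        // First rebalance: 74 is taken from consumer-3 but not yet given to consumer-4.
        Map<String, Set<Integer>> gen1 = cooperativeStep(owned, target);
        System.out.println(gen1); // {consumer-3=[26], consumer-4=[57]}

        // consumer-3 revokes 74 and rejoins; the second rebalance can now assign it.
        Map<String, Set<Integer>> gen2 = cooperativeStep(gen1, target);
        System.out.println(gen2); // {consumer-3=[26], consumer-4=[57, 74]}
    }
}
{code}

In this model a partition is never present in one member's revoked set and another member's added set within the same generation, which is the property the report expects.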
> These are the logs from the application that uses
> {{protocol='cooperative-sticky'}}. In the same rebalance cycle
> ({{generationId=640}}), {{partition-74}} moves from {{consumer-3}} to
> {{consumer-4}}. I omitted the lines logged by the other 4
> consumers.
> Note that the log is in reverse order (bottom to top).
> {code:java}
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : New
> partition assignment: partition-59, seek to min common offset: 85120524
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions
> [partition-59] assigned successfully
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Partitions
> assigned: [partition-59]
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator
> : [Consumer clientId=partition-3-my-client-id-my-group-id,
> groupId=my-group-id] Adding newly assigned partitions: partition-59
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator
> : [Consumer clientId=partition-3-my-client-id-my-group-id,
> groupId=my-group-id] Notifying assignor about the new
> Assignment(partitions=[partition-59])
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator
> : [Consumer clientId=partition-3-my-client-id-my-group-id,
> groupId=my-group-id] Request joining group due to: need to revoke partitions
> [partition-26, partition-74] as indicated by the current assignment and
> re-join
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions
> [partition-26, partition-74] revoked successfully
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Finished
> removing partition data
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator
> : [Consumer clientId=partition-4-my-client-id-my-group-id,
> groupId=my-group-id] (Re-)joining group
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : New
> partition assignment: partition-74, seek to min common offset: 107317730
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler2 : Partitions
> [partition-74] assigned successfully
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : Partitions
> assigned: [partition-74]
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator
> : [Consumer clientId=partition-4-my-client-id-my-group-id,
> groupId=my-group-id] Adding newly assigned partitions: partition-74
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator
> : [Consumer clientId=partition-4-my-client-id-my-group-id,
> groupId=my-group-id] Notifying assignor about the new
> Assignment(partitions=[partition-74])
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator
> : [Consumer clientId=partition-4-my-client-id-my-group-id,
> groupId=my-group-id] Request joining group due to: need to revoke partitions
> [partition-57] as indicated by the current assignment and re-join
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler2 : Partitions
> [partition-57] revoked successfully
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : Finished
> removing partition data
> 2022-12-14 11:18:22 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Partitions
> revoked: [partition-26, partition-74]
> 2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator
> : [Consumer clientId=partition-3-my-client-id-my-group-id,
> groupId=my-group-id] Revoke previously assigned partitions partition-26,
> partition-74
> 2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator
> : [Consumer clientId=partition-3-my-client-id-my-group-id,
> groupId=my-group-id] Updating assignment with\n\tAssigned partitions:
> [partition-59]\n\tCurrent owned partitions: [partition-26,
> partition-74]\n\tAdded partitions (assigned - owned):
> [partition-59]\n\tRevoked partitions (owned - assigned): [partition-26,
> partition-74]
> 2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator
> : [Consumer clientId=partition-3-my-client-id-my-group-id,
> groupId=my-group-id] Successfully synced group in generation
> Generation{generationId=640,
> memberId='partition-3-my-client-id-my-group-id-c31afd19-3f22-43cb-ad07-9088aa98d3af',
> protocol='cooperative-sticky'}
> 2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator
> : [Consumer clientId=partition-3-my-client-id-my-group-id,
> groupId=my-group-id] Successfully joined group with generation
> Generation{generationId=640,
> memberId='partition-3-my-client-id-my-group-id-c31afd19-3f22-43cb-ad07-9088aa98d3af',
> protocol='cooperative-sticky'}
> 2022-12-14 11:18:22 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : Partitions
> revoked: [partition-57]
> 2022-12-14 11:18:22 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator
> : [Consumer clientId=partition-4-my-client-id-my-group-id,
> groupId=my-group-id] Revoke previously assigned partitions partition-57
> 2022-12-14 11:18:22 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator
> : [Consumer clientId=partition-4-my-client-id-my-group-id,
> groupId=my-group-id] Updating assignment with\n\tAssigned partitions:
> [partition-74]\n\tCurrent owned partitions: [partition-57]\n\tAdded
> partitions (assigned - owned): [partition-74]\n\tRevoked partitions (owned -
> assigned): [partition-57]
> 2022-12-14 11:18:21 1 — [id-1] o.a.k.c.c.internals.ConsumerCoordinator :
> [Consumer clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id]
> Successfully synced group in generation Generation{generationId=640,
> memberId='partition-4-my-client-id-my-group-id-ae2af665-edc9-4a8e-b658-98372d142477',
> protocol='cooperative-sticky'}
> 2022-12-14 11:18:21 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator
> : [Consumer clientId=partition-4-my-client-id-my-group-id,
> groupId=my-group-id] Successfully joined group with generation
> Generation{generationId=640,
> memberId='partition-4-my-client-id-my-group-id-ae2af665-edc9-4a8e-b658-98372d142477',
> protocol='cooperative-sticky'} {code}
> Is this expected?
> Kafka client version is 3.2.1.
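A hypothetical check (not part of the Kafka client API) that applies the same set differences the {{ConsumerCoordinator}} lines above print, "Added partitions (assigned - owned)" and "Revoked partitions (owned - assigned)", to the two consumers shown for generation 640. A non-empty result means a partition was revoked from one member and added to another within a single generation. Because the lines from the other 4 consumers were omitted, the check only sees {{consumer-3}} and {{consumer-4}}; the class and method names are illustrative.

{code:java}
import java.util.*;

public class SameGenerationMoveCheck {
    /** Returns a - b as a new sorted set. */
    static Set<Integer> minus(Set<Integer> a, Set<Integer> b) {
        Set<Integer> r = new TreeSet<>(a);
        r.removeAll(b);
        return r;
    }

    /**
     * Returns partitions that show up in some member's revoked set (owned - assigned)
     * and in some member's added set (assigned - owned) in the same generation.
     * For a single member these two sets are disjoint, so any overlap is a move
     * between members. Under the cooperative protocol this is expected to be empty.
     */
    static Set<Integer> movedInOneGeneration(Map<String, Set<Integer>> owned,
                                             Map<String, Set<Integer>> assigned) {
        Set<String> members = new TreeSet<>(owned.keySet());
        members.addAll(assigned.keySet());

        Set<Integer> revoked = new TreeSet<>();
        Set<Integer> added = new TreeSet<>();
        for (String member : members) {
            Set<Integer> own = owned.getOrDefault(member, Set.of());
            Set<Integer> asg = assigned.getOrDefault(member, Set.of());
            revoked.addAll(minus(own, asg));
            added.addAll(minus(asg, own));
        }
        revoked.retainAll(added); // keep only partitions that were both revoked and added
        return revoked;
    }

    public static void main(String[] args) {
        // Owned / assigned partition numbers taken from the generationId=640 log lines above.
        Map<String, Set<Integer>> owned = Map.of(
                "consumer-3", Set.of(26, 74),
                "consumer-4", Set.of(57));
        Map<String, Set<Integer>> assigned = Map.of(
                "consumer-3", Set.of(59),
                "consumer-4", Set.of(74));
        System.out.println(movedInOneGeneration(owned, assigned)); // [74]
    }
}
{code}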