[ https://issues.apache.org/jira/browse/KAFKA-14639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bojan Blagojevic updated KAFKA-14639:
-------------------------------------
    Attachment: consumers-jira.log

> Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance cycle
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-14639
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14639
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 3.2.1
>            Reporter: Bojan Blagojevic
>            Priority: Major
>         Attachments: consumers-jira.log
>
>
> I have an application that runs 6 consumers in parallel. I am getting some
> unexpected results when I use {{CooperativeStickyAssignor}}. If I
> understand the mechanism correctly, if a consumer loses a partition in one
> rebalance cycle, the partition should be assigned in the next rebalance cycle.
> This assumption is based on the
> [RebalanceProtocol|https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.RebalanceProtocol.html]
> documentation and a few blog posts that describe the protocol, like [this
> one|https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/]
> on the Confluent blog.
> {quote}The assignor should not reassign any owned partitions immediately, but
> instead may indicate consumers the need for partition revocation so that the
> revoked partitions can be reassigned to other consumers in the next rebalance
> event. This is designed for sticky assignment logic which attempts to
> minimize partition reassignment with cooperative adjustments.
> {quote}
> {quote}Any member that revoked partitions then rejoins the group, triggering
> a second rebalance so that its revoked partitions can be assigned. Until
> then, these partitions are unowned and unassigned.
> {quote}
> These are the logs from the application that uses
> {{protocol='cooperative-sticky'}}.
> In the same rebalance cycle ({{generationId=640}}), {{partition 74}} moves
> from {{consumer-3}} to {{consumer-4}}. I omitted the lines that are logged
> by the other 4 consumers.
> Mind that the log is in reverse order (read it bottom to top).
> {code:java}
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : New partition assignment: partition-59, seek to min common offset: 85120524
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions [partition-59] assigned successfully
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Partitions assigned: [partition-59]
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-3-my-client-id-my-group-id, groupId=my-group-id] Adding newly assigned partitions: partition-59
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-3-my-client-id-my-group-id, groupId=my-group-id] Notifying assignor about the new Assignment(partitions=[partition-59])
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-3-my-client-id-my-group-id, groupId=my-group-id] Request joining group due to: need to revoke partitions [partition-26, partition-74] as indicated by the current assignment and re-join
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions [partition-26, partition-74] revoked successfully
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Finished removing partition data
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] (Re-)joining group
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : New partition assignment: partition-74, seek to min common offset: 107317730
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler2 : Partitions [partition-74] assigned successfully
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : Partitions assigned: [partition-74]
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] Adding newly assigned partitions: partition-74
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] Notifying assignor about the new Assignment(partitions=[partition-74])
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] Request joining group due to: need to revoke partitions [partition-57] as indicated by the current assignment and re-join
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler2 : Partitions [partition-57] revoked successfully
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : Finished removing partition data
> 2022-12-14 11:18:22 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Partitions revoked: [partition-26, partition-74]
> 2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-3-my-client-id-my-group-id, groupId=my-group-id] Revoke previously assigned partitions partition-26, partition-74
> 2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-3-my-client-id-my-group-id, groupId=my-group-id] Updating assignment with\n\tAssigned partitions: [partition-59]\n\tCurrent owned partitions: [partition-26, partition-74]\n\tAdded partitions (assigned - owned): [partition-59]\n\tRevoked partitions (owned - assigned): [partition-26, partition-74]
> 2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-3-my-client-id-my-group-id, groupId=my-group-id] Successfully synced group in generation Generation{generationId=640, memberId='partition-3-my-client-id-my-group-id-c31afd19-3f22-43cb-ad07-9088aa98d3af', protocol='cooperative-sticky'}
> 2022-12-14 11:18:22 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-3-my-client-id-my-group-id, groupId=my-group-id] Successfully joined group with generation Generation{generationId=640, memberId='partition-3-my-client-id-my-group-id-c31afd19-3f22-43cb-ad07-9088aa98d3af', protocol='cooperative-sticky'}
> 2022-12-14 11:18:22 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : Partitions revoked: [partition-57]
> 2022-12-14 11:18:22 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] Revoke previously assigned partitions partition-57
> 2022-12-14 11:18:22 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] Updating assignment with\n\tAssigned partitions: [partition-74]\n\tCurrent owned partitions: [partition-57]\n\tAdded partitions (assigned - owned): [partition-74]\n\tRevoked partitions (owned - assigned): [partition-57]
> 2022-12-14 11:18:21 1 — [id-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] Successfully synced group in generation Generation{generationId=640, memberId='partition-4-my-client-id-my-group-id-ae2af665-edc9-4a8e-b658-98372d142477', protocol='cooperative-sticky'}
> 2022-12-14 11:18:21 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-4-my-client-id-my-group-id, groupId=my-group-id] Successfully joined group with generation Generation{generationId=640, memberId='partition-4-my-client-id-my-group-id-ae2af665-edc9-4a8e-b658-98372d142477', protocol='cooperative-sticky'}
> {code}
> Is this expected?
> Kafka client version is 3.2.1.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
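
For reference, the expectation described in this report (a partition that is still owned by one consumer is withheld from its new owner until the following rebalance) can be modelled with a small standalone sketch. It does not use the Kafka client and is not the CooperativeStickyAssignor implementation. The class and method names are hypothetical, and partition-59 is assumed to be unowned because the log omits the other 4 consumers.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model of the documented cooperative behaviour, not Kafka code.
final class CooperativeRebalanceSketch {

    /**
     * One rebalance round. A member is granted a target partition only if it
     * already owns it or if no member currently owns it. Partitions owned by
     * another member are withheld until that member has revoked them.
     */
    static Map<String, Set<Integer>> rebalance(Map<String, Set<Integer>> owned,
                                               Map<String, Set<Integer>> target) {
        // Collect every partition that some member still owns.
        Set<Integer> ownedByAnyone = new TreeSet<>();
        for (Set<Integer> partitions : owned.values()) {
            ownedByAnyone.addAll(partitions);
        }

        Map<String, Set<Integer>> granted = new TreeMap<>();
        for (Map.Entry<String, Set<Integer>> entry : target.entrySet()) {
            Set<Integer> mine =
                owned.getOrDefault(entry.getKey(), Collections.<Integer>emptySet());
            Set<Integer> result = new TreeSet<>();
            for (int partition : entry.getValue()) {
                if (mine.contains(partition) || !ownedByAnyone.contains(partition)) {
                    result.add(partition);
                }
            }
            granted.put(entry.getKey(), result);
        }
        return granted;
    }

    public static void main(String[] args) {
        // Ownership before generation 640, taken from "Current owned partitions".
        Map<String, Set<Integer>> owned = new TreeMap<>();
        owned.put("consumer-3", new TreeSet<>(Arrays.asList(26, 74)));
        owned.put("consumer-4", new TreeSet<>(Arrays.asList(57)));

        // Final placement the assignor is aiming for, per the log.
        Map<String, Set<Integer>> target = new TreeMap<>();
        target.put("consumer-3", new TreeSet<>(Arrays.asList(59)));
        target.put("consumer-4", new TreeSet<>(Arrays.asList(74)));

        Map<String, Set<Integer>> first = rebalance(owned, target);
        System.out.println(first);   // {consumer-3=[59], consumer-4=[]}

        // After the revocations, the first-round result is the new ownership.
        Map<String, Set<Integer>> second = rebalance(first, target);
        System.out.println(second);  // {consumer-3=[59], consumer-4=[74]}
    }
}
```

Under this model consumer-4 would receive partition-74 only in the second round, whereas the log above shows it being added in generation 640, the same generation in which consumer-3 revokes it.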