[ 
https://issues.apache.org/jira/browse/KAFKA-15998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17906224#comment-17906224
 ] 

Jonathan Haapala commented on KAFKA-15998:
------------------------------------------

We are still periodically running into this issue, and each occurrence risks 
data corruption. 

!image-2024-12-16-15-36-47-059.png|width=623,height=244!

We detect this by saving what gets passed to {{onAssignment()}} and then 
comparing it to what gets passed to {{onPartitionsAssigned()}}. In the EAGER 
protocol, we expect these to be exactly the same, as {{onPartitionsRevoked()}} 
should have been called before this point and revoked the entire assignment.
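
The comparison described above could be sketched roughly as follows. This is only an illustration, not our production code: the class {{EagerAssignmentCheck}} and its method names are hypothetical, and partitions are modelled as plain integers instead of {{TopicPartition}} so that the example needs no Kafka dependency.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: compares the assignor-side view with the listener-side view. */
final class EagerAssignmentCheck {
    // Partitions most recently handed to the assignor callback (onAssignment()).
    private Set<Integer> lastFromAssignor = new TreeSet<>();

    /** Call from the assignor's onAssignment() with the full assignment. */
    synchronized void recordOnAssignment(Collection<Integer> partitions) {
        lastFromAssignor = new TreeSet<>(partitions);
    }

    /**
     * Call from onPartitionsAssigned(). Returns the partitions that appear in only
     * one of the two views; under EAGER the result is expected to be empty.
     */
    synchronized Set<Integer> mismatch(Collection<Integer> fromListener) {
        Set<Integer> listener = new TreeSet<>(fromListener);

        Set<Integer> common = new TreeSet<>(lastFromAssignor);
        common.retainAll(listener);               // intersection of both views

        Set<Integer> diff = new TreeSet<>(lastFromAssignor);
        diff.addAll(listener);                    // union of both views
        diff.removeAll(common);                   // union minus intersection
        return diff;
    }
}
```

With an empty set recorded from {{onAssignment()}} and partitions 141 and 89 passed to {{onPartitionsAssigned()}}, {{mismatch()}} would return both partitions, which is the situation shown in the screenshot.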

We also keep track of the "state" each partition is in, and atomically 
transition those states in each {{ConsumerRebalanceListener}} API. So depending 
on the API being called, the protocol we are using (EAGER in this case), and the 
expected state of that partition at the time the API method is invoked, we can 
detect unexpected state changes. As a result, we can see that when we hit this 
bug, we were assigned no partitions in {{onAssignment()}}, but when 
{{onPartitionsAssigned()}} was called, we were passed partitions 141 and 89. 
During {{onPartitionsAssigned()}} we update our partition states by taking the 
difference between the partitions currently in an assigned/revoked state and 
the partitions we are now assigned, and marking those no longer in the current 
assignment as "unassigned". It is therefore unexpected for us to be revoking a 
partition that is in the "unassigned" state. However, you can see below that 
50ms after we detect this bug, those same partitions are passed to 
{{onPartitionsRevoked()}}, triggering our unexpected-state monitoring. This 
further corroborates the claim that we are being assigned partitions before 
having them revoked.
!image-2024-12-16-15-53-53-816.png|width=921,height=190!
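
A minimal sketch of the state tracking described above, under some assumptions: the class {{PartitionStateTracker}}, its three states, and the integer partition ids are all hypothetical simplifications, and "current assignment" is taken to be the full assignment last seen in {{onAssignment()}}. Our real state machine has more detail; this only shows why a revoke of an "unassigned" partition gets flagged.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical per-partition state tracker driven by the rebalance callbacks. */
final class PartitionStateTracker {
    enum State { ASSIGNED, REVOKED, UNASSIGNED }

    private final Map<Integer, State> states = new HashMap<>();

    /** Called from onPartitionsAssigned() with the current full assignment. */
    synchronized void onPartitionsAssigned(Collection<Integer> currentAssignment) {
        Set<Integer> now = new HashSet<>(currentAssignment);
        // Anything previously assigned/revoked that is not in the current
        // assignment is no longer ours.
        for (Map.Entry<Integer, State> e : states.entrySet()) {
            if (e.getValue() != State.UNASSIGNED && !now.contains(e.getKey())) {
                e.setValue(State.UNASSIGNED);
            }
        }
        for (Integer p : now) {
            states.put(p, State.ASSIGNED);
        }
    }

    /**
     * Called from onPartitionsRevoked(). Returns the partitions that were not
     * in the ASSIGNED state, i.e. the unexpected transitions.
     */
    synchronized List<Integer> onPartitionsRevoked(Collection<Integer> revoked) {
        List<Integer> unexpected = new ArrayList<>();
        for (Integer p : revoked) {
            State previous = states.getOrDefault(p, State.UNASSIGNED);
            if (previous != State.ASSIGNED) {
                unexpected.add(p);
            }
            states.put(p, State.REVOKED);
        }
        return unexpected;
    }
}
```

In this sketch, if partitions 89 and 141 have already been marked "unassigned" and are then passed to {{onPartitionsRevoked()}}, both are returned as unexpected, which corresponds to the alert in the second screenshot.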

> EAGER rebalance onPartitionsAssigned() called with no previous 
> onPartitionsLost() nor onPartitionsRevoked()
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15998
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15998
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 3.4.0
>            Reporter: Jonathan Haapala
>            Priority: Major
>         Attachments: image-2024-12-16-15-36-47-059.png, 
> image-2024-12-16-15-53-53-816.png
>
>
> I ran into a case where {{onPartitionsAssigned()}} was called without first 
> calling {{onPartitionsRevoked()}} and there is no indication that 
> {{onPartitionsLost()}} was called or had any reason to be called. We are 
> using the *EAGER* rebalance protocol and the *StickyAssignor* on Kafka 3.4.0.
> Our services rely on the API contract that {{onPartitionsRevoked()}}:
> {quote}In eager rebalancing, it will always be called at the start of a 
> rebalance and after the consumer stops fetching data.
> {quote}
> We internally keep track of partition states with a state machine, and rely 
> on these APIs to assert what expected states we are in. So when a partition 
> is Revoked and then re-Assigned, we know that we kept ownership. Moreover, if 
> we are assigned partitions in EAGER rebalancing, we expect the entire 
> assignment to be passed to {{onPartitionsAssigned()}}, because if 
> {{onPartitionsRevoked()}} is always called at the start of a rebalance and 
> EAGER protocol always revokes the entire assignment, then by the time we hit 
> {{onPartitionsAssigned()}} there should be nothing assigned from the 
> consumer's point of view, and therefore the entire assignment is newly added.
> However, we recently ran into a situation where we received an assignment 
> while the consumer's existing assignment was non-empty:
> ||Pod||Message||
> |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:25,715\{UTC} 
> [KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=consumer.metric-data-points.metric-aggregator, 
> groupId=metric-aggregator] Notifying assignor about the {*}new 
> Assignment{*}(partitions=[topic-26, topic-44, topic-60, topic-71, topic-78, 
> topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117, 
> topic-123, topic-130, topic-137, topic-141])|
> |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:25,715\{UTC} 
> [KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=consumer.metric-data-points.metric-aggregator, 
> groupId=metric-aggregator] Adding {*}newly assigned partitions{*}: topic-26, 
> topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101, 
> topic-105, topic-109, topic-113, topic-117, topic-123, topic-130, topic-137, 
> topic-141|
> |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:31,923\{UTC} 
> [kafka-coordinator-heartbeat-thread \\| metric-aggregator] INFO  
> o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=consumer.metric-data-points.metric-aggregator, 
> groupId=metric-aggregator] *Request joining group* due to: group is already 
> rebalancing|
> |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,132\{UTC} 
> [KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=consumer.metric-data-points.metric-aggregator, 
> groupId=metric-aggregator] Successfully joined group with generation 
> Generation\{generationId=12417, 
> memberId='consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223',
>  protocol='sticky'}|
> |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,134\{UTC} 
> [KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=consumer.metric-data-points.metric-aggregator, 
> groupId=metric-aggregator] Successfully synced group in generation 
> Generation{generationId={*}12417{*}, 
> memberId='consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223',
>  protocol='sticky'}|
> |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,135\{UTC} 
> [KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=consumer.metric-data-points.metric-aggregator, 
> groupId=metric-aggregator] Notifying assignor about the {*}new 
> Assignment{*}(partitions=[topic-26, topic-44, topic-60, topic-71, topic-78, 
> topic-82, topic-88, topic-101, topic-105, topic-109, topic-113, topic-117])|
> |aggregator-ff95b6cf-r2jkm|2023-12-07 23:02:32,135\{UTC} 
> [KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=consumer.metric-data-points.metric-aggregator, 
> groupId=metric-aggregator] Adding {*}newly assigned partitions{*}: |
> Here you can see we get assigned partitions:
>   26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117, 123, 130, 137, 141
> And promptly see them all as newly added when passed to 
> {{onPartitionsAssigned()}}. 6 seconds later the heartbeat thread notices 
> another rebalance and requests to join. It quickly succeeds and then almost 
> immediately successfully syncs. We then get a new assignment:
>   26, 44, 60, 71, 78, 82, 88, 101, 105, 109, 113, 117
> This is a subset of the partitions we were assigned previously, missing 123, 
> 130, 137, and 141. Because {{onPartitionsRevoked()}} was not called at the 
> beginning of this rebalance, the consumer still has the old assignment as its 
> current assignment rather than it being empty, and so it thinks there are no 
> newly assigned partitions.
> Using this diagram from 
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-ConsumerRebalanceProtocol:Stop-The-WorldEffect]
>  as a visual guide, it seems like we sent the JoinGroup and succeeded in 
> joining, but then we seemingly skipped to the SyncGroup and got our 
> assignment.  
> !https://cwiki.apache.org/confluence/download/attachments/103090108/Rebalance%20Today.jpg?version=1&modificationDate=1554837450000&api=v2!
> Here are the group coordinator assignment logs for the initial assignment and 
> then the assignment without a revoke. You can see they are sequential 
> generations 12416 and 12417, so none are missed.
> ||Pod||Message||
> |aggregator-ff95b6cf-nzkrx|2023-12-07 23:02:25,709\{UTC} 
> [KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=consumer.metric-data-points.metric-aggregator, 
> groupId=metric-aggregator] Finished assignment for group at generation 
> {*}12416{*}: 
> {consumer.metric-data-points.metric-aggregator-8d718b78-9b7e-4803-afda-49b1c35e2654=Assignment(partitions=[topic-28,
>  topic-30, topic-57, topic-77, topic-128, topic-129, topic-133, topic-134, 
> topic-136, topic-27, topic-56, topic-119, topic-22, topic-65, topic-43, 
> topic-64]), 
> consumer.metric-data-points.metric-aggregator-ab7e7027-3ca9-4dac-bb09-71958b515ab6=Assignment(partitions=[topic-35,
>  topic-45, topic-62, topic-72, topic-79, topic-83, topic-90, topic-102, 
> topic-106, topic-110, topic-114, topic-118, topic-124, topic-131, topic-138, 
> topic-142]), 
> consumer.metric-data-points.metric-aggregator-da772740-a386-491a-aa6d-69b26550dbf9=Assignment(partitions=[topic-37,
>  topic-49, topic-66, topic-73, topic-80, topic-85, topic-95, topic-103, 
> topic-107, topic-111, topic-115, topic-120, topic-125, topic-132, topic-139, 
> topic-143]), 
> consumer.metric-data-points.metric-aggregator-779655ef-3297-44d1-8edf-2e4b6c9b4f55=Assignment(partitions=[topic-7,
>  topic-11, topic-14, topic-16, topic-21, topic-29, topic-38, topic-42, 
> topic-52, topic-63, topic-69, topic-87, topic-91, topic-93, topic-96, 
> topic-98]), 
> consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223=Assignment(partitions=[topic-26,
>  topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101, 
> topic-105, topic-109, topic-113, topic-117, topic-123, topic-130, topic-137, 
> topic-141]), 
> consumer.metric-data-points.metric-aggregator-d06d548e-1e7f-48fb-98a0-2f6072fa3a5b=Assignment(partitions=[topic-9,
>  topic-13, topic-18, topic-20, topic-25, topic-32, topic-33, topic-34, 
> topic-36, topic-47, topic-48, topic-50, topic-51, topic-53, topic-59, 
> topic-61]), 
> consumer.metric-data-points.metric-aggregator-841a72e3-7ae4-4734-9793-7e947c4fa0a4=Assignment(partitions=[topic-19,
>  topic-39, topic-54, topic-70, topic-74, topic-81, topic-86, topic-99, 
> topic-104, topic-108, topic-112, topic-116, topic-122, topic-127, topic-135, 
> topic-140]), 
> consumer.metric-data-points.metric-aggregator-939df068-59ea-43d7-bb48-3c6baf97023a=Assignment(partitions=[topic-8,
>  topic-12, topic-15, topic-17, topic-23, topic-31, topic-40, topic-46, 
> topic-55, topic-67, topic-75, topic-89, topic-92, topic-94, topic-97, 
> topic-100]), 
> consumer.metric-data-points.metric-aggregator-933f7c51-27f5-436f-9b56-640eb55d64b2=Assignment(partitions=[topic-0,
>  topic-1, topic-2, topic-3, topic-4, topic-5, topic-6, topic-10, topic-68, 
> topic-41, topic-58, topic-121, topic-24, topic-84, topic-76, topic-126])}|
> |aggregator-ff95b6cf-nzkrx|2023-12-07 23:02:32,119\{UTC} 
> [KafkaConsumerAutoService] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=consumer.metric-data-points.metric-aggregator, 
> groupId=metric-aggregator] Finished assignment for group at generation 
> {*}12417{*}: 
> {consumer.metric-data-points.metric-aggregator-8d718b78-9b7e-4803-afda-49b1c35e2654=Assignment(partitions=[topic-28,
>  topic-30, topic-57, topic-77, topic-128, topic-129, topic-133, topic-134, 
> topic-136, topic-27, topic-56, topic-119]), 
> consumer.metric-data-points.metric-aggregator-ab7e7027-3ca9-4dac-bb09-71958b515ab6=Assignment(partitions=[topic-35,
>  topic-45, topic-62, topic-72, topic-79, topic-83, topic-90, topic-102, 
> topic-106, topic-110, topic-114, topic-118]), 
> consumer.metric-data-points.metric-aggregator-76fd27b3-fb5b-49aa-af2a-b53150897cf9=Assignment(partitions=[topic-22,
>  topic-51, topic-61, topic-76, topic-92, topic-96, topic-100, topic-124, 
> topic-127, topic-132, topic-138, topic-141]), 
> consumer.metric-data-points.metric-aggregator-9f6ab144-85a7-468c-9aa1-11f5445084b9=Assignment(partitions=[topic-43,
>  topic-59, topic-65, topic-91, topic-94, topic-98, topic-123, topic-126, 
> topic-131, topic-137, topic-140, topic-143]), 
> consumer.metric-data-points.metric-aggregator-da772740-a386-491a-aa6d-69b26550dbf9=Assignment(partitions=[topic-37,
>  topic-49, topic-66, topic-73, topic-80, topic-85, topic-95, topic-103, 
> topic-107, topic-111, topic-115, topic-120]), 
> consumer.metric-data-points.metric-aggregator-779655ef-3297-44d1-8edf-2e4b6c9b4f55=Assignment(partitions=[topic-7,
>  topic-11, topic-14, topic-16, topic-21, topic-29, topic-38, topic-42, 
> topic-52, topic-63, topic-69, topic-87]), 
> consumer.metric-data-points.metric-aggregator-a43be1e2-eba1-444c-96dd-ccb52cdba223=Assignment(partitions=[topic-26,
>  topic-44, topic-60, topic-71, topic-78, topic-82, topic-88, topic-101, 
> topic-105, topic-109, topic-113, topic-117]), 
> consumer.metric-data-points.metric-aggregator-d06d548e-1e7f-48fb-98a0-2f6072fa3a5b=Assignment(partitions=[topic-9,
>  topic-13, topic-18, topic-20, topic-25, topic-32, topic-33, topic-34, 
> topic-36, topic-47, topic-48, topic-50]), 
> consumer.metric-data-points.metric-aggregator-841a72e3-7ae4-4734-9793-7e947c4fa0a4=Assignment(partitions=[topic-19,
>  topic-39, topic-54, topic-70, topic-74, topic-81, topic-86, topic-99, 
> topic-104, topic-108, topic-112, topic-116]), 
> consumer.metric-data-points.metric-aggregator-939df068-59ea-43d7-bb48-3c6baf97023a=Assignment(partitions=[topic-8,
>  topic-12, topic-15, topic-17, topic-23, topic-31, topic-40, topic-46, 
> topic-55, topic-67, topic-75, topic-89]), 
> consumer.metric-data-points.metric-aggregator-91b1e207-0978-430e-9393-679ac44647b8=Assignment(partitions=[topic-24,
>  topic-53, topic-64, topic-84, topic-93, topic-97, topic-122, topic-125, 
> topic-130, topic-135, topic-139, topic-142]), 
> consumer.metric-data-points.metric-aggregator-933f7c51-27f5-436f-9b56-640eb55d64b2=Assignment(partitions=[topic-0,
>  topic-1, topic-2, topic-3, topic-4, topic-5, topic-6, topic-10, topic-68, 
> topic-41, topic-58, topic-121])}|
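
To illustrate the empty "Adding newly assigned partitions" line in the quoted logs: assuming the client derives the newly added partitions as the new assignment minus whatever it still considers owned (which is what the log lines suggest), the arithmetic for generations 12416 and 12417 works out as in the sketch below. The class {{NewlyAssignedDemo}} and its method are hypothetical and use plain integers for the partition numbers.

```java
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

/** Hypothetical demo of "newly assigned = new assignment minus still-owned". */
final class NewlyAssignedDemo {
    /** Partitions in {@code assigned} that are not already in {@code owned}. */
    static SortedSet<Integer> newlyAssigned(List<Integer> owned, List<Integer> assigned) {
        SortedSet<Integer> added = new TreeSet<>(assigned);
        added.removeAll(owned);
        return added;
    }

    public static void main(String[] args) {
        // Assignments for this member taken from the quoted logs.
        List<Integer> gen12416 = List.of(26, 44, 60, 71, 78, 82, 88, 101,
                105, 109, 113, 117, 123, 130, 137, 141);
        List<Integer> gen12417 = List.of(26, 44, 60, 71, 78, 82, 88, 101,
                105, 109, 113, 117);

        // No revoke happened: the old assignment is still owned, so nothing is new.
        System.out.println(newlyAssigned(gen12416, gen12417)); // prints []

        // With a revoke first, the owned set would be empty and all 12 would be new.
        System.out.println(newlyAssigned(List.of(), gen12417));
    }
}
```

The first call mirrors what we observed; the second is what the EAGER contract leads us to expect.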



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
