[ 
https://issues.apache.org/jira/browse/KAFKA-17599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17937840#comment-17937840
 ] 

Luke Chen commented on KAFKA-17599:
-----------------------------------

Removed the fixed version since this is not a blocker for v3.9.1.

> Update Consumer Subscription with Current Assignment Before Partition 
> Revocation
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-17599
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17599
>             Project: Kafka
>          Issue Type: Improvement
>          Components: connect, consumer, group-coordinator
>    Affects Versions: 3.8.0
>            Reporter: Pritam Kumar
>            Assignee: Pritam Kumar
>            Priority: Minor
>              Labels: connect, consumer, group-coordinator
>
> +_*Summary:*_+
> Ensure that the Kafka consumer subscription object is updated with the 
> current partition assignment before partition revocation, so that close() 
> method of the sink connector's has the access to current updated partition. 
> *{+}_Description_{+}:* 
> In scenarios where Kafka Connect Sink Tasks handle partition revocation 
> events, it is critical to update the consumer subscription object with the 
> current partition assignment before the revocation process begins. This will 
> allow for better management of partition state and ensure that partition 
> ownership is clear and accurately reflected in the consumer group.
> +_*Current Behaviour:*_+
> When partitions are revoked due to rebalancing, the current assignment is not 
> explicitly updated in the consumer subscription object before revocation, 
> leading close() being rendered to the old assignment state and it might be 
> possible that no open is called for this.
> +_*Race Conditions:*_+
> Also this leads to some very rare race conditions. Race conditions in some 
> scenario in sink connectors: There could be race conditions leading to 
> sometime updated values and sometime old values of the current assigned 
> partitions in some scenarios. 
> Consider this: 
> Since the put call goes in a loop and we have some work in put which is using 
> "context.assignment" to access the current assignment. Let's say the task is 
> *assigned [0, 1]* and *[2] is being added* to it and *[0] is being removed* 
> from this. If the call happens in this case: 
> 1. Close call partition [0]. 
> 2. Put call comes with a batch of records 
> 3. Open call comes [2] 
> In this scenario accessing context.assignment inside put gives -> [0,1].
> But if the call happens in this way: 
> 1. Close call partition [0]. 
> 2. Open call comes for partition [2] 
> 3. Put call comes with a batch of records.
> In this scenario accessing context.assignment inside put gives -> [1,2]. 
> This leads to stale and inconsistent situation which leads to inconsistent 
> behaviour for the connectors.
> +_*Proposed Behavior:*_+
> Before partition revocation occurs, update the consumer subscription object 
> with the current partition assignment to ensure consistent state tracking and 
> smoother transitions during rebalancing.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to