Pritam Kumar created KAFKA-17599:
------------------------------------

             Summary: Update Consumer Subscription with Current Assignment Before Partition Revocation
                 Key: KAFKA-17599
                 URL: https://issues.apache.org/jira/browse/KAFKA-17599
             Project: Kafka
          Issue Type: Improvement
          Components: connect, consumer, group-coordinator
            Reporter: Pritam Kumar


+_*Summary:*_+
Ensure that the Kafka consumer subscription object is updated with the current 
partition assignment before partition revocation, so that the sink connector's 
close() method has access to the current, updated partition assignment. 
+_*Description:*_+
In scenarios where Kafka Connect sink tasks handle partition revocation events, 
it is critical to update the consumer subscription object with the current 
partition assignment before the revocation process begins. This allows better 
management of partition state and ensures that partition ownership is clear and 
accurately reflected in the consumer group.
+_*Current Behaviour:*_+
When partitions are revoked due to rebalancing, the current assignment is not 
explicitly updated in the consumer subscription object before revocation. As a 
result, close() operates on the old assignment state, and open() may not yet 
have been called for the newly assigned partitions.
+_*Race Conditions:*_+
This also leads to some rare race conditions in sink connectors: depending on 
the order in which the callbacks are delivered, a task may see either the 
updated or the old set of currently assigned partitions. 
Consider this: put() is called in a loop, and the task does some work inside 
put() that reads context.assignment() to access the current assignment. Suppose 
the task is *assigned [0, 1]*, *[2] is being added*, and *[0] is being removed*. 
If the calls happen in this order: 
1. close() is called for partition [0]. 
2. put() is called with a batch of records. 
3. open() is called for partition [2]. 
In this scenario, context.assignment() inside put() returns [0, 1].
But if the calls happen in this order: 
1. close() is called for partition [0]. 
2. open() is called for partition [2]. 
3. put() is called with a batch of records. 
In this scenario, context.assignment() inside put() returns [1, 2]. 
The same rebalance can therefore expose either a stale or an updated assignment 
to put(), which makes connector behaviour inconsistent.
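The two orderings above can be replayed with a small toy model. This is a hypothetical Python sketch and not Kafka Connect code. replay_current() and the "close"/"put"/"open" events stand in for SinkTask.close(), SinkTask.put() and SinkTask.open(). As a simplification, it assumes that the task's view of its assignment changes only once open() has been delivered.

```python
# Toy replay of the callback orderings from the issue. The function and event
# names are hypothetical stand-ins for SinkTask.close()/put()/open() and
# SinkTaskContext.assignment(); this is not Kafka code.

def replay_current(order, initial=(0, 1), revoked=(0,), added=(2,)):
    """Return (assignment seen in close, assignment seen in put)."""
    assigned = set(initial)
    seen = {}
    for event in order:
        if event == "close":
            # Current behaviour (as described): close() runs before the
            # task's view of the assignment has been updated.
            seen["close"] = sorted(assigned)
        elif event == "open":
            # Modelling assumption: the new assignment becomes visible to
            # the task only once open(added) has been delivered.
            assigned = (assigned - set(revoked)) | set(added)
        elif event == "put":
            # A task that reads context.assignment() inside put().
            seen["put"] = sorted(assigned)
    return seen["close"], seen["put"]


print(replay_current(["close", "put", "open"]))  # ([0, 1], [0, 1])
print(replay_current(["close", "open", "put"]))  # ([0, 1], [1, 2])
```

In this model, put() observes different assignments for the same rebalance depending only on where it falls relative to open().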


+_*Proposed Behavior:*_+
Before partition revocation occurs, update the consumer subscription object 
with the current partition assignment to ensure consistent state tracking and 
smoother transitions during rebalancing.
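One possible reading of this proposal can be sketched with the same kind of toy model. This is a hypothetical Python sketch: it is not Kafka code, and it is not necessarily how a fix would be implemented. replay_proposed() is an illustrative name. In this model, the task's view of the assignment is updated before close() runs. As a result, close() and any subsequent put() observe [1, 2] regardless of when open() arrives.

```python
# Hypothetical toy model of the proposed ordering; names are illustrative
# stand-ins for SinkTask.close()/put()/open(), not Kafka APIs.

def replay_proposed(order, initial=(0, 1), revoked=(0,), added=(2,)):
    """Return (assignment seen in close, assignment seen in put)."""
    assigned = set(initial)
    seen = {}
    for event in order:
        if event == "close":
            # Proposed (one interpretation): update the assignment first,
            # then invoke close(revoked), so close() sees the new view.
            assigned = (assigned - set(revoked)) | set(added)
            seen["close"] = sorted(assigned)
        elif event == "open":
            # Nothing left to update; the assignment is already current.
            pass
        elif event == "put":
            # A task that reads context.assignment() inside put().
            seen["put"] = sorted(assigned)
    return seen["close"], seen["put"]


print(replay_proposed(["close", "put", "open"]))  # ([1, 2], [1, 2])
print(replay_proposed(["close", "open", "put"]))  # ([1, 2], [1, 2])
```

The revoked partitions are still passed to close() as before. The only thing that changes in this sketch is when the task's view of the assignment is refreshed.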
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
