[ https://issues.apache.org/jira/browse/KAFKA-17599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17937840#comment-17937840 ]
Luke Chen commented on KAFKA-17599:
-----------------------------------

Removed the fixed version since this is not a blocker for v3.9.1.

> Update Consumer Subscription with Current Assignment Before Partition Revocation
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-17599
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17599
>             Project: Kafka
>          Issue Type: Improvement
>          Components: connect, consumer, group-coordinator
>    Affects Versions: 3.8.0
>            Reporter: Pritam Kumar
>            Assignee: Pritam Kumar
>            Priority: Minor
>              Labels: connect, consumer, group-coordinator
>
> +_*Summary:*_+
> Ensure that the Kafka consumer subscription object is updated with the current partition assignment before partitions are revoked, so that the sink connector's close() method has access to the current, updated partition assignment.
>
> +_*Description:*_+
> When Kafka Connect sink tasks handle partition revocation events, it is critical to update the consumer subscription object with the current partition assignment before the revocation process begins. This allows better management of partition state and ensures that partition ownership is clear and accurately reflected in the consumer group.
>
> +_*Current Behaviour:*_+
> When partitions are revoked during a rebalance, the current assignment is not explicitly updated in the consumer subscription object before revocation. As a result, close() operates on the old assignment state, and it is possible that no corresponding open() call is made.
>
> +_*Race Conditions:*_+
> This also leads to some rare race conditions in sink connectors: depending on the order of the calls, a task can see the updated set of assigned partitions at some times and the old set at others.
>
> Consider this: put() is called in a loop, and some of the work in put() uses context.assignment() to access the current assignment. Say the task is *assigned [0, 1]*, *[2] is being added* to it, and *[0] is being removed* from it. If the calls happen in this order:
> 1. close() is called for partition [0].
> 2. put() is called with a batch of records.
> 3. open() is called for partition [2].
> then accessing context.assignment() inside put() returns [0, 1].
> But if the calls happen in this order:
> 1. close() is called for partition [0].
> 2. open() is called for partition [2].
> 3. put() is called with a batch of records.
> then accessing context.assignment() inside put() returns [1, 2].
> The task therefore sees a stale or an up-to-date assignment depending only on call ordering, which leads to inconsistent connector behaviour.
>
> +_*Proposed Behavior:*_+
> Before partition revocation occurs, update the consumer subscription object with the current partition assignment, to ensure consistent state tracking and smoother transitions during rebalancing.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
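The two call orderings in the issue can be replayed with a small, dependency-free Java model. This is a hypothetical sketch, not Kafka Connect code: `AssignmentOrderingSketch`, `replay` and `Step` are illustrative names, and the `visible` set only stands in for what `context.assignment()` would return to the task. With `updateBeforeCallbacks = false` it assumes the visible assignment flips from old to new only when the new partitions arrive (the reported behaviour); with `true` it assumes the assignment is updated before `close()`/`open()` run (the proposed behaviour).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical model of the KAFKA-17599 ordering problem; no real Kafka APIs are used. */
public class AssignmentOrderingSketch {

    /** One callback delivered to the sink task during a rebalance. */
    enum Step { CLOSE, OPEN, PUT }

    /**
     * Replays a rebalance that closes `revoked` and opens `added`, and records
     * the assignment that each put() call would observe.
     */
    static List<Set<Integer>> replay(Set<Integer> initial, Set<Integer> revoked,
                                     Set<Integer> added, List<Step> order,
                                     boolean updateBeforeCallbacks) {
        Set<Integer> visible = new TreeSet<>(initial); // stand-in for context.assignment()
        List<Set<Integer>> seenByPut = new ArrayList<>();
        for (Step step : order) {
            switch (step) {
                case CLOSE:
                    if (updateBeforeCallbacks) {
                        visible.removeAll(revoked); // proposed: drop revoked partitions before close()
                    }
                    // task.close(revoked) would run here
                    break;
                case OPEN:
                    if (!updateBeforeCallbacks) {
                        visible.removeAll(revoked); // reported: old -> new only when new partitions arrive
                    }
                    visible.addAll(added);
                    // task.open(added) would run here
                    break;
                case PUT:
                    seenByPut.add(new TreeSet<>(visible)); // snapshot seen inside put()
                    break;
            }
        }
        return seenByPut;
    }

    public static void main(String[] args) {
        Set<Integer> initial = Set.of(0, 1);
        Set<Integer> revoked = Set.of(0);
        Set<Integer> added = Set.of(2);
        List<Step> closePutOpen = List.of(Step.CLOSE, Step.PUT, Step.OPEN);
        List<Step> closeOpenPut = List.of(Step.CLOSE, Step.OPEN, Step.PUT);

        // prints [[0, 1]] then [[1, 2]]: the reported behaviour depends on ordering
        System.out.println(replay(initial, revoked, added, closePutOpen, false));
        System.out.println(replay(initial, revoked, added, closeOpenPut, false));
        // prints [[1]] then [[1, 2]]: put() sees exactly the partitions currently open
        System.out.println(replay(initial, revoked, added, closePutOpen, true));
        System.out.println(replay(initial, revoked, added, closeOpenPut, true));
    }
}
```

In the reported mode, put() sees [0, 1] or [1, 2] depending only on whether it runs before or after open(). In the proposed mode, it sees [1] or [1, 2], which in both cases matches the partitions the task has open at that moment.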