RivenSun2 commented on pull request #11911: URL: https://github.com/apache/kafka/pull/11911#issuecomment-1073152235
Hi @guozhangwang

> Why not always try to get from subscriptions.pausedPartitions(), than maintaining a local copy? The paused partitions could change at any time in between as the onJoinXXX may be triggered by the hb thread right?

Because what this PR strives to do is: in each onJoinPrepare, record the `pausedPartitions` as they stand after the last rebalance completed. Then, in onJoinComplete, use that `pausedPartitions` variable to restore the paused marks of the topicPartitions in the latest assignment.

In fact, it makes no difference whether a given rebalance is triggered by the hb thread. If the new assignment of the KafkaConsumer no longer contains topicPartitions that were paused before the rebalance, the paused marks of those topicPartitions will be lost forever on the KafkaConsumer side, even if the KafkaConsumer holds those partitions again after a future rebalance.

> I'm wondering if we should also clear the fetched messages in the buffer when revoking the partitions as well? As suggested in the ticket itself:

Come to think of it, it doesn't make much of a difference. For the eager protocol, the default behavior is to clear the paused marks of all topicPartitions. Even if we clean up the messages in memory at the same time, KafkaConsumer#poll will still re-initiate Fetch requests for these topicPartitions, because they are no longer paused for the KafkaConsumer. Users may then wonder why they still polled messages for topicPartitions they had paused.

The main purpose of this PR is to make pause behavior consistent between the cooperative and eager protocols. We still do not promise that a rebalance will always maintain **all paused** marks of users.

WDYT?
Thanks.
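
To make the record-then-restore idea above concrete, here is a minimal sketch. It is a simplified model, not the actual code in this PR: the class `PausedStateSketch`, its fields, and the use of plain strings such as `orders-0` in place of `TopicPartition` are all assumptions made for illustration. Only the hook names `onJoinPrepare` and `onJoinComplete` come from the discussion. It models the eager protocol, where every partition is revoked before the rejoin.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for the consumer-side state; not the real Kafka classes. */
final class PausedStateSketch {
    private final Set<String> assigned = new HashSet<>();
    private final Set<String> paused = new HashSet<>();
    // Local copy of the paused marks, taken at the start of each rebalance.
    private Set<String> pausedSnapshot = new HashSet<>();

    void assign(Set<String> partitions) {
        assigned.clear();
        assigned.addAll(partitions);
        paused.retainAll(assigned); // a mark cannot outlive its assignment
    }

    void pause(String partition) {
        if (!assigned.contains(partition)) {
            throw new IllegalStateException("not assigned: " + partition);
        }
        paused.add(partition);
    }

    Set<String> paused() {
        return new HashSet<>(paused);
    }

    /** Record the paused marks, then revoke everything (eager behavior). */
    void onJoinPrepare() {
        pausedSnapshot = new HashSet<>(paused);
        assigned.clear();
        paused.clear();
    }

    /** Restore the marks only for partitions present in the new assignment. */
    void onJoinComplete(Set<String> newAssignment) {
        assigned.addAll(newAssignment);
        for (String partition : pausedSnapshot) {
            if (assigned.contains(partition)) {
                paused.add(partition);
            }
        }
    }

    public static void main(String[] args) {
        PausedStateSketch consumer = new PausedStateSketch();
        consumer.assign(Set.of("orders-0", "orders-1"));
        consumer.pause("orders-0");

        // Rebalance 1: orders-0 stays assigned, so its mark is restored.
        consumer.onJoinPrepare();
        consumer.onJoinComplete(Set.of("orders-0", "orders-2"));
        System.out.println(consumer.paused());

        // Rebalance 2: orders-0 moves away, so its mark is dropped.
        consumer.onJoinPrepare();
        consumer.onJoinComplete(Set.of("orders-1"));

        // Rebalance 3: orders-0 comes back, but the mark is not recovered.
        consumer.onJoinPrepare();
        consumer.onJoinComplete(Set.of("orders-0"));
        System.out.println(consumer.paused());
    }
}
```

Because the snapshot is retaken in every `onJoinPrepare`, a mark survives only while its partition stays in the assignment across consecutive rebalances. That is the limitation described above: not all paused marks are guaranteed to be maintained.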