[ 
https://issues.apache.org/jira/browse/KAFKA-13463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17490704#comment-17490704
 ] 

Guozhang Wang commented on KAFKA-13463:
---------------------------------------

Thanks for bringing this up [~RivenSun].

Just to bring some context here, [~hachikuji], [~dajac] and I are 
brainstorming a refactoring of the threading model along with the improved 
rebalance protocol, and I'm labeling this ticket to be considered under that 
effort as well. As for the fix, I'm also leaning towards option 2), i.e., 
generally speaking, when partitions are marked as paused while some fetched 
records are already parked inside the consumer, we would skip those records 
instead of returning them from the `poll` call. Since they are never returned, 
their offsets are not advanced, and at the end of the rebalance we would 
clear those fetched records.
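
To make the intended behavior concrete, here is a minimal sketch in plain Java. It is not Kafka code: `PausedBufferSketch`, its methods, and the use of plain strings for partitions and records are all hypothetical stand-ins for the consumer's internal buffer, and it assumes the paused marks themselves are left untouched by the cleanup step.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the consumer-side buffer; names are illustrative only.
final class PausedBufferSketch {
    // Partition name -> records that were fetched but not yet returned to the user.
    private final Map<String, Deque<String>> buffered = new LinkedHashMap<>();
    private final Set<String> paused = new HashSet<>();

    void buffer(String partition, String record) {
        buffered.computeIfAbsent(partition, p -> new ArrayDeque<>()).add(record);
    }

    void pause(String partition) {
        paused.add(partition);
    }

    // Returns buffered records, skipping every partition that is currently paused.
    List<String> poll() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Deque<String>> e : buffered.entrySet()) {
            if (paused.contains(e.getKey())) {
                continue; // records stay parked and are not handed to the user
            }
            out.addAll(e.getValue());
            e.getValue().clear();
        }
        return out;
    }

    // End of a rebalance: drop the parked records of paused partitions.
    // They were never returned, so no position was advanced for them.
    void onRebalanceComplete() {
        buffered.keySet().removeIf(paused::contains);
    }

    int bufferedCount(String partition) {
        Deque<String> q = buffered.get(partition);
        return q == null ? 0 : q.size();
    }

    public static void main(String[] args) {
        PausedBufferSketch consumer = new PausedBufferSketch();
        consumer.buffer("topic-0", "a");
        consumer.buffer("topic-1", "b");
        consumer.pause("topic-0");
        System.out.println("returned by poll: " + consumer.poll());
        consumer.onRebalanceComplete();
        System.out.println("still parked for topic-0: " + consumer.bufferedCount("topic-0"));
    }
}
```

The point of the sketch is only the ordering: skipping happens on every poll, while the cleanup happens once, at the end of the rebalance.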

> Improvement: KafkaConsumer pause(Collection<TopicPartition> partitions)
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-13463
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13463
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>    Affects Versions: 3.0.0
>            Reporter: RivenSun
>            Priority: Major
>              Labels: new-rebalance-should-fix
>
> h1. 1. Background
> When users call the kafkaConsumer#pause(...) method, they may overlook that 
> the pause can silently stop taking effect, in which case data will be lost.
> For example, the following simple code:
> {code:java}
> while (true) {
>     try {
>         kafkaConsumer.pause(kafkaConsumer.assignment());
>         ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(2));
>         if (!records.isEmpty()) {
>             log.error("kafka poll for rebalance discard some record!");
>         }
>     } catch (Exception e) {
>         log.error("maintain poll for rebalance with error:{}", e.getMessage(), e);
>     }
> }{code}
> Even if you call pause(assignment) before the poll method every time, the 
> poll method may still return messages.
>  
> h1. 2. Root cause
> In short, during a rebalance of the group, 
> ConsumerCoordinator#invokePartitionsRevoked(...) clears the paused mark 
> on the partitions previously held by the kafkaConsumer. However, when the 
> paused mark is cleared, the messages already held in memory 
> (Fetcher.completedFetches) for the paused partitions are not cleared, so 
> Fetcher#fetchedRecords() still picks up those messages and returns them to 
> the user.
> For a more detailed analysis, see 
> https://issues.apache.org/jira/browse/KAFKA-13425 
> Looking forward to your reply.
>  
> h1. 3. Discussion: Can KafkaConsumer support a pause method that is not 
> affected by groupRebalance?
> The design of the KafkaConsumer#pause method stated one point from the 
> beginning:
>  * Rebalance does not preserve pause/resume state.
> link:https://issues.apache.org/jira/browse/KAFKA-2350
> Unfortunately, this is not mentioned in the comments of the 
> KafkaConsumer#pause(...) method. In addition, 
> ConsumerCoordinator#invokePartitionsRevoked does not log anything when 
> it clears the paused mark. I believe this will cause many users to use 
> the KafkaConsumer#pause(...) method incorrectly.
> But I think it is necessary for KafkaConsumer to provide a pause method that 
> is not affected by groupRebalance.
>  
> h1. 4. Suggestions
> I suggest improving the existing pause method from several different 
> perspectives, or providing some new {{pause}} methods. Each point below is 
> an independent solution.
> h2. 1) ConsumerCoordinator#invokePartitionsRevoked should also trigger Fetcher 
> to clean up the revokedAndPausedPartitions messages in memory when clearing 
> the paused mark
> This can prevent the Fetcher#fetchedRecords() method from mistakenly thinking 
> that revokedAndPausedPartitions is legal and returning messages. There are 
> various checks on the partition in the fetchedRecords method.
> The price of this is that if the user does not call the pause(...) method 
> before calling the poll method next time, a new FetchMessage request may be 
> initiated, which will cause additional network transmission.
>  
> h2. 2) Maintain the old paused mark on the KafkaConsumer side
> <1> In the ConsumerCoordinator#onJoinPrepare(...) method, record all 
> pausedTopicPartitions from the current assignment of KafkaConsumer;
>  <2> In the ConsumerCoordinator#onJoinComplete(...) method, apply 
> pausedTopicPartitions to the latest assignment and restore the paused 
> marks of the partitions that are still in the latest assignment.
> {*}Note{*}: If the new assignment of kafkaConsumer no longer contains 
> topicPartitions that have been paused before rebalance, the paused mark of 
> these topicPartitions will be lost forever on the kafkaConsumer side, even if 
> in a future rebalance, the kafkaConsumer will hold these partitions again.
> At the end of the Jira KAFKA-13425 I mentioned above, I gave a draft code 
> suggestion on this point.
> <3> In fact, code analysis shows that consumers using the 
> RebalanceProtocol.COOPERATIVE protocol (for example, with the currently 
> supported PartitionAssignor CooperativeStickyAssignor) keep the old paused 
> flag by default, whereas consumers using the RebalanceProtocol.EAGER 
> protocol clear all paused marks by default.
> I suggest that KafkaConsumer behave consistently under both 
> RebalanceProtocols; otherwise the existing KafkaConsumer#pause(...) becomes 
> ambiguous and causes great confusion for users.
>  
> h2. 3) In the groupRebalance process, pass the paused flag of topicPartitions
> In the JoinGroup request, in addition to reporting the topic that it wants to 
> subscribe to, each consumerMember should also report its 
> pausedTopicPartitions. The JoinGroup response received by the LeaderConsumer 
> should contain all paused partitions under the entire group.
> The latest assignment made by LeaderConsumer should maintain the paused mark 
> and be packaged in LeaderConsumer's SyncGroup request.
> In this way, after groupRebalance is completed, even if a paused 
> topicPartition is assigned to a new consumer, the new consumer can continue 
> to maintain the paused mark.
> As a consequence, the KafkaConsumer#paused() method can return partitions 
> for which this KafkaConsumer itself never called the 
> pause(Collection<TopicPartition> partitions) method.
>  
> h2. 4) KafkaConsumer provides a pause method for topic level and supports 
> regular expressions
> {{KafkaConsumer#pause(Collection<String> topics)}}
> {{KafkaConsumer#pause(Pattern pattern)}}
> Similar to the paused mark in SubscriptionState.assignment, we need to 
> provide a new instance variable ‘TopicState’ in SubscriptionState to store 
> the topic-level paused mark. The ‘TopicState’ data structure can be modeled 
> on the existing TopicPartitionState.
> <1> ‘TopicState’ should not be affected by groupRebalance, and the paused 
> mark in TopicState will not be changed during the groupRebalance process. 
> TopicState should be the memory mark of a single KafkaConsumer, and it does 
> not have to be passed to other consumers after the rebalance is completed.
>  
> <2> {{pause(Collection<String> topics)}} throws IllegalStateException if 
> this consumer is not currently subscribed to any topic provided
>  
> <3> Fetcher's fetchedRecords() and sendFetches() should take TopicState 
> into account when deciding whether to return a message to the user or 
> initiate a Fetch request
>  
> <4> Provide KafkaConsumer#resume(Collection<String> topics) and 
> KafkaConsumer#resume(Pattern pattern) methods to clean up topic-level paused 
> marks.
>  
> h2. 5) KafkaConsumer provides a pause method for the consumer level
> {{KafkaConsumer#pause()}}
> The existing pause method works per topicPartition and may sometimes be too 
> fine-grained. And because the paused mark is bound to the assignment, it is 
> inevitably affected by groupRebalance.
> <1> This method may also be the user's most urgent need. After the 
> pause() method is called, kafkaConsumer marks itself as paused, and the 
> poll method checks the value of isKafkaConsumerPaused to decide 
> whether to return a message to the user or initiate a Fetch request. This 
> isKafkaConsumerPaused mark should also be held by a single KafkaConsumer 
> itself.
>  
> <2> Users do not need to worry about the poll method returning data after 
> calling the KafkaConsumer#pause() method.
> Users can keep calling the poll method, which avoids the following two 
> outcomes that occur when kafkaConsumer does not call poll for a long time:
>              (1) The heartbeat thread detection mechanism causes 
> kafkaConsumer to actively leaveGroup;
>              (2) At this time, groupRebalance is triggered. The 
> groupCoordinator will wait for the consumer to initiate a JoinGroup request. 
> The groupRebalance cannot be completed for a long time (limited by 
> max.poll.interval.ms), causing all consumers under the entire group to 
> suspend consumption.
>  
> <3> Provide KafkaConsumer#resume() at the kafkaConsumer level, to clean up 
> the paused mark of KafkaConsumer
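
Steps <1> and <2> of suggestion 2) in the quoted description boil down to a set intersection between the paused partitions recorded before the rebalance and the new assignment. The sketch below illustrates just that step in plain Java. `PausedMarkRestoreSketch` and `restorePaused` are hypothetical names, partitions are modeled as plain strings, and nothing here is taken from the ConsumerCoordinator code.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper; illustrates the restore step only, not Kafka internals.
final class PausedMarkRestoreSketch {

    // Keeps the paused mark only for partitions that are still assigned
    // after the rebalance; marks of partitions that moved away are dropped.
    static Set<String> restorePaused(Set<String> pausedBeforeRebalance,
                                     Set<String> newAssignment) {
        Set<String> restored = new LinkedHashSet<>(pausedBeforeRebalance);
        restored.retainAll(newAssignment);
        return restored;
    }

    public static void main(String[] args) {
        Set<String> pausedBefore = Set.of("topic-0", "topic-1");
        Set<String> assignedAfter = Set.of("topic-1", "topic-2");
        System.out.println("paused after rebalance: "
                + restorePaused(pausedBefore, assignedAfter));
    }
}
```

As the Note in the description points out, a mark dropped this way is not recovered if the partition is assigned back in a later rebalance.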



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
