[ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RivenSun updated KAFKA-13310:
-----------------------------
    Description: 
h2. Foreword

Because our consumers' processing logic is sometimes heavy, we followed the Kafka Streams guidance in [https://kafka.apache.org/documentation/#upgrade_10201_notable] and set max.poll.interval.ms to Integer.MAX_VALUE.
Our consumers subscribe using a pattern: consumer.subscribe(Pattern.compile(".*riven.*"));
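For reference, a minimal sketch of how such a consumer is set up (the broker address and string deserializers are illustrative placeholders, not our real settings):
{code:java}
import java.time.Duration;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RivenPatternConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "rivenReassign");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Following the Kafka Streams upgrade note: effectively remove the poll-interval bound
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Pattern-based subscription, as used in the reproduction below
            consumer.subscribe(Pattern.compile(".*riven.*"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                records.forEach(r -> System.out.printf("%s-%d@%d%n", r.topic(), r.partition(), r.offset()));
            }
        }
    }
}
{code}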

 
h2. Steps to reproduce

(1) Test environment: a Kafka cluster with three brokers.
(2) Topics matching the regular expression: rivenTest1, rivenTest2, and rivenTest88.
(3) A single consumer with group.id "rivenReassign", subscribed via consumer.subscribe(Pattern.compile(".*riven.*"));
(4) Initially the group state is Stable and the consumer works normally; then I delete the topic rivenTest88 (see the sketch below).
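The deletion in step (4) can be done with the admin client; a minimal sketch (the broker address is a placeholder):
{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class DeleteRivenTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // Delete a topic that the pattern-subscribed consumer is currently assigned to
            admin.deleteTopics(Collections.singleton("rivenTest88")).all().get();
        }
    }
}
{code}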

 
h2. Phenomenon

(1) The consumer is blocked in the poll method and no longer consumes any messages, while its log keeps printing:
[main] WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-rivenReassign-1, groupId=rivenReassign] Offset commit failed on partition rivenTest88-1 at offset 0: This server does not host this topic-partition.
(2) The AdminClient describeConsumerGroups call keeps timing out, and the group state is no longer Stable.
(3) Broker CPU usage and network traffic *increase significantly*.

 

 
h2. Problem tracking

I analyzed the KafkaConsumer code of version 2.8.1.
I found that you introduced the waitForJoinGroup variable in the updateAssignmentMetadataIfNeeded method; the comment attached to that call explains the reason: "try to update assignment metadata BUT do not need to block on the timer for join group".

 
{code:java}
// if not wait for join group, we would just use a timer of 0
if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
    // since we may use a different timer in the callee, we'd still need
    // to update the original timer's current time after the call
    timer.update(time.milliseconds());
    return false;
}
{code}
By tracing the code back layer by layer, I found that the purpose of this variable is to construct a time.timer(0L) and pass it down to joinGroupIfNeeded(final Timer timer) in AbstractCoordinator.
However, the call stack of joinGroupIfNeeded contains the sub-method onJoinPrepare, and onJoinPrepare contains the line
maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)), where rebalanceConfig.rebalanceTimeoutMs is in fact max.poll.interval.ms.
Finally, I tracked down ConsumerCoordinator's commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer), whose offsets parameter is subscriptions.allConsumed(). After I delete the topic rivenTest88, this method *falls into an infinite loop!*
{code:java}
public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
    invokeCompletedOffsetCommitCallbacks();

    if (offsets.isEmpty())
        return true;

    do {
        if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
            return false;
        }

        RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
        client.poll(future, timer);

        // We may have had in-flight offset commits when the synchronous commit began. If so, ensure that
        // the corresponding callbacks are invoked prior to returning in order to preserve the order that
        // the offset commits were applied.
        invokeCompletedOffsetCommitCallbacks();

        if (future.succeeded()) {
            if (interceptors != null)
                interceptors.onCommit(offsets);
            return true;
        }

        if (future.failed() && !future.isRetriable())
            throw future.exception();

        timer.sleep(rebalanceConfig.retryBackoffMs);
    } while (timer.notExpired());

    return false;
}
{code}
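To summarize, the call chain as I traced it in the 2.8.1 client (paraphrased as comments, method names as in the source):
{code:java}
// KafkaConsumer.poll(Duration)
//   -> updateAssignmentMetadataIfNeeded(timer, waitForJoinGroup = false)
//     -> ConsumerCoordinator.poll(timer, waitForJoinGroup)
//       -> ensureActiveGroup(time.timer(0L))                  // the zero timer from the first snippet
//         -> AbstractCoordinator.joinGroupIfNeeded(timer)
//           -> onJoinPrepare(...)
//             -> maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs))
//               -> commitOffsetsSync(subscriptions.allConsumed(), timer)  // retries until the timer expires
{code}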
 

 

*The endless loop happens because:*
(1) The timer's expiration time is far too long: it is max.poll.interval.ms, which we set to Integer.MAX_VALUE.
(2) The offsets to be committed contain stale data: a TopicPartition that no longer exists.
(3) The response future of sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) always fails, and the exception in the future is UnknownTopicOrPartitionException, which is considered retriable.

Since the loop only waits timer.sleep(rebalanceConfig.retryBackoffMs) between attempts, 100 ms by default, each stuck consumer sends roughly 10 OffsetCommit requests per second, indefinitely. If a large number of consumers hit this problem at the same time, they generate a flood of requests to the Kafka brokers, *resulting in a sharp increase in broker CPU usage and network traffic!*

 

 
h2. Suggestions

1. maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)) should not use max.poll.interval.ms as its timeout. That parameter is user-configurable, and from its description in the official documentation a user would never expect it to be used here. Worse, it can block KafkaConsumer's poll(final Duration timeout) far beyond the requested timeout, even when I call consumer.poll(Duration.ofMillis(1000)). A rough illustration of the alternative follows.
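As a sketch of what I mean (hypothetical only; rebalanceCommitTimeoutMs and any config backing it are invented names for illustration and do not exist in the client):
{code:java}
// Hypothetical: bound the pre-rebalance auto-commit with a dedicated, shorter timeout
// instead of rebalanceTimeoutMs (which equals max.poll.interval.ms).
maybeAutoCommitOffsetsSync(time.timer(rebalanceCommitTimeoutMs));
{code}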
2. In fact, in ConsumerCoordinator's poll(Timer timer, boolean waitForJoinGroup) method, the consumer already makes sure the local metadata is fresh before calling ensureActiveGroup; see the code:

 
{code:java}
if (rejoinNeededOrPending()) {
    // due to a race condition between the initial metadata fetch and the initial rebalance,
    // we need to ensure that the metadata is fresh before joining initially. This ensures
    // that we have matched the pattern against the cluster's topics at least once before joining.
    if (subscriptions.hasPatternSubscription()) {
        // For consumer group that uses pattern-based subscription, after a topic is created,
        // any consumer that discovers the topic after metadata refresh can trigger rebalance
        // across the entire consumer group. Multiple rebalances can be triggered after one topic
        // creation if consumers refresh metadata at vastly different times. We can significantly
        // reduce the number of rebalances caused by single topic creation by asking consumer to
        // refresh metadata before re-joining the group as long as the refresh backoff time has
        // passed.
        if (this.metadata.timeToAllowUpdate(time.milliseconds()) == 0) {
            this.metadata.requestUpdate();
        }

        if (!client.ensureFreshMetadata(timer)) {
            return false;
        }
    }

    if (!ensureActiveGroup(timer)) {
        return false;
    }
}
{code}
 

In other words, the consumer already knows which topics/partitions are valid before onJoinPrepare runs. Given that, when commitOffsetsSync hits the UnknownTopicOrPartitionException described above, why not compare the offsets being committed with the latest local metadata, remove the partitions that no longer exist, and then retry the commit with the remaining offsets? I believe that would break out of the infinite loop. A rough sketch of this filtering follows.
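This is a sketch only, not the actual client code; knownTopics stands for whatever fresh-metadata view is available at that point (e.g. the topic set of the current Cluster):
{code:java}
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public final class OffsetPruning {
    // Drop offsets whose topic is no longer present in the fresh metadata, so a retriable
    // UNKNOWN_TOPIC_OR_PARTITION error cannot make the synchronous commit retry forever.
    static void pruneUnknownPartitions(Map<TopicPartition, OffsetAndMetadata> offsets,
                                       Set<String> knownTopics) {
        offsets.keySet().removeIf(tp -> !knownTopics.contains(tp.topic()));
    }
}
{code}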

3. Why must the offsets be committed synchronously in the onJoinPrepare method? Could they be committed asynchronously instead? Alternatively, provide a parameter that lets the user choose between synchronous and asynchronous commit, or a new parameter that bounds the number of retries of the synchronous commit here, instead of using a Timer built from max.poll.interval.ms. Even if the offsets are not actually committed at this point, the impact is limited: it may only cause some messages to be consumed again. I still suggest adding a parameter that controls whether this commit is performed at all. A sketch of the bounded-retry idea follows.
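This is hypothetical only: maxRetries and attemptTimeoutMs are invented for illustration and do not exist in the client, and the helpers it calls are the same ones that appear in commitOffsetsSync above:
{code:java}
// Hypothetical variant of the pre-rebalance commit: retry the synchronous commit only a
// bounded number of times instead of looping until a max.poll.interval.ms-based timer expires.
boolean commitOffsetsSyncBounded(Map<TopicPartition, OffsetAndMetadata> offsets,
                                 int maxRetries, long attemptTimeoutMs) {
    if (offsets.isEmpty())
        return true;

    for (int attempt = 0; attempt <= maxRetries; attempt++) {
        Timer attemptTimer = time.timer(attemptTimeoutMs);
        if (coordinatorUnknown() && !ensureCoordinatorReady(attemptTimer))
            continue;

        RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
        client.poll(future, attemptTimer);

        if (future.succeeded())
            return true;
        if (future.failed() && !future.isRetriable())
            throw future.exception();

        // Retriable failure such as UNKNOWN_TOPIC_OR_PARTITION: back off and try again,
        // but only up to maxRetries times.
        time.sleep(rebalanceConfig.retryBackoffMs);
    }
    // Give up: the worst case is that some messages are consumed again after the rebalance.
    return false;
}
{code}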

> KafkaConsumer cannot jump out of the poll method, and the consumer is blocked 
> in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). 
> Cpu and traffic on the Broker's side increase sharply
> -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13310
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13310
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 2.8.1
>         Environment: prod
>            Reporter: RivenSun
>            Priority: Major
>         Attachments: SecondDeleteConsumerLog.png, SecondDeleteDebugLog.png, 
> brokerCpu.png, brokerNetBytes.png, kafkaConsumerLog.png
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
