[jira] [Created] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Broker end cpu and traffic increase sharply

2021-09-18 Thread RivenSun (Jira)
RivenSun created KAFKA-13310:


 Summary: KafkaConsumer cannot jump out of the poll method, and the 
consumer is blocked in the ConsumerCoordinator method 
maybeAutoCommitOffsetsSync(Timer timer). Broker end cpu and traffic increase 
sharply
 Key: KAFKA-13310
 URL: https://issues.apache.org/jira/browse/KAFKA-13310
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 2.8.1
 Environment: prod
Reporter: RivenSun
 Attachments: brokerCpu.png, brokerNetBytes.png, kafkaConsumerLog.png

Foreword:

      Because our consumers' consumption logic is sometimes heavy, we followed the Kafka Streams configuration recommendation in 
https://kafka.apache.org/documentation/#upgrade_10201_notable
and set max.poll.interval.ms to Integer.MAX_VALUE.
Our consumers subscribe with:
consumer.subscribe(Pattern.compile(".*riven.*"));
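For reference, a minimal sketch of the consumer setup described above. The broker address is an illustrative assumption; only the group.id, the subscription pattern, and the max.poll.interval.ms value come from this report:

```java
import java.util.Properties;
import java.util.regex.Pattern;

public class ConsumerSetup {
    // Pattern subscription used by our consumers; rivenTest88 matches it too.
    static final Pattern TOPIC_PATTERN = Pattern.compile(".*riven.*");

    static Properties buildProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed address
        props.put("group.id", "rivenReassign");
        props.put("enable.auto.commit", "true");
        // The setting that, as traced below, also becomes the rebalance
        // timeout used by maybeAutoCommitOffsetsSync in onJoinPrepare:
        props.put("max.poll.interval.ms", String.valueOf(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        // In the real application these props would be passed to
        // new KafkaConsumer<>(props) followed by consumer.subscribe(TOPIC_PATTERN).
        System.out.println(TOPIC_PATTERN.matcher("rivenTest88").matches()); // prints true
    }
}
```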

Steps to reproduce:
 (1) Test environment Kafka cluster: three brokers
 (2) Topics matching the regular expression: rivenTest1, rivenTest2, and rivenTest88
 (3) A single consumer with group.id "rivenReassign", subscribed via consumer.subscribe(Pattern.compile(".*riven.*"));
 (4) Initially the group status is Stable and consumption is normal; then I delete topic rivenTest88

Phenomenon:
 (1) The consumer is blocked in the poll method and no longer consumes any messages, and the consumer log keeps printing:
[main] WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-rivenReassign-1, groupId=rivenReassign] Offset commit failed on partition rivenTest88-1 at offset 0: This server does not host this topic-partition.
 (2) AdminClient's describeConsumerGroups call always times out, and the group status is no longer Stable
 (3) Broker CPU and network traffic are *significantly increased*


Problem tracking:
By analyzing the KafkaConsumer code (version 2.8.1), I found that the waitForJoinGroup variable was introduced in the updateAssignmentMetadataIfNeeded method. The reason is given in the comment on the method: "try to update assignment metadata BUT do not need to block on the timer for join group".
Tracing the code layer by layer, the effect of this variable is to construct a time.timer(0L) and pass it down to joinGroupIfNeeded(final Timer timer) in AbstractCoordinator.
However, the call stack of joinGroupIfNeeded includes onJoinPrepare, which contains the line
maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)); the value of rebalanceConfig.rebalanceTimeoutMs is actually max.poll.interval.ms.
Finally, I tracked down ConsumerCoordinator's method commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer).
The input parameter offsets is subscriptions.allConsumed(). When I delete the topic rivenTest88, the commitOffsetsSync method *falls into an infinite loop!*
*The reasons for the endless loop are:*
 (1) The expiration time of the timer is too long: max.poll.interval.ms
 (2) The offsets to be committed contain dirty data: a TopicPartition that no longer exists
 (3) The response future of sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) always fails, and the exception in the future is UnknownTopicOrPartitionException, which is treated as retriable.

Since the retry interval of the loop above is timer.sleep(rebalanceConfig.retryBackoffMs), 100 ms by default, a large number of consumers hitting this problem at the same time will generate a flood of requests to the Kafka brokers, *resulting in a sharp increase in broker CPU and traffic!*
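To make the scale concrete, here is a small simulation of that retry loop. It is a simplified model with virtual time, not the real client code: every attempt is assumed to fail retriably, exactly as with UnknownTopicOrPartitionException above.

```java
public class CommitRetrySimulation {
    // Models the do/while loop in ConsumerCoordinator.commitOffsetsSync when
    // every sendOffsetCommitRequest fails with a retriable exception: one
    // attempt, then timer.sleep(retryBackoffMs), until the timer expires.
    static long attemptsUntilExpiry(long timeoutMs, long backoffMs) {
        long elapsed = 0;
        long attempts = 0;
        do {
            attempts++;           // sendOffsetCommitRequest -> fails, retriable
            elapsed += backoffMs; // timer.sleep(retryBackoffMs)
        } while (elapsed < timeoutMs);
        return attempts;
    }

    public static void main(String[] args) {
        // With a 5 s timer and the default 100 ms retry.backoff.ms:
        System.out.println(attemptsUntilExpiry(5_000, 100)); // prints 50
        // With max.poll.interval.ms = Integer.MAX_VALUE (about 24.8 days) the
        // loop would issue over 21 million commit requests before expiring:
        System.out.println(Integer.MAX_VALUE / 100);
    }
}
```

Multiply that request rate by the number of stuck consumers and the broker CPU/traffic spike in the attached graphs follows directly.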


Suggestions:
1. maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)): this method should not use max.poll.interval.ms as its timeout. That parameter is exposed for users to configure, and from its description on the official website I would never expect it to be used here. It also blocks KafkaConsumer's poll(final Duration timeout), even if I call consumer.poll(Duration.ofMillis(1000)).
2. In the poll(Timer timer, boolean waitForJoinGroup) method of ConsumerCoordinator, before calling ensureActiveGroup, the consumer ensures that the local metadata is up to date; see the code:
if (!client.ensureFreshMetadata(timer)) {
    return false;
}
That is, the consumer already knows which topics/partitions are valid before onJoinPrepare. Given that, when commitOffsetsSync hits the UnknownTopicOrPartitionException mentioned above, why not compare the offsets being committed against the latest local metadata, remove the partitions that no longer exist, and then retry the commit? I think this would break out of the infinite loop.
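Suggestion 2 can be sketched as follows, using simplified stand-in types (String partition keys instead of TopicPartition, Long instead of OffsetAndMetadata); the helper name pruneUnknownTopics is hypothetical, not part of the actual client:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class OffsetFilter {
    // Before the synchronous commit in onJoinPrepare, drop offsets whose
    // topics are absent from the freshly fetched metadata, so the commit
    // cannot fail forever with UNKNOWN_TOPIC_OR_PARTITION.
    static Map<String, Long> pruneUnknownTopics(Map<String, Long> offsetsByPartition,
                                                Set<String> knownTopics) {
        Map<String, Long> pruned = new HashMap<>();
        for (Map.Entry<String, Long> e : offsetsByPartition.entrySet()) {
            // Partition key is "topic-partitionIndex", e.g. "rivenTest88-1".
            String topic = e.getKey().substring(0, e.getKey().lastIndexOf('-'));
            if (knownTopics.contains(topic)) {
                pruned.put(e.getKey(), e.getValue());
            }
        }
        return pruned;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("rivenTest1-0", 5L);
        offsets.put("rivenTest88-1", 0L); // deleted topic: dirty entry
        Set<String> known = new java.util.HashSet<>(
                java.util.Arrays.asList("rivenTest1", "rivenTest2"));
        System.out.println(pruneUnknownTopics(offsets, known)); // keeps only rivenTest1-0
    }
}
```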

[jira] [Updated] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Broker end cpu and traffic increase sharply

2021-09-18 Thread RivenSun (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RivenSun updated KAFKA-13310:
-
Description: 
Suggest:
 1. maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)): this method should not use max.poll.interval.ms as its timeout. That parameter is exposed for users to configure, and from its description on the official website I would never expect it to be used here. It also blocks KafkaConsumer's poll(final Duration timeout), even if I call consumer.poll(Duration.ofMillis(1000)).
 2. In the poll(Timer timer, boolean waitForJoinGroup) method of ConsumerCoordinator, before calling ensureActiveGroup, the consumer ensures that the local metadata is up to date; see the code:

if (!client.ensureFreshMetadata(timer)) {
    return false;
}

That is, the consumer already knows which topics/partitions are valid before onJoinPrepare. Given that, when commitOffsetsSync hits the UnknownTopicOrPartitionException mentioned above, why not compare the offsets being committed against the latest local metadata, remove the partitions that no longer exist, and then retry the commit? I think this would break out of the infinite loop.

3. Why must the offsets be committed synchronously in the onJoinPrepare method? Couldn't they be committed asynchronously? Or provide a parameter letting the user choose synchronous or asynchronous commit, or a new parameter controlling the maximum number of retries for the synchronous commit here, instead of using max.poll.interval.ms as the timeout.
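The retry-bound variant in suggestion 3 can be sketched like this. It is a simplified model, not the actual consumer API: CommitAttempt stands in for one sendOffsetCommitRequest round trip, and maxRetries is a hypothetical new parameter.

```java
public class BoundedSyncCommit {
    // One commit round trip: returns true on success, false on a retriable
    // failure such as UNKNOWN_TOPIC_OR_PARTITION.
    interface CommitAttempt { boolean tryCommit(); }

    // Retry-count bound instead of the rebalance-timeout bound used by
    // commitOffsetsSync: at most maxRetries + 1 attempts in total.
    static boolean commitWithBoundedRetries(CommitAttempt attempt, int maxRetries) {
        for (int i = 0; i <= maxRetries; i++) {
            if (attempt.tryCommit()) return true;   // future.succeeded()
            // the real loop would timer.sleep(retryBackoffMs) here
        }
        return false; // give up instead of spinning until max.poll.interval.ms
    }

    public static void main(String[] args) {
        // An attempt that always fails now gives up after 4 tries instead of
        // looping for the whole max.poll.interval.ms timer.
        System.out.println(commitWithBoundedRetries(() -> false, 3)); // prints false
    }
}
```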

[jira] [Updated] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Broker end cpu and traffic increase sharply

2021-09-18 Thread RivenSun (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RivenSun updated KAFKA-13310:
-
Description: 
h2. Foreword

      Because our consumers' consumption logic is sometimes heavy, we followed the Kafka Streams configuration recommendation in [https://kafka.apache.org/documentation/#upgrade_10201_notable]
 and set max.poll.interval.ms to Integer.MAX_VALUE.
 Our consumers subscribe with:
consumer.subscribe(Pattern.compile(".*riven.*"));

 
h2. Steps to reproduce

 (1) Test environment Kafka cluster: three brokers
 (2) Topics matching the regular expression: rivenTest1, rivenTest2, and rivenTest88
 (3) A single consumer with group.id "rivenReassign", subscribed via consumer.subscribe(Pattern.compile(".*riven.*"));
 (4) Initially the group status is Stable and consumption is normal; then I delete topic rivenTest88

 
h2. Phenomenon

 (1) The consumer is blocked in the poll method and no longer consumes any messages, and the consumer log keeps printing:
 [main] WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-rivenReassign-1, groupId=rivenReassign] Offset commit failed on partition rivenTest88-1 at offset 0: This server does not host this topic-partition.
 (2) AdminClient's describeConsumerGroups call always times out, and the group status is no longer Stable
 (3) Broker CPU and network traffic are *significantly increased*

 

 
h2. Problem tracking

By analyzing the KafkaConsumer code (version 2.8.1), I found that the waitForJoinGroup variable was introduced in the updateAssignmentMetadataIfNeeded method. The reason is given in the comment on the method: "try to update assignment metadata BUT do not need to block on the timer for join group".

 
{code:java}
// if not wait for join group, we would just use a timer of 0
if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
    // since we may use a different timer in the callee, we'd still need
    // to update the original timer's current time after the call
    timer.update(time.milliseconds());
    return false;
}
{code}
By tracing the code layer by layer, the effect of this variable is to construct a time.timer(0L) and pass it down to joinGroupIfNeeded(final Timer timer) in AbstractCoordinator.
However, the call stack of joinGroupIfNeeded includes onJoinPrepare, which contains the line
maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)); the value of rebalanceConfig.rebalanceTimeoutMs is actually max.poll.interval.ms.
Finally, I tracked down ConsumerCoordinator's method commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer).
The input parameter offsets is subscriptions.allConsumed(). When I delete the topic rivenTest88, the commitOffsetsSync method *falls into an infinite loop!*
{code:java}
public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
    invokeCompletedOffsetCommitCallbacks();

    if (offsets.isEmpty())
        return true;

    do {
        if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
            return false;
        }

        RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
        client.poll(future, timer);

        // We may have had in-flight offset commits when the synchronous commit began. If so, ensure that
        // the corresponding callbacks are invoked prior to returning in order to preserve the order that
        // the offset commits were applied.
        invokeCompletedOffsetCommitCallbacks();

        if (future.succeeded()) {
            if (interceptors != null)
                interceptors.onCommit(offsets);
            return true;
        }

        if (future.failed() && !future.isRetriable())
            throw future.exception();

        timer.sleep(rebalanceConfig.retryBackoffMs);
    } while (timer.notExpired());

    return false;
}
{code}
 

 

*The reasons for the endless loop are:*
 (1) The expiration time of the timer is too long: max.poll.interval.ms
 (2) The offsets to be committed contain dirty data: a TopicPartition that no longer exists
 (3) The response future of sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) always fails, and the exception in the future is UnknownTopicOrPartitionException, which is treated as retriable.

Since the retry interval of the loop above is timer.sleep(rebalanceConfig.retryBackoffMs), 100 ms by default, a large number of consumers hitting this problem at the same time will generate a flood of requests to the Kafka brokers, *resulting in a sharp increase in broker CPU and traffic!*

 

 
h2. Suggest

1. maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)): this method should not use max.poll.interval.ms as its timeout. That parameter is exposed for users to configure, and from its description on the official website I would never expect it to be used here.

[jira] [Updated] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Broker end cpu and traffic increase sharply

2021-09-18 Thread RivenSun (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RivenSun updated KAFKA-13310:
-
Reviewer: Guozhang Wang


[jira] [Commented] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Broker end cpu and traffic increase sharply

2021-09-18 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17417083#comment-17417083
 ] 

RivenSun commented on KAFKA-13310:
--

[~guozhang] hi Guozhang, can you help deal with this issue? Thanks a lot.


[jira] [Updated] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Broker end cpu and traffic increase sharply

2021-09-18 Thread RivenSun (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RivenSun updated KAFKA-13310:
-
Priority: Major  (was: Critical)

> KafkaConsumer cannot jump out of the poll method, and the consumer is blocked 
> in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). 
> Broker end cpu and traffic increase sharply
> 
>
> Key: KAFKA-13310
> URL: https://issues.apache.org/jira/browse/KAFKA-13310
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.8.1
> Environment: prod
>Reporter: RivenSun
>Priority: Major
> Attachments: brokerCpu.png, brokerNetBytes.png, kafkaConsumerLog.png
>
>
> h2. Foreword
>       Because our consumers' consumption logic is sometimes heavier, we refer 
> to the configuration of Kafka stream 
> [https://kafka.apache.org/documentation/#upgrade_10201_notable]
>  Set max.poll.interval.ms to Integer.MAX_VALUE
>  Our consumers have adopted method : 
> consumer.subscribe(Pattern.compile(".*riven.*"));
>  
> h2. Steps to reproduce
> (1) Test-environment Kafka cluster with three brokers.
> (2) Topics matching the regular expression: rivenTest1, rivenTest2, and 
> rivenTest88.
> (3) A single consumer with group.id "rivenReassign", subscribed via 
> consumer.subscribe(Pattern.compile(".*riven.*"));
> (4) The group starts out Stable and the consumer works normally; then I 
> delete the topic rivenTest88.
>  
> h2. Phenomenon
> (1) The consumer blocks in the poll method and no longer consumes any 
> messages; the consumer log keeps printing:
> [main] WARN 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=consumer-rivenReassign-1, groupId=rivenReassign] Offset commit 
> failed on partition rivenTest88-1 at offset 0: This server does not host this 
> topic-partition.
> (2) AdminClient's describe-consumer-group call always times out, and the 
> group state is no longer Stable.
> (3) Broker CPU usage and network traffic *increase sharply*.
>  
>  
> h2. Problem tracking
> Analyzing the KafkaConsumer code (version 2.8.1), I found that the 
> waitForJoinGroup variable was introduced in the 
> updateAssignmentMetadataIfNeeded method. The comment on the method explains 
> why: "try to update assignment metadata BUT do not need to block on the 
> timer for join group".
>  
> {code:java}
> // if not wait for join group, we would just use a timer of 0
>   if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
> // since we may use a different timer in the callee, we'd still need 
> // to update the original timer's current time after the call 
>   timer.update(time.milliseconds()); 
>   return false; 
> }
> {code}
> Tracing the code layer by layer shows that this variable causes a 
> time.timer(0L) to be constructed and passed down to 
> joinGroupIfNeeded(final Timer timer) in AbstractCoordinator.
> But the call stack of joinGroupIfNeeded includes onJoinPrepare, and 
> onJoinPrepare contains this line:
> maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)); 
> the value of rebalanceConfig.rebalanceTimeoutMs is actually 
> max.poll.interval.ms.
> Finally I tracked down ConsumerCoordinator's method 
> commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer).
> The input parameter offsets is subscriptions.allConsumed(). When I delete 
> the topic rivenTest88, this method *falls into an infinite loop!*
> {code:java}
> public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
>     invokeCompletedOffsetCommitCallbacks();
>
>     if (offsets.isEmpty())
>         return true;
>
>     do {
>         if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
>             return false;
>         }
>
>         RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
>         client.poll(future, timer);
>
>         // We may have had in-flight offset commits when the synchronous commit began.
>         // If so, ensure that the corresponding callbacks are invoked prior to returning
>         // in order to preserve the order that the offset commits were applied.
>         invokeCompletedOffsetCommitCallbacks();
>
>         if (future.succeeded()) {
>             if (interceptors != null)
>                 interceptors.onCommit(offsets);
>             return true;
>         }
>
>         if (future.failed() && !future.isRetriable())
>             throw future.exception();
>
>         timer.sleep(rebalanceConfig.retryBackoffMs);
>     } while (timer.notExpired());
>
>     return false;
> }
> {code}
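The loop above can be modeled in isolation to see why it never exits: with max.poll.interval.ms set to Integer.MAX_VALUE, the timer's deadline is roughly 24.8 days away, and UNKNOWN_TOPIC_OR_PARTITION is a retriable error, so neither exit condition fires. The sketch below is a hypothetical toy model, not the real Kafka code: Timer is a stand-in for org.apache.kafka.common.utils.Timer, and the always-retriable failure stands in for the offset-commit error against the deleted topic.

```java
public class CommitLoopSketch {
    // Simplified stand-in for org.apache.kafka.common.utils.Timer
    static final class Timer {
        private final long deadlineMs;
        private long currentMs;
        Timer(long nowMs, long timeoutMs) {
            this.currentMs = nowMs;
            long d = nowMs + timeoutMs;
            this.deadlineMs = d < 0 ? Long.MAX_VALUE : d; // saturate on overflow
        }
        void sleep(long ms) { currentMs += ms; }
        boolean notExpired() { return currentMs < deadlineMs; }
    }

    public static void main(String[] args) {
        Timer timer = new Timer(0L, Integer.MAX_VALUE); // ~24.8 days until expiry
        long retryBackoffMs = 100L;                     // default retry.backoff.ms
        int attempts = 0;
        boolean committed = false;
        do {
            attempts++;                 // stand-in for sendOffsetCommitRequest + poll
            boolean retriable = true;   // UNKNOWN_TOPIC_OR_PARTITION is retriable
            if (!retriable) { committed = true; break; }
            timer.sleep(retryBackoffMs);
        } while (timer.notExpired() && attempts < 1000); // demo cap; real loop has none
        // After 1000 retries only 100 seconds of the ~24.8-day budget are used,
        // so the real loop just keeps spinning.
        System.out.println("attempts=" + attempts + ", committed=" + committed
                + ", timerExpired=" + !timer.notExpired());
    }
}
```

The broker-side CPU and traffic spike follows directly: the loop resends the OffsetCommitRequest every retry.backoff.ms for as long as the timer allows.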

[jira] [Commented] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Broker end cpu

2021-09-18 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17417130#comment-17417130
 ] 

Luke Chen commented on KAFKA-13310:
---

Nice RCA and good suggestions.

 

My 2 cents:

For suggestion (1): in general, setting "rebalanceTimeout" to "maxPollInterval" 
makes sense, because the rebalance period implies the delay between two polls 
(before and after the rebalance). But I agree that, during poll(duration), it 
is not good to start another timer (set to the max poll interval) just to wait 
for committing offsets, since that delays the poll. Maybe we can pass the timer 
from poll() into `onJoinPrepare`?
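The difference between the two timer choices can be quantified with a small, hypothetical model (the function and its cap are illustrative, not Kafka code): a retry loop bounded by the caller's poll(Duration) timer gives up quickly, while one bounded by rebalanceTimeoutMs = max.poll.interval.ms = Integer.MAX_VALUE is effectively unbounded.

```java
public class TimerChoiceSketch {
    // How many backoffMs-spaced retries fit before a timer of timeoutMs
    // expires (capped at one million iterations for the demo).
    static long retriesBeforeExpiry(long timeoutMs, long backoffMs) {
        long now = 0;
        long retries = 0;
        while (now < timeoutMs && retries < 1_000_000) {
            now += backoffMs;
            retries++;
        }
        return retries;
    }

    public static void main(String[] args) {
        // Bounded by the caller's poll(Duration.ofSeconds(1)) timer: gives up fast.
        System.out.println(retriesBeforeExpiry(1_000, 100));             // 10
        // Bounded by rebalanceTimeoutMs = max.poll.interval.ms = Integer.MAX_VALUE:
        // effectively never expires (hits the demo cap).
        System.out.println(retriesBeforeExpiry(Integer.MAX_VALUE, 100));
    }
}
```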

 

For suggestion (2): even if we use the metadata obtained before 
`ensureActiveGroup`, there is still a possibility of a race condition after 
committing offsets. It is quite difficult to tell whether an 
`UnknownTopicOrPartitionException` means the topic was deleted, is not ready 
yet, or something else, so I think the point here is to choose a good wait 
time (which brings us back to suggestion (1)).

 

For suggestion (3): I also think we should set the wait time properly, as 
described in (1).

 

Thank you.

 


[jira] [Updated] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Broker end cpu a

2021-09-18 Thread RivenSun (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RivenSun updated KAFKA-13310:
-
Attachment: SecondDeleteConsumerLog.png


[jira] [Commented] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Broker end cpu

2021-09-18 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17417145#comment-17417145
 ] 

RivenSun commented on KAFKA-13310:
--

Thank you [~showuon] very much for your reply.

I reproduced the scenario a second time, and the phenomenon is the same as 
before. For the consumer logs, see the attachment _SecondDeleteConsumerLog_.

!SecondDeleteConsumerLog.png!

When the KafkaConsumer falls into the infinite loop described above, please 
refer to the attached _SecondDeleteDebugLog_.

!SecondDeleteDebugLog.png!

In this loop, we can see that the *Set<String> subscription* inside the 
metadata instance variable of ConsumerCoordinator has already been refreshed 
to the latest set of valid topics.
So when an `UnknownTopicOrPartitionException` occurs, we can use this valid 
topic set to clean the stale entries out of the offsets we are about to 
commit. If we are not sure whether UnknownTopicOrPartitionException really 
means the topic was deleted, we can additionally call 
listTopics(ListTopicsOptions options) on KafkaAdminClient to confirm.
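The pruning idea above can be sketched as follows. This is a minimal, hypothetical illustration, not the proposed patch: topic-partitions are modeled as plain "topic-partition" strings and offsets as longs, where the real code would use org.apache.kafka.common.TopicPartition and OffsetAndMetadata, and validTopics would come from the refreshed metadata subscription.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class OffsetPruneSketch {
    // Drop offset entries whose topic is no longer in the set of valid topics,
    // so a deleted topic cannot keep the commit retrying forever.
    static Map<String, Long> pruneDeletedTopics(Map<String, Long> offsets,
                                                Set<String> validTopics) {
        Map<String, Long> pruned = new HashMap<>();
        for (Map.Entry<String, Long> e : offsets.entrySet()) {
            // "topic-partition" -> "topic" (split at the last '-')
            String topic = e.getKey().substring(0, e.getKey().lastIndexOf('-'));
            if (validTopics.contains(topic)) {
                pruned.put(e.getKey(), e.getValue());
            }
        }
        return pruned;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("rivenTest1-0", 42L);
        offsets.put("rivenTest88-1", 0L); // topic was deleted
        Map<String, Long> pruned =
                pruneDeletedTopics(offsets, Set.of("rivenTest1", "rivenTest2"));
        System.out.println(pruned); // only rivenTest1-0 survives
    }
}
```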

Later I will download the source code of kafka-clients 2.8.1, implement the 
change described above, build a test jar, and verify that it fixes the 
problem.
Thanks again.


[jira] [Updated] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Broker end cpu a

2021-09-18 Thread RivenSun (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RivenSun updated KAFKA-13310:
-
Attachment: SecondDeleteDebugLog.png


[jira] [Comment Edited] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Broker en

2021-09-18 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17417145#comment-17417145
 ] 

RivenSun edited comment on KAFKA-13310 at 9/18/21, 1:55 PM:




[jira] [Comment Edited] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Broker en

2021-09-18 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17417145#comment-17417145
 ] 

RivenSun edited comment on KAFKA-13310 at 9/18/21, 1:56 PM:



> KafkaConsumer cannot jump out of the poll method, and the consumer is blocked 
> in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). 
> Broker end cpu and traffic increase sharply
> 
>
> Key: KAFKA-13310
> URL: https://issues.apache.org/jira/browse/KAFKA-13310
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.8.1
> Environment: prod
>Reporter: RivenSun
>Priority: Major
> Attachments: SecondDeleteConsumerLog.png, SecondDeleteDebugLog.png, 
> brokerCpu.png, brokerNetBytes.png, kafkaConsumerLog.png
>
>
> h2. Foreword
>       Because our consumers' consumption logic is sometimes heavier, we refer 
> to the configuration of Kafka stream 
> [https://kafka.apache.org/documentation/#upgrade_10201_notable]
>  Set max.poll.interval.ms to Integer.MAX_VALUE
>  Our consumers have adopted method : 
> consumer.subscribe(Pattern.compile(".*riven.*"));
>  
> h2. Recurrence of the problem scene
> operate steps are
>  (1) Test environment Kafka cluster: three brokers
>  (2) Topics conforming to regular expressions include rivenTest1, rivenTest2, 
> and rivenTest88
>  (3) Only one consumer is needed, group.id is "rivenReassign", 
> consumer.subscribe(Pattern.compile(".*riven.*"));
>  (4) At the beginning, the group status is stable, and everything is normal 
> for consumers, then I delete topic: rivenTest88
>  
> h2. Phenomenon
>       Problem phenomenon
>   (1) The consumer is blocked in the poll method, no longer consume any 
> messages, and the consumer log is always printing
>  [main] WARN 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator-[Consumer 
> clientId=consumer-rivenReassign-1, groupId=rivenReassign] Offset commit 
> failed on partition rivenTest88-1 at offset 0: This server does not host this 
> topic-partition.
>  (2) The describeConsumerGroups call of AdminClient always times out, and 
> the group status is no longer stable
>  (3) The cpu and traffic of the broker are *significantly increased*
>  
>  
> h2. Problem tracking
>    By analyzing the KafkaConsumer code (version 2.8.1),
>  I found that you introduced the waitForJoinGro

[jira] [Comment Edited] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Broker en

2021-09-18 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17417145#comment-17417145
 ] 

RivenSun edited comment on KAFKA-13310 at 9/18/21, 3:03 PM:


Thank you [~showuon] very much for your reply.

I reproduced the scenario a second time; the consumerGroup still has only *one 
consumer*, and the phenomenon remains the same as before.
 For the consumer logs, see the attachment *_SecondDeleteConsumerLog_*.

When the kafkaConsumer falls into the infinite-loop code described above, 
please refer to the attached *_SecondDeleteDebugLog_*.
 In this endless loop, we can see that the *Set<String> subscription* inside 
the ConsumerCoordinator's metadata has already been refreshed to the latest 
valid *subscription*. The source code that updates *subscription* is shown 
below:

ConsumerCoordinator.java
{code:java}
void maybeUpdateSubscriptionMetadata() {
    int version = metadata.updateVersion();
    if (version > metadataSnapshot.version) {
        Cluster cluster = metadata.fetch();

        if (subscriptions.hasPatternSubscription())
            updatePatternSubscription(cluster);

        // Update the current snapshot, which will be used to check for subscription
        // changes that would require a rebalance (e.g. new partitions).
        metadataSnapshot = new MetadataSnapshot(subscriptions, cluster, version);
    }
}
{code}
SubscriptionState.java
{code:java}
private boolean changeSubscription(Set<String> topicsToSubscribe) {
    if (subscription.equals(topicsToSubscribe))
        return false;

    subscription = topicsToSubscribe;
    return true;
}
{code}
 

So when an `UnknownTopicOrPartitionException` occurs, we can use this valid 
*subscription* to remove the *stale topicPartitions* from the offsets we are 
about to commit. If you are not sure whether `UnknownTopicOrPartitionException` 
really means that the topic was deleted, you can additionally call the 
listTopics(final ListTopicsOptions options) method of KafkaAdminClient to 
confirm.
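To illustrate why this pruning would break the endless commit loop, here is a minimal simulation. It is not the real client code: the broker is modeled as a set of live topics, offsets are plain strings and longs, and all names are illustrative.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class CommitRetrySketch {

    // Simulated sync commit: the "broker" rejects the whole request while
    // any offset entry references a topic it no longer hosts. With pruning,
    // the retry loop terminates; without it, it would spin forever.
    public static int commitWithPruning(Map<String, Long> offsets, Set<String> liveTopics) {
        int attempts = 0;
        while (true) {
            attempts++;
            Optional<String> unknown = offsets.keySet().stream()
                    .filter(t -> !liveTopics.contains(t))
                    .findFirst();
            if (!unknown.isPresent())
                return attempts; // commit accepted
            // Proposed fix: drop the stale topic and retry with clean offsets.
            offsets.remove(unknown.get());
        }
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("rivenTest1", 5L);
        offsets.put("rivenTest88", 0L); // deleted topic
        Set<String> live = new HashSet<>();
        live.add("rivenTest1");
        System.out.println(commitWithPruning(offsets, live)); // prints 2
    }
}
```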

 

Later I will download the source code of kafka-clients 2.8.1, implement the 
change described above, build a test jar, and verify that the problem is fixed.

Thanks again.



[jira] [Updated] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Cpu and traffic

2021-09-18 Thread RivenSun (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RivenSun updated KAFKA-13310:
-
Summary: KafkaConsumer cannot jump out of the poll method, and the consumer 
is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer 
timer). Cpu and traffic of  Broker‘s side increase sharply  (was: KafkaConsumer 
cannot jump out of the poll method, and the consumer is blocked in the 
ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Broker end 
cpu and traffic increase sharply)

> KafkaConsumer cannot jump out of the poll method, and the consumer is blocked 
> in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). 
> Cpu and traffic of  Broker‘s side increase sharply
> ---
>
> Key: KAFKA-13310
> URL: https://issues.apache.org/jira/browse/KAFKA-13310
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.8.1
> Environment: prod
>Reporter: RivenSun
>Priority: Major
> Attachments: SecondDeleteConsumerLog.png, SecondDeleteDebugLog.png, 
> brokerCpu.png, brokerNetBytes.png, kafkaConsumerLog.png
>
>
> h2. Foreword
>       Because our consumers' consumption logic is sometimes heavy, we referred 
> to the Kafka Streams configuration 
> [https://kafka.apache.org/documentation/#upgrade_10201_notable]
>  and set max.poll.interval.ms to Integer.MAX_VALUE.
>  Our consumers subscribe with: 
> consumer.subscribe(Pattern.compile(".*riven.*"));
>  
> h2. Recurrence of the problem scene
> The reproduction steps are:
>  (1) Test environment Kafka cluster: three brokers
>  (2) Topics conforming to regular expressions include rivenTest1, rivenTest2, 
> and rivenTest88
>  (3) Only one consumer is needed, group.id is "rivenReassign", 
> consumer.subscribe(Pattern.compile(".*riven.*"));
>  (4) At the beginning, the group status is stable, and everything is normal 
> for consumers, then I delete topic: rivenTest88
>  
> h2. Phenomenon
>   (1) The consumer is blocked in the poll method, no longer consumes any 
> messages, and the consumer log is always printing:
>  [main] WARN 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator-[Consumer 
> clientId=consumer-rivenReassign-1, groupId=rivenReassign] Offset commit 
> failed on partition rivenTest88-1 at offset 0: This server does not host this 
> topic-partition.
>  (2) The describe consumerGroup interface of Adminclient  has always timed 
> out, and the group status is no longer stable
>  (3) The cpu and traffic of the broker are *significantly increased*
>  
>  
> h2. Problem tracking
>    Analyzing the KafkaConsumer code (version 2.8.1),
>  I found that the waitForJoinGroup variable was introduced in the 
> updateAssignmentMetadataIfNeeded method, with the rationale given in the 
> comment on the method: "try to update assignment metadata BUT do not need to 
> block on the timer for join group".
>  
> {code:java}
> // if not wait for join group, we would just use a timer of 0
> if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
>     // since we may use a different timer in the callee, we'd still need
>     // to update the original timer's current time after the call
>     timer.update(time.milliseconds());
>     return false;
> }
> {code}
> By tracing the code layer by layer, it turns out that the purpose of 
> this variable is to construct a time.timer(0L) and pass it down to
>  the method joinGroupIfNeeded(final Timer timer) in AbstractCoordinator.
>  However, in the call stack of joinGroupIfNeeded there is a submethod 
> onJoinPrepare, which contains the line
>  maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)); 
> the value of rebalanceConfig.rebalanceTimeoutMs is actually 
> max.poll.interval.ms.
>  Finally, I tracked down ConsumerCoordinator's method 
> commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer).
>  The input parameter offsets is subscriptions.allConsumed(); when I delete 
> the topic rivenTest88, the commitOffsetsSync(Map<TopicPartition, 
> OffsetAndMetadata> offsets, Timer timer) method will *fall into an infinite 
> loop!*
> {code:java}
> public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
>     invokeCompletedOffsetCommitCallbacks();
>
>     if (offsets.isEmpty())
>         return true;
>
>     do {
>         if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
>             return false;
>         }
>
>         RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
>         client.poll(future, timer);
>
>         // We may have had in-flight offset commits when the synchronous commit began. If so, ensure that
>         // the corresponding callbacks are invoked prior to returning in order to preserve the order that
>         // the offset commits were applied.
>         invokeCompletedOffsetCommitCallbacks();
>
>         if (future.succeeded()) {
>             if (interceptors != null)
>                 interceptors.onCommit(offsets);
>             return true;
>         }
>
>         if (future.failed() && !future.isRetriable())
>             throw future.exception();
>
>         timer.sleep(rebalanceConfig.retryBackoffMs);
>     } while (timer.notExpired());
>
>     return false;
> }
> {code}

[jira] [Updated] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Cpu and traffic

2021-09-18 Thread RivenSun (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RivenSun updated KAFKA-13310:
-
Description: 
h2. Foreword

      Because our consumers' processing logic is sometimes heavy, we followed 
the Kafka Streams guidance at 
[https://kafka.apache.org/documentation/#upgrade_10201_notable]
 and set max.poll.interval.ms to Integer.MAX_VALUE.
 Our consumers subscribe with: 
consumer.subscribe(Pattern.compile(".*riven.*"));
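Our consumer setup can be sketched with plain JDK code as follows (the bootstrap address is a placeholder, and the property map would be passed to a real KafkaConsumer in production code):

```java
import java.util.Properties;
import java.util.regex.Pattern;

public class ConsumerSetupSketch {
    public static void main(String[] args) {
        // Plain property map mirroring our consumer configuration;
        // in real code these are passed to new KafkaConsumer<>(props).
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "rivenReassign");
        props.setProperty("enable.auto.commit", "true");
        // Effectively disables the poll-interval check, as suggested for
        // heavy per-record processing in the Kafka Streams upgrade notes.
        props.setProperty("max.poll.interval.ms", String.valueOf(Integer.MAX_VALUE));

        // The same pattern we pass to consumer.subscribe(...)
        Pattern subscription = Pattern.compile(".*riven.*");
        System.out.println(subscription.matcher("rivenTest88").matches()); // true
    }
}
```

Note that pattern subscription means the set of matched topics can shrink at runtime, which is exactly what happens when rivenTest88 is deleted below.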

 
h2. Recurrence of the problem scene

The steps to reproduce are:
 (1) Test environment Kafka cluster: three brokers
 (2) Topics matching the regular expression: rivenTest1, rivenTest2, 
and rivenTest88
 (3) Only one consumer is needed; group.id is "rivenReassign", subscribed via 
consumer.subscribe(Pattern.compile(".*riven.*"));
 (4) At the beginning, the group status is stable and everything is normal for 
the consumer; then I delete the topic rivenTest88

 
h2. Phenomenon

 (1) The consumer is blocked in the poll method, no longer consumes any 
messages, and the consumer log keeps printing:
 [main] WARN 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator-[Consumer 
clientId=consumer-rivenReassign-1, groupId=rivenReassign] Offset commit failed 
on partition rivenTest88-1 at offset 0: This server does not host this 
topic-partition.
 (2) The describeConsumerGroups call of AdminClient always times out, 
and the group status is no longer stable
 (3) The CPU and traffic of the broker are *significantly increased*

 

 
h2. Problem tracking

   Analyzing the KafkaConsumer code (version 2.8.1),
 I found that the waitForJoinGroup variable was introduced in the 
updateAssignmentMetadataIfNeeded method, with the rationale given in the comment 
on the method: "try to update assignment metadata BUT do not need to block on 
the timer for join group".

 
{code:java}
// if not wait for join group, we would just use a timer of 0
if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
    // since we may use a different timer in the callee, we'd still need
    // to update the original timer's current time after the call
    timer.update(time.milliseconds());
    return false;
}
{code}
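The effect of time.timer(0L) can be illustrated with a simplified stand-in for the client's Timer (a hypothetical class, stdlib only; the real org.apache.kafka.common.utils.Timer has richer update/reset behavior): a zero-millisecond timer is already expired, so any loop guarded by notExpired() cannot block.

```java
public class TimerSketch {
    // Minimal stand-in for the client Timer (assumption: simplified API).
    static final class SimpleTimer {
        private final long deadlineMs;
        SimpleTimer(long nowMs, long timeoutMs) {
            // saturating add so huge timeouts do not overflow
            long d = nowMs + timeoutMs;
            this.deadlineMs = (d < 0) ? Long.MAX_VALUE : d;
        }
        boolean notExpired(long nowMs) { return nowMs < deadlineMs; }
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        SimpleTimer zero = new SimpleTimer(now, 0L);
        // A timer of 0 is expired immediately: retry loops run at most once.
        System.out.println(zero.notExpired(now)); // false
    }
}
```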
By tracing the code layer by layer, it turns out that the purpose of this 
variable is to construct a time.timer(0L) and pass it down to
 the method joinGroupIfNeeded(final Timer timer) in AbstractCoordinator.
 However, in the call stack of joinGroupIfNeeded there is a submethod 
onJoinPrepare, which contains the line
 maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)); 
the value of rebalanceConfig.rebalanceTimeoutMs is actually 
max.poll.interval.ms.
 Finally, I tracked down ConsumerCoordinator's method 
commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer).
 The input parameter offsets is subscriptions.allConsumed(); when I delete the 
topic rivenTest88, the commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> 
offsets, Timer timer) method will *fall into an infinite loop!*
{code:java}
public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
    invokeCompletedOffsetCommitCallbacks();

    if (offsets.isEmpty())
        return true;

    do {
        if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
            return false;
        }

        RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
        client.poll(future, timer);

        // We may have had in-flight offset commits when the synchronous commit began. If so, ensure that
        // the corresponding callbacks are invoked prior to returning in order to preserve the order that
        // the offset commits were applied.
        invokeCompletedOffsetCommitCallbacks();

        if (future.succeeded()) {
            if (interceptors != null)
                interceptors.onCommit(offsets);
            return true;
        }

        if (future.failed() && !future.isRetriable())
            throw future.exception();

        timer.sleep(rebalanceConfig.retryBackoffMs);
    } while (timer.notExpired());

    return false;
}{code}
 

 

*The reason for the endless loop is:*
 (1) The expiration time of the timer is too long: it is max.poll.interval.ms
 (2) The offsets to be committed contain stale data: a TopicPartition that no 
longer exists
 (3) The response future of sendOffsetCommitRequest(final Map<TopicPartition, 
OffsetAndMetadata> offsets) always fails, and the exception in the future 
is UnknownTopicOrPartitionException, which is considered retriable.

Since the loop interval, timer.sleep(rebalanceConfig.retryBackoffMs), is 100 ms 
by default,
 if a large number of consumers hit this problem at the same time, a large 
number of network requests will be sent to the Kafka brokers, *resulting in 
a sharp increase in the CPU and traffic of the broker machines!*
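The interaction of these three conditions can be simulated with plain Java (a deliberately simplified model of the retry loop, not the real client code): every attempt fails retriably, and the loop only exits when the timer expires, which with max.poll.interval.ms = Integer.MAX_VALUE is effectively never.

```java
public class CommitLoopSketch {
    // Simplified model: each "commit" always fails with a retriable error,
    // mimicking UnknownTopicOrPartitionException for a deleted topic.
    static int simulateCommitLoop(long timeoutMs, long retryBackoffMs) {
        long now = 0;
        long deadline = timeoutMs;
        int attempts = 0;
        do {
            attempts++;
            // sendOffsetCommitRequest(...) -> retriable failure, so fall through
            now += retryBackoffMs; // timer.sleep(rebalanceConfig.retryBackoffMs)
        } while (now < deadline && attempts < 1_000_000); // cap added for the sketch
        return attempts;
    }

    public static void main(String[] args) {
        // With a 500 ms timer and the default 100 ms backoff, the loop gives up quickly.
        System.out.println(simulateCommitLoop(500, 100)); // 5
        // With Integer.MAX_VALUE (~24.8 days), it hits the sketch's safety cap instead.
        System.out.println(simulateCommitLoop(Integer.MAX_VALUE, 100)); // 1000000
    }
}
```

At roughly 10 failed commit requests per second per consumer, the broker-side CPU and traffic growth reported above follows directly.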

 

 
h2. Suggest

1. maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)): 
it is recommended that this method's timer not use max.poll.interval.ms,
 as this parameter is open to users to configure. Through th

[jira] [Commented] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Cpu and traffi

2021-09-18 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17417165#comment-17417165
 ] 

RivenSun commented on KAFKA-13310:
--

And I think the most important and critical point is *not to find a good wait 
time* for the method commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> 
offsets, Timer timer), but to *immediately clean up/give up* the 
unknownTopicPartitions when committing offsets in this infinite-loop code.


Even if you think that `UnknownTopicOrPartitionException` may not mean that 
the topic has really been deleted, when we encounter this exception, shouldn't 
we temporarily give up committing offsets for these unknownTopicPartitions? 
What is the point of committing offsets for these unknownTopicPartitions 
repeatedly?


The worst effect is that, if the unknownTopicPartitions turn out not to have 
been deleted by the broker, we will re-consume a small portion of those 
partitions' messages (because we gave up committing their offsets)


What do you think? [~showuon]


[jira] [Comment Edited] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Cpu and t

2021-09-18 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17417165#comment-17417165
 ] 

RivenSun edited comment on KAFKA-13310 at 9/18/21, 3:50 PM:


And I think the most important and critical point is *not to find a good wait 
time* for the method commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> 
offsets, Timer timer), but to *immediately clean up/give up* the 
unknownTopicPartitions when committing offsets in this infinite-loop code.

Even if you think that `UnknownTopicOrPartitionException` may not mean that 
the topic has really been deleted, when we encounter this exception, shouldn't 
we temporarily give up committing offsets for these unknownTopicPartitions? 
What is the point of committing offsets for these unknownTopicPartitions 
repeatedly?

*The worst effect* is that, if the unknownTopicPartitions turn out not to have 
been deleted by the broker, we will re-consume a small portion of those 
partitions' messages (because we gave up committing their offsets)

What do you think? [~showuon]




[jira] [Commented] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-09-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17417203#comment-17417203
 ] 

Matthias J. Sax commented on KAFKA-13261:
-

Let's keep the discussion on the mailing list. – But I agree with the problem 
you describe. The FK-join can only work if both tables are partitioned only by 
their respective keys. I don't think it will become a problem in practice, but 
it's worth pointing out in the JavaDocs.

Furthermore, this problem seems to be very fundamental and I don't think we 
can solve it. As you pointed out, we don't have access to the value of the 
foreign key and thus cannot pass it into the partitioner. I also agree that we 
should not send the full left-hand value to the right side just for this 
purpose. As a matter of fact, in the original FK-KIP there was a discussion 
about sending the left value and computing the join result on the right side to 
avoid the response topics – this idea was rejected for various reasons.

After I read the first paragraphs, my thought was also "let's make the 
value-type {{Void}}", so I am happy that you propose the exact same thing! I am 
on board with it! – I'll reply to the mailing list, too.
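The co-partitioning constraint can be illustrated with a stdlib-only sketch (KeyA and the key values are hypothetical, echoing the reporter's example): for the FK-join's subscription mechanism to line up, A's records must be routed by the same function of the foreign-key component b that partitions table B.

```java
import java.util.Objects;

public class FkPartitionSketch {
    // Hypothetical composite key [a, b]; b is the foreign key into table B.
    record KeyA(String a, String b) {}

    // floorMod avoids the negative partition that
    // Math.abs(Integer.MIN_VALUE) % n would produce.
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    public static void main(String[] args) {
        int parts = 4;
        KeyA k1 = new KeyA("a1", "b7");
        KeyA k2 = new KeyA("a2", "b7"); // same foreign key, different primary part

        // Routing A's records by the foreign-key component keeps every record
        // that joins to B's key "b7" in the same partition as B's "b7".
        int pA1 = partitionFor(k1.b(), parts);
        int pA2 = partitionFor(k2.b(), parts);
        int pB  = partitionFor("b7", parts);
        System.out.println(pA1 == pB && pA2 == pB); // true
    }
}
```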

> KTable to KTable foreign key join loose events when using several partitions
> 
>
> Key: KAFKA-13261
> URL: https://issues.apache.org/jira/browse/KAFKA-13261
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Tomas Forsman
>Assignee: Victoria Xia
>Priority: Major
>  Labels: kip
> Attachments: KafkaTest.java
>
>
> KIP-775: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins]
>  
> Two incoming streams A and B. 
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put 
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to use only 1 partition each solves the problem, so it 
> seems to have something to do with the foreign key join in combination with 
> several partitions. 
> One suspicion would be that it is not possible to define which partitioner to 
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopoology() {
>     var builder = new StreamsBuilder();
>     KTable tableB = builder.table("B", stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(), Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
>
> private static Repartitioned<KeyA, EventA> repartitionTopicA() {
>     Repartitioned<KeyA, EventA> repartitioned = Repartitioned.as("driverperiod");
>     return repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
>
> private static StreamPartitioner<KeyA, EventA> topicAPartitioner() {
>     return (topic, key, value, numPartitions) -> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Cpu and traffic

2021-09-18 Thread RivenSun (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RivenSun updated KAFKA-13310:
-
Description: 
h2. Foreword

      Because our consumers' processing logic is sometimes heavy, we followed 
the Kafka Streams guidance at 
[https://kafka.apache.org/documentation/#upgrade_10201_notable]
 and set max.poll.interval.ms to Integer.MAX_VALUE.
 Our consumers subscribe with: 
consumer.subscribe(Pattern.compile(".*riven.*"));

 
h2. Recurrence of the problem scene

The steps to reproduce are:
 (1) Test environment Kafka cluster: three brokers
 (2) Topics matching the regular expression: rivenTest1, rivenTest2, 
and rivenTest88
 (3) Only one consumer is needed; group.id is "rivenReassign", subscribed via 
consumer.subscribe(Pattern.compile(".*riven.*"));
 (4) At the beginning, the group status is stable and everything is normal for 
the consumer; then I delete the topic rivenTest88

 
h2. Phenomenon

 (1) The consumer is blocked in the poll method, no longer consumes any 
messages, and the consumer log keeps printing:
 [main] WARN 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator-[Consumer 
clientId=consumer-rivenReassign-1, groupId=rivenReassign] Offset commit failed 
on partition rivenTest88-1 at offset 0: This server does not host this 
topic-partition.
 (2) The describeConsumerGroups call of AdminClient always times out, 
and the group status is no longer stable
 (3) The CPU and traffic of the broker are *significantly increased*

 

 
h2. Problem tracking

   Analyzing the KafkaConsumer code (version 2.8.1),
 I found that the waitForJoinGroup variable was introduced in the 
updateAssignmentMetadataIfNeeded method, with the rationale given in the comment 
on the method: "try to update assignment metadata BUT do not need to block on 
the timer for join group". See below:

 
{code:java}
if (includeMetadataInTimeout) {
    // try to update assignment metadata BUT do not need to block on the timer for join group
    updateAssignmentMetadataIfNeeded(timer, false);
} else {
    while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
        log.warn("Still waiting for metadata");
    }
}{code}
 

 

By tracing the code layer by layer, it turns out that the purpose of this 
variable is to construct a time.timer(0L) and pass it down to the method 
joinGroupIfNeeded(final Timer timer) in AbstractCoordinator. See below:
{code:java}
// if not wait for join group, we would just use a timer of 0
if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
    // since we may use a different timer in the callee, we'd still need
    // to update the original timer's current time after the call
    timer.update(time.milliseconds());
    return false;
}
{code}
However, in the call stack of joinGroupIfNeeded there is a submethod 
onJoinPrepare, which contains the line
 maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)); 
the value of rebalanceConfig.rebalanceTimeoutMs is actually 
max.poll.interval.ms.
 Finally, I tracked down ConsumerCoordinator's method 
commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer).
 The input parameter offsets is subscriptions.allConsumed(); when I delete the 
topic rivenTest88, the commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> 
offsets, Timer timer) method will *fall into an infinite loop!*
{code:java}
public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
    invokeCompletedOffsetCommitCallbacks();

    if (offsets.isEmpty())
        return true;

    do {
        if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
            return false;
        }

        RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
        client.poll(future, timer);

        // We may have had in-flight offset commits when the synchronous commit began. If so, ensure that
        // the corresponding callbacks are invoked prior to returning in order to preserve the order that
        // the offset commits were applied.
        invokeCompletedOffsetCommitCallbacks();

        if (future.succeeded()) {
            if (interceptors != null)
                interceptors.onCommit(offsets);
            return true;
        }

        if (future.failed() && !future.isRetriable())
            throw future.exception();

        timer.sleep(rebalanceConfig.retryBackoffMs);
    } while (timer.notExpired());

    return false;
}{code}
 

 

*The reason for the endless loop is:*
 (1) The expiration time of the timer is too long: it is max.poll.interval.ms
 (2) The offsets to be committed contain stale data: a TopicPartition that no 
longer exists
 (3) The response future of sendOffsetCommitRequest(final Map<TopicPartition, 
OffsetAndMetadata> offsets) always fails, and the exception in the future 
is UnknownTopicOrPartitionException, which is considered retriable.

Since the loop interval, timer.sleep(rebalanceConfig.retryBackoffMs), is 100 ms 
by default,
 if a large number of consumers have this problem at the same time, a

[GitHub] [kafka] abbccdda commented on a change in pull request #11333: KAFKA-13306: Null connector config value passes validation, but fails creation

2021-09-18 Thread GitBox


abbccdda commented on a change in pull request #11333:
URL: https://github.com/apache/kafka/pull/11333#discussion_r711655376



##
File path: 
connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
##
@@ -150,4 +152,15 @@ public Config validate(Map<String, String> 
connectorConfigs) {
  * @return The ConfigDef for this connector; may not be null.
  */
 public abstract ConfigDef config();
+
+private void validateConfigDoesNotContainNull(Map<String, String> 
connectorConfigs) {
+final String keysWithNullValue = connectorConfigs.entrySet().stream()
+.filter(entry -> entry.getValue() == null)
+.map(Map.Entry::getKey)
+.collect(Collectors.joining(", "));
+
+if (!keysWithNullValue.isEmpty()) {
+throw new ConnectException(String.format("Null value found in 
config for key(s) %s", keysWithNullValue));

Review comment:
   Should this just be IllegalArgumentException?
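The validation logic in the diff can be exercised with plain collections (a stdlib-only sketch; IllegalArgumentException stands in for ConnectException, following the reviewer's suggestion, so the snippet needs no Connect dependencies):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class NullConfigCheckSketch {
    // Mirrors the helper in the diff above.
    static void validateConfigDoesNotContainNull(Map<String, String> connectorConfigs) {
        final String keysWithNullValue = connectorConfigs.entrySet().stream()
                .filter(entry -> entry.getValue() == null)
                .map(Map.Entry::getKey)
                .collect(Collectors.joining(", "));
        if (!keysWithNullValue.isEmpty()) {
            throw new IllegalArgumentException(
                    "Null value found in config for key(s) " + keysWithNullValue);
        }
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the message lists keys predictably.
        Map<String, String> configs = new LinkedHashMap<>();
        configs.put("name", "my-connector");
        configs.put("tasks.max", null);
        configs.put("topics", null);
        try {
            validateConfigDoesNotContainNull(configs);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
            // Null value found in config for key(s) tasks.max, topics
        }
    }
}
```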

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
##
@@ -289,6 +290,23 @@ public void testBuildRestartPlanForUnknownConnector() {
 assertFalse(mayBeRestartPlan.isPresent());
 }
 
+@Test
+public void testConfigValidationNullConfig() {

Review comment:
   Could you also add a test case with multiple null values as well?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Cpu and traffi

2021-09-18 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17417270#comment-17417270
 ] 

RivenSun commented on KAFKA-13310:
--

[~showuon] 

After thinking about it, here are my latest points.
I hope you can reply as soon as possible, thank you.


     1. The poll method of KafkaConsumer promises to return within the time 
specified by the caller, unless the caller sets a ConsumerRebalanceListener 
that performs a time-consuming operation. See the comment of the poll(final 
Duration timeout) method below:

 
{code:java}
This method returns immediately if there are records available. Otherwise, it 
will await the passed timeout. If the timeout expires, an empty record set will 
be returned. Note that this method may block beyond the timeout in order to 
execute custom ConsumerRebalanceListener callbacks.{code}
 

In order to keep this promise, I also suggest passing the *Timer derived from 
the caller's poll() timeout* to the maybeAutoCommitOffsetsSync(*Timer timer*) 
method of ConsumerCoordinator.

 

     2. Modify ConsumerCoordinator's maybeAutoCommitOffsetsSync(Timer timer) 
method.
Below are my preliminary code changes:

 
{code:java}
private void maybeAutoCommitOffsetsSync(Timer timer) {
    if (autoCommitEnabled) {
        Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();

        cleanUpConsumedOffsets(allConsumedOffsets);

        try {
            log.debug("Sending synchronous auto-commit of offsets {}", allConsumedOffsets);
            if (!commitOffsetsSync(allConsumedOffsets, timer))
                log.debug("Auto-commit of offsets {} timed out before completion", allConsumedOffsets);
        } catch (WakeupException | InterruptException e) {
            log.debug("Auto-commit of offsets {} was interrupted before completion", allConsumedOffsets);
            // rethrow wakeups since they are triggered by the user
            throw e;
        } catch (Exception e) {
            // consistent with async auto-commit failures, we do not propagate the exception
            log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumedOffsets, e.getMessage());
        }
    }
}{code}
{code:java}
private void cleanUpConsumedOffsets(Map<TopicPartition, OffsetAndMetadata> willCommitOffsets) {

    if (willCommitOffsets.isEmpty())
        return;

    Set<String> subscription = subscriptions.subscription();
    Set<TopicPartition> toGiveUpTopicPartitions = new HashSet<>();

    Iterator<Map.Entry<TopicPartition, OffsetAndMetadata>> iterator = willCommitOffsets.entrySet().iterator();

    while (iterator.hasNext()) {

        Map.Entry<TopicPartition, OffsetAndMetadata> entry = iterator.next();

        if (!subscription.contains(entry.getKey().topic())) {
            toGiveUpTopicPartitions.add(entry.getKey());
            iterator.remove();
        }
    }

    if (toGiveUpTopicPartitions.size() > 0) {
        // Because toGiveUpTopicPartitions may receive `UnknownTopicOrPartitionException` when committing
        // their offsets, we are prepared to abandon them. The worst effect is that these partitions
        // may repeatedly consume some messages.
        log.warn("Synchronous auto-commit of offsets {} will be abandoned", toGiveUpTopicPartitions);
    }
}
{code}
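The effect of the proposed cleanup can be demonstrated with plain collections (a stdlib-only sketch using the topic names from the reproduction scenario; a String "topic-partition" key and a Long offset stand in for TopicPartition and OffsetAndMetadata):

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

public class CleanupSketch {
    // Same filtering idea as the proposal: drop offsets for topics that are
    // no longer in the subscription, keep everything else.
    static void cleanUpConsumedOffsets(Map<String, Long> willCommitOffsets,
                                       Set<String> subscribedTopics) {
        Iterator<Map.Entry<String, Long>> it = willCommitOffsets.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            // strip the trailing "-<partition>" to recover the topic name
            String topic = entry.getKey().substring(0, entry.getKey().lastIndexOf('-'));
            if (!subscribedTopics.contains(topic)) {
                it.remove(); // give up committing for the unknown topic-partition
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("rivenTest1-0", 42L);
        offsets.put("rivenTest88-1", 0L); // topic was deleted

        // After rivenTest88 is deleted, the refreshed subscription no longer contains it.
        cleanUpConsumedOffsets(offsets, Set.of("rivenTest1", "rivenTest2"));
        System.out.println(offsets.keySet()); // [rivenTest1-0]
    }
}
```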
 

> KafkaConsumer cannot jump out of the poll method, and the consumer is blocked 
> in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). 
> Cpu and traffic of  Broker‘s side increase sharply
> ---
>
> Key: KAFKA-13310
> URL: https://issues.apache.org/jira/browse/KAFKA-13310
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.8.1
> Environment: prod
>Reporter: RivenSun
>Priority: Major
> Attachments: SecondDeleteConsumerLog.png, SecondDeleteDebugLog.png, 
> brokerCpu.png, brokerNetBytes.png, kafkaConsumerLog.png
>
>
> h2. Foreword
>       Because our consumers' consumption logic is sometimes heavy, we followed 
> the Kafka Streams configuration notes at 
> [https://kafka.apache.org/documentation/#upgrade_10201_notable]
>  and set max.poll.interval.ms to Integer.MAX_VALUE.
>  Our consumers subscribe with: 
> consumer.subscribe(Pattern.compile(".*riven.*"));
>  
> h2. Recurrence of the problem scene
> The steps to reproduce are:
>  (1) Test environment Kafka cluster: three brokers
>  (2) Topics conforming to regular expressions include rivenTest1, rivenTest2, 
> and rivenTest88
>  (3) Only one consumer is needed, group.id is "rivenReassign", 
> consumer.subscribe(Pattern.compile(".*riven.*"));
>  (4) At the beginning, the group status is stable, and everything is normal 
> for consumers, then I delete topic: rivenTest88
>  
> h2. Phenomenon
>   (1) The consumer is blocked in the poll method, no longer consumes any 
> messages, and the consumer log keeps printing:
>  [main] WARN 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=consumer-rivenReassign-1, groupId=rivenReassign] Offset commit 
> failed on partition rivenTest88-1 at offset 0: This server does not host this 
> topic-partition.
>  (2) The describe consumerGroup call of AdminClient keeps timing out, and the 
> group status is no longer stable.
>  (3) The CPU and traffic of the broker are *significantly increased*

[GitHub] [kafka] vijaykriishna commented on pull request #10873: KAFKA-7360 Fixed code snippet

2021-09-18 Thread GitBox


vijaykriishna commented on pull request #10873:
URL: https://github.com/apache/kafka/pull/10873#issuecomment-922391928


   @halorgium @astubbs @alexism @glasser Please review the PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Cpu and t

2021-09-18 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17417270#comment-17417270
 ] 

RivenSun edited comment on KAFKA-13310 at 9/19/21, 12:24 AM:
-

[~showuon]

After thinking about it, here are my latest points.
 Hope you can reply as soon as possible, thank you.

     1. The poll method of KafkaConsumer promises to return within the time 
specified by the caller, unless the caller's ConsumerRebalanceListener performs 
a time-consuming operation. See the comment on the poll(final Duration timeout) 
method below:

 
{code:java}
This method returns immediately if there are records available. Otherwise, it 
will await the passed timeout. If the timeout expires, an empty record set will 
be returned. Note that this method may block beyond the timeout in order to 
execute custom ConsumerRebalanceListener callbacks.{code}
 

In order to keep this promise, I also suggest passing the *Timer the caller set 
for poll()* down to ConsumerCoordinator's maybeAutoCommitOffsetsSync(*Timer 
timer*) method, and the poll() Timer should also be applied to *onJoinPrepare*.
However, for other comprehensive considerations, the timers *in the other code 
blocks* of ensureActiveGroup(final Timer timer) *should remain as they are*, 
using the Timer passed in from the upper layer, which may be time.timer(0L).
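The effect of this proposal can be sketched with a minimal stand-in for Kafka's Timer. The class and method names below are illustrative only, not the real ConsumerCoordinator internals: a commit loop bounded by the caller's poll() timer returns when that deadline expires, while a timer built from rebalanceTimeoutMs = Integer.MAX_VALUE never would.

```java
// Minimal illustrative stand-in for org.apache.kafka.common.utils.Timer.
class DeadlineTimer {
    private final long deadlineMs;
    DeadlineTimer(long timeoutMs) {
        long now = System.currentTimeMillis();
        // clamp to avoid overflow for very large timeouts like Integer.MAX_VALUE
        this.deadlineMs = timeoutMs > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + timeoutMs;
    }
    boolean expired() { return System.currentTimeMillis() >= deadlineMs; }
}

public class PollTimerSketch {
    // Sketch of a sync-commit loop where every attempt fails (as it does for a
    // deleted topic), so only the timer can end the loop.
    static boolean commitOffsetsSync(DeadlineTimer timer) {
        while (!timer.expired()) {
            // a real client would send an OffsetCommit request here and
            // retry on UnknownTopicOrPartitionException
        }
        return false; // timed out without a successful commit
    }

    public static void main(String[] args) {
        // Bounded by a caller-style 100 ms poll timer: returns quickly.
        commitOffsetsSync(new DeadlineTimer(100));
        System.out.println("returned after the poll timeout");
        // With new DeadlineTimer(Integer.MAX_VALUE) -- the current behavior --
        // this loop would spin for roughly 25 days.
    }
}
```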

 

     2. Modify ConsumerCoordinator's maybeAutoCommitOffsetsSync(Timer timer) 
method
 Below are my preliminary code changes:

 
{code:java}
private void maybeAutoCommitOffsetsSync(Timer timer) {
    if (autoCommitEnabled) {
        Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();

        cleanUpConsumedOffsets(allConsumedOffsets);

        try {
            log.debug("Sending synchronous auto-commit of offsets {}", allConsumedOffsets);
            if (!commitOffsetsSync(allConsumedOffsets, timer))
                log.debug("Auto-commit of offsets {} timed out before completion", allConsumedOffsets);
        } catch (WakeupException | InterruptException e) {
            log.debug("Auto-commit of offsets {} was interrupted before completion", allConsumedOffsets);
            // rethrow wakeups since they are triggered by the user
            throw e;
        } catch (Exception e) {
            // consistent with async auto-commit failures, we do not propagate the exception
            log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumedOffsets, e.getMessage());
        }
    }
}{code}
{code:java}
private void cleanUpConsumedOffsets(Map<TopicPartition, OffsetAndMetadata> willCommitOffsets) {
    if (willCommitOffsets.isEmpty())
        return;

    Set<String> subscription = subscriptions.subscription();
    Set<TopicPartition> toGiveUpTopicPartitions = new HashSet<>();

    Iterator<Map.Entry<TopicPartition, OffsetAndMetadata>> iterator = willCommitOffsets.entrySet().iterator();
    while (iterator.hasNext()) {
        Map.Entry<TopicPartition, OffsetAndMetadata> entry = iterator.next();
        if (!subscription.contains(entry.getKey().topic())) {
            toGiveUpTopicPartitions.add(entry.getKey());
            iterator.remove();
        }
    }

    if (!toGiveUpTopicPartitions.isEmpty()) {
        // These partitions would receive UnknownTopicOrPartitionException when their offsets
        // are committed, so we abandon them. The worst effect is that these partitions
        // may consume some messages repeatedly.
        log.warn("Synchronous auto-commit of offsets {} will be abandoned", toGiveUpTopicPartitions);
    }
}
{code}
 


was (Author: rivensun):
[~showuon] 

After thinking about it, here is my latest points.
Hope you can reply me as soon as possible, thank you.


     1. Because the poll method of KafkaConsumer promises to return within the 
time specified by the customer, unless the customer sets the 
ConsumerRebalanceListener to perform a time-consuming operation. See the 
comment of poll(final Duration timeout) method as below:

 
{code:java}
This method returns immediately if there are records available. Otherwise, it 
will await the passed timeout. If the timeout expires, an empty record set will 
be returned. Note that this method may block beyond the timeout in order to 
execute custom ConsumerRebalanceListener callbacks.{code}
 

In order to keep this promise, I also suggest passing the *Timer set by the 
customer for poll()* to the maybeAutoCommitOffsetsSync(*Timer timer*) method of 
ConsumerCoordinator

 

     2. Modify ConsumerCoordinator's maybeAutoCommitOffsetsSync(Timer timer) 
method
Below are my preliminary code changes:

 
{code:java}
   private void maybeAutoCommitOffsetsSync(Timer timer) {
if (autoCommitEnabled) {
Map allConsumedOffsets = 
subscriptions.allConsumed();

cleanUpConsumedOffsets(allConsumedOffsets);

try {
log.debug("Sending synchronous auto-commit of offsets {}", 
allConsumedOffsets);
if (!commitOffset

[jira] [Assigned] (KAFKA-13186) Proposal for commented code

2021-09-18 Thread Vijay (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vijay reassigned KAFKA-13186:
-

Assignee: Vijay

> Proposal for commented code
> ---
>
> Key: KAFKA-13186
> URL: https://issues.apache.org/jira/browse/KAFKA-13186
> Project: Kafka
>  Issue Type: Wish
>Reporter: CoolGuy
>Assignee: Vijay
>Priority: Trivial
>
> Hello! I saw in your [coding 
> guidelines|https://kafka.apache.org/coding-guide.html] that ??Don't check in 
> commented out code. ??
> However, I still witness commented code in some files like:
> * 
> connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
> * 
> streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
> * 
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
>  
> Would you like to remove these commented code lines?
> If so, I may help and open a pull request.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Cpu and traffi

2021-09-18 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17417277#comment-17417277
 ] 

Luke Chen commented on KAFKA-13310:
---

[~RivenSun], thanks for your response. For your latest comment, I understand 
what you are trying to achieve: update the offset metadata (to remove 
non-existent partitions) before entering the "possibly" infinite commit-offset 
loop. But unfortunately, it could still cause the issue you reported, because 
there is still a race condition. Simply put, if thread A tries to commit 
offsets while thread B tries to delete topics, we never know which will reach 
the broker first, and which request will be processed first. So even if you 
update to the "latest" metadata before committing offsets in thread A, thread B 
may still delete some topics before thread A's commit.

 

So, to achieve what you wanted, we would need to call `cleanUpConsumedOffsets` 
inside the while loop of `commitOffsetsSync`: each time an 
`UnknownTopicOrPartitionException` is returned, we try to update the metadata. 
And inside `cleanUpConsumedOffsets`, we need to make sure we read an up-to-date 
subscription.
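A minimal self-contained sketch of that retry-with-pruning idea follows. This is not the real ConsumerCoordinator code: plain "topic-partition" String keys stand in for TopicPartition, the broker is a stub that rejects deleted topics, and all names are hypothetical.

```java
import java.util.*;

// Illustrative sketch: retry a sync commit within a deadline, pruning
// partitions the broker reports as unknown before each retry.
public class CommitRetrySketch {
    // Hypothetical broker stub: rejects any partition whose topic was deleted.
    static Set<String> deletedTopics = new HashSet<>(Collections.singletonList("rivenTest88"));

    // Returns the unknown topic-partitions; an empty set means the commit succeeded.
    static Set<String> tryCommit(Map<String, Long> offsets) {
        Set<String> unknown = new HashSet<>();
        for (String tp : offsets.keySet())
            if (deletedTopics.contains(tp.substring(0, tp.lastIndexOf('-'))))
                unknown.add(tp);
        return unknown;
    }

    static boolean commitOffsetsSync(Map<String, Long> offsets, long timeoutMs) {
        long start = System.currentTimeMillis();
        while (System.currentTimeMillis() - start < timeoutMs) {
            Set<String> unknown = tryCommit(offsets);
            if (unknown.isEmpty())
                return true;                     // committed what remained
            offsets.keySet().removeAll(unknown); // clean up inside the loop
            if (offsets.isEmpty())
                return true;                     // nothing left to commit
        }
        return false;                            // timed out
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("rivenTest1-0", 42L);
        offsets.put("rivenTest88-1", 0L);
        boolean ok = commitOffsetsSync(offsets, 1000L);
        System.out.println(ok + " " + offsets.keySet()); // prints: true [rivenTest1-0]
    }
}
```

Without the `removeAll` pruning, the loop above would spin on the unknown partition until the timeout, which mirrors the reported blocking behavior.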

 

This is one way to fix the issue, of course. However, it makes the 
commitOffsets method more complex, and there might be other errors to handle 
(during the metadata update request).

 

For me, I still prefer to bound the commitOffset method via the timeout: if the 
metadata has already been updated by other threads (e.g., topics deleted), the 
commit will fail with a timeout, and the caller should do their own metadata 
update before the next retry.

 

Let's see if there are other opinions from other experts. :)

> KafkaConsumer cannot jump out of the poll method, and the consumer is blocked 
> in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). 
> Cpu and traffic of  Broker‘s side increase sharply

[GitHub] [kafka] guozhangwang merged pull request #11329: KAFKA-13301; Optimized the interpretation of the relationship between 'request.timeout. ms' and 'max.poll.interval.ms' in the document.

2021-09-18 Thread GitBox


guozhangwang merged pull request #11329:
URL: https://github.com/apache/kafka/pull/11329


   






[jira] [Resolved] (KAFKA-13301) The relationship between request.timeout. ms and max.poll.interval.ms in the Consumer Configs is incorrect.

2021-09-18 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13301.
---
Fix Version/s: 3.1.0
   Resolution: Fixed

> The relationship between request.timeout. ms and max.poll.interval.ms in the 
> Consumer Configs is incorrect.
> ---
>
> Key: KAFKA-13301
> URL: https://issues.apache.org/jira/browse/KAFKA-13301
> Project: Kafka
>  Issue Type: Improvement
>Reporter: yangshengwei
>Priority: Trivial
> Fix For: 3.1.0
>
> Attachments: image-2021-09-15-15-37-25-561.png, 
> image-2021-09-15-15-39-00-179.png
>
>
> In Consumer Configs, the value of the configuration max.poll.interval.ms must 
> always be larger than request.timeout.ms. But here is what the official 
> document says: The value of the configuration request.timeout.ms must always 
> be larger than max.poll.interval.ms.





[jira] [Commented] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Cpu and traffi

2021-09-18 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17417285#comment-17417285
 ] 

RivenSun commented on KAFKA-13310:
--

 

Thank you [~showuon] very much for your reply; I very much agree with your 
points above.

I also considered the race condition you mentioned, so I suggest making the 
*two changes* together:

1. Try `cleanUpConsumedOffsets` before committing the offsets; maybe 90% of 
abnormal situations can be avoided. I did not do it in commitOffsetsSync 
because, as you said, that would make the infinite loop more complicated. We 
just need to try to clean up unknown topic-partitions before entering the loop.

2. Pass the poll() timer set by the caller to 
commitOffsetsSync(allConsumedOffsets, timer), to avoid exceeding the timeout 
the caller expects.

I am also very happy to invite other experts to analyze this problem. Thank you 
all for making Kafka better :D

> KafkaConsumer cannot jump out of the poll method, and the consumer is blocked 
> in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). 
> Cpu and traffic of  Broker‘s side increase sharply
>  
> h2. Problem tracking
>    By analyzing the kafkaConsumer code, the version is 2.8.1.
>  I found that you introduced the waitForJoinGroup variable in the 
> updateAssignmentMetadataIfNeeded method. For the reason, I attached the 
> comment on the method: "try to update assignment metadata BUT do not need to 
> block on the timer for join group". See as below:
>  
> {code:java}
>  if (includeMetadataInTimeout) {
> // try to update assignment metadata BUT do not need to block on the 
> timer for join group
> updateAssignmentMetadataIfNeeded(timer, false);
> } else {
> while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), 
> true)) {
> log.warn("Still waiting for metadata");
> }
> }{code}
>  
>  
> By tracing the code back layer by layer, it is found that the function of 
> this variable is to construct a time.timer(0L) and pass it back to the method 
> joinGroupIfNeeded (final Timer timer) in AbstractCoordinator. See as below:
> {code:java}
> // if not wait for join group, we would just use a timer of 0
>   if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
> // since we may use a different timer in the callee, we'd still need 
> // to update the original timer's current time after the call 
>   timer.update(time.milliseconds()); 
>   return false; 
> }
> {code}
>  But you will find that there is a submethod onJoinPrepare in the method 
> stack of joinGroupIfNeeded, and then there is a line of code in the 
> onJoinPrepare method
>  maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs)), 
> the value of rebalanceConfig.rebalance

[jira] [Comment Edited] (KAFKA-13310) KafkaConsumer cannot jump out of the poll method, and the consumer is blocked in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). Cpu and t

2021-09-18 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17417285#comment-17417285
 ] 

RivenSun edited comment on KAFKA-13310 at 9/19/21, 6:05 AM:


 

Thank you [~showuon] very much for your reply; I very much agree with your 
points above.

I also considered the race condition you mentioned, so I suggest making the 
*two changes* together:

1. Try `cleanUpConsumedOffsets` before committing the offsets; maybe 90% of 
abnormal situations can be avoided.
 I did not do it (`cleanUpConsumedOffsets`) *in commitOffsetsSync* because, as 
you said, that would make the infinite loop more complicated. We just need to 
*try to* clean up unknown topic-partitions before entering the loop.

2. Pass the poll() timer set by the caller to 
commitOffsetsSync(allConsumedOffsets, timer), to avoid *exceeding the timeout 
the caller expects*.

I am also very happy to invite other experts to analyze this problem. Thank you 
all for making Kafka better :D


was (Author: rivensun):
 

Thank you [~showuon]  very much for your reply, I very much agree with your 
points of view above

I also considered what you said about a race condition,
So I suggest to modify the *two points* together


1. Try `cleanUpConsumedOffsets` before submitting the offer, maybe 90% of 
abnormal situations can be avoided. I didn't do it in commitOffsetsSync. As you 
said, that would make the infinite loop more complicated. We just need to try 
to clean up unKnownTopicPartitions before entering the loop

2. Pass the poll()'s timer set by the customer to 
commitOffsetsSync(allConsumedOffsets, timer). to avoid exceeding the timeout 
expected by the customer

I am also very happy to invite other experts to analyze this problem, thank you 
everyone for making Kafka more perfect:D

> KafkaConsumer cannot jump out of the poll method, and the consumer is blocked 
> in the ConsumerCoordinator method maybeAutoCommitOffsetsSync(Timer timer). 
> Cpu and traffic of  Broker‘s side increase sharply