[
https://issues.apache.org/jira/browse/KAFKA-20253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Marko Štrukelj updated KAFKA-20253:
-----------------------------------
Description:
A consumer can enter a tight spin loop, pinning a CPU core, if it encounters a
re-authentication failure followed by a successful re-authentication when using
the SASL_OAUTHBEARER authentication mechanism. This can happen due to temporary
unavailability of the authorization server: a short-lived outage during
re-login, when the client needs to obtain a new access token, or shortly after,
when the broker may need the authorization server to validate the token.
An idle consumer (assigned 0 partitions), as occurs when a consumer group has
more consumers than topic partitions, is much more likely to experience this
issue. In the ClassicKafkaConsumer the issue shows as 100% CPU utilization by
the client application process. In the AsyncKafkaConsumer (KIP-848), the
application main thread is spared, but the background
kafka-consumer-network-thread enters a similar loop, with the application
process utilizing ~40-50% CPU.
Steps to Reproduce:
A reproducer with instructions is available at:
[GitHub|https://github.com/mstruk/kafka-consumer-reproducer]
Actual Behavior:
The idle consumer (sometimes the working consumer, sometimes both) enters a
busy-wait loop after a failed re-authentication followed by a successful
re-authentication (after the authorization server is restored). The spin loop
mostly continues indefinitely for the idle consumer; it sometimes seems to
recover for the non-idle consumer.
Expected Behavior:
The consumer should always sleep for some amount of time between poll
iterations. The internal state after a re-authentication failure and recovery
should never result in a busy-wait loop.
Technical Analysis & Root Cause:
Through remote debugging and thread dumps, the issue traces back to what
appears to be an unhandled state where the internal heartbeat timers expire but
are never reset due to the network/auth failure, forcing a 0ms timeout on the
selector.
Here is an example stack trace:
{code:java}
"main" #1 [3] prio=5 os_prio=0 cpu=174838.97ms elapsed=378.30s tid=0x0000ffffaea028d0 nid=3 runnable [0x0000ffffad52e000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPoll.wait([email protected]/Native Method)
        at sun.nio.ch.EPollSelectorImpl.doSelect([email protected]/Unknown Source)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect([email protected]/Unknown Source)
        - locked <0x00000007187a42e0> (a sun.nio.ch.Util$2)
        - locked <0x00000007187a41c8> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.selectNow([email protected]/Unknown Source)
            <--- Due to timeout==0, selectNow() is called - no sleep
        at org.apache.kafka.common.network.Selector.select(Selector.java:878)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:470)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:645)
            <--- timeout==0
            (https://github.com/apache/kafka/blob/4.2.0/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L645)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:282)
            <--- calculated pollTimeout is always 0: timer.remainingMs() == 0
            (https://github.com/apache/kafka/blob/4.2.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L282)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
        at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.pollForFetches(ClassicKafkaConsumer.java:714)
        at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:645)
            <--- This is the poll() loop, running and looping for 1 second before returning
            (https://github.com/apache/kafka/blob/4.2.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java#L645)
        at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:624)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:899)
            <--- At this point poll(Duration) is invoked with a 1 second duration
            (https://github.com/apache/kafka/blob/4.2.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L899)
        at io.strimzi.examples.consumer.GenericExampleConsumer.main(GenericExampleConsumer.java:115)
{code}
KafkaConsumer.poll() is invoked with a 1s duration, which puts a time
constraint on the ClassicKafkaConsumer.poll() logic, where multiple concerns
are handled on the calling thread inside a loop that iterates until the 1s
constraint is reached. While the 100% CPU condition persists, this loop never
sleeps, because the calculated timeout for NetworkClient.poll() is always 0.
It looks like the following is happening (this is my interpretation):
The ClassicKafkaConsumer.poll() method asks coordinator.timeToNextPoll() for
the sleep duration, which evaluates Heartbeat.timeToNextHeartbeat(long).
Because the heartbeat cannot be sent (due to the auth failure), a successful
response is never received, and heartbeatTimer.deadlineMs falls entirely into
the past (in my debugging it was expired by days, as I let the 100% CPU state
persist for days). Because the Timer's deadlineMs < currentTimeMs,
remainingMs() constantly returns 0. The main thread passes 0 to
NetworkClient.poll(), which results in a SelectorImpl.selectNow() call that
returns immediately without sleeping.
The same heartbeatTimer and Selector logic is used in the AsyncKafkaConsumer,
resulting in the same inconsistent internal state; it manifests slightly
differently in terms of per-thread CPU consumption, but it is essentially the
same condition.
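The mechanism can be illustrated with a minimal, self-contained sketch. These are not the actual Kafka classes: remainingMs() mimics the clamping in org.apache.kafka.common.utils.Timer, and wouldBlock() roughly mirrors the decision in Selector.select(long) between a blocking select and selectNow().
{code:java}
// Sketch only: shows why an expired heartbeat deadline pins the CPU.
public class ExpiredTimerSketch {

    // Mimics Timer#remainingMs: a deadline in the past clamps to 0.
    static long remainingMs(long deadlineMs, long nowMs) {
        return Math.max(0, deadlineMs - nowMs);
    }

    // Roughly mirrors Selector.select(long): a timeout of 0 means
    // selectNow(), i.e. return immediately without sleeping.
    static boolean wouldBlock(long timeoutMs) {
        return timeoutMs > 0;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long expiredDaysAgo = now - 3 * 86_400_000L; // deadline days in the past

        long pollTimeout = remainingMs(expiredDaysAgo, now);
        System.out.println(pollTimeout);             // 0
        System.out.println(wouldBlock(pollTimeout)); // false -> busy loop
    }
}
{code}
Because nothing ever resets the expired deadline while heartbeats keep failing, every iteration recomputes a 0ms timeout and the loop spins without sleeping.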
Workaround:
One workaround for this issue is to handle the AuthenticationException by
closing the current KafkaConsumer (KafkaConsumer.close()) and creating a new
instance for the application to continue with. A fresh KafkaConsumer comes
with a fresh, consistent internal state, so the issue no longer occurs.
While the workaround sounds easy, there are messaging frameworks that abstract
away the Kafka Clients API in order to be pluggable and support various
messaging libraries behind a common API - Quarkus + SmallRye, for example. It
is generally the policy of such frameworks to let the client application
handle exceptions, but they do not necessarily make it easy for the
application to conditionally recreate the underlying library's client object.
If such recreation is absolutely necessary for a production-level deployment,
users are forced to abandon simple best-practice code patterns and write
complex, error-prone extra code specific to the underlying messaging library.
Therefore, the proper place to address the issue is within the Kafka Clients
library itself.
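The close-and-recreate pattern can be sketched as follows. This is a runnable stand-in, not real client code: AuthFailure stands in for org.apache.kafka.common.errors.AuthenticationException and Client for a KafkaConsumer, so the pattern can be shown without a broker; in a real application the factory would rebuild the KafkaConsumer from its original configuration and re-subscribe.
{code:java}
import java.util.function.Supplier;

// Sketch of the recreate-on-auth-failure workaround.
public class RecreateOnAuthFailure {

    // Stand-in for org.apache.kafka.common.errors.AuthenticationException.
    static class AuthFailure extends RuntimeException {}

    // Stand-in for a KafkaConsumer.
    interface Client extends AutoCloseable {
        void poll();
        @Override void close();
    }

    // Polls with the current client; on an auth failure, closes the wedged
    // instance and returns a fresh one with a clean internal state.
    static Client pollOnce(Client client, Supplier<Client> factory) {
        try {
            client.poll();
            return client;            // keep using the healthy client
        } catch (AuthFailure e) {
            client.close();           // discard the wedged consumer
            return factory.get();     // fresh consumer, fresh timers
        }
    }
}
{code}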
> High CPU loop on consumer after failed re-authentication
> --------------------------------------------------------
>
> Key: KAFKA-20253
> URL: https://issues.apache.org/jira/browse/KAFKA-20253
> Project: Kafka
> Issue Type: Bug
> Components: clients, security
> Affects Versions: 4.0.0, 4.2.0
> Environment: Kafka Client Version: 4.2.0, it can be reproduced with
> 4.0.0 as well. Did not try other versions.
> Reporter: Marko Štrukelj
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)