[ 
https://issues.apache.org/jira/browse/KAFKA-18355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ravi Gupta updated KAFKA-18355:
-------------------------------
    Description: 
We are running a Kafka Streams based application in production and have noticed a 
couple of times that {*}lag on a source topic partition starts increasing{*}.

Based on our investigation, we found the following happening:
 * The thread responsible for the partition's task gets an authentication 
exception (MSK IAM authentication throws a transient exception) while producing 
a record to the sink.

{code:java}
{
"level":"ERROR",
"logger_name":"org.apache.kafka.clients.NetworkClient",
"message":"[Producer 
clientId=xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer,
 
transactionalId=xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-3]
 Connection to node 1 
(b-1.xxxxxxx.yyyyyy.c2.kafka.xx-yyyyy.amazonaws.com/xx.xx.xxx.xxxx:yyyy) failed 
authentication due to: An error: (java.security.PrivilegedActionException: 
javax.security.sasl.SaslException: Failed to find AWS IAM Credentials [Caused 
by com.amazonaws.AmazonServiceException: Unauthorized (Service: null; Status 
Code: 401; Error Code: null; Request ID: null; Proxy: null)]) occurred when 
evaluating SASL token received from the Kafka Broker. Kafka Client will go to 
AUTHENTICATION_FAILED state.",
"thread_name":"kafka-producer-network-thread | 
xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer",
"time":"2024-12-26T07:40:45.113067247Z"
} {code}
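
For context, the clients authenticate to MSK with IAM. Below is a minimal sketch of 
the client-side configuration we assume (the standard aws-msk-iam-auth properties; 
the application id and bootstrap endpoint are placeholders, not our real values):

{code:java}
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class MskIamClientConfig {
    // Illustrative MSK IAM client setup (aws-msk-iam-auth); endpoint and ids are placeholders.
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "xxxxx-xxxx-lall-lio-step-executor_lio-se");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                "b-1.xxxxxxx.yyyyyy.c2.kafka.xx-yyyyy.amazonaws.com:9098");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "AWS_MSK_IAM");
        props.put("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;");
        props.put("sasl.client.callback.handler.class",
                "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
        return props;
    }
}
{code}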
 * In some cases, the system recovers when the next record is polled and the 
sink node (RecordCollectorImpl) rethrows the exception from the last message 
while processing it.
 * However, in a couple of cases the following log appears approximately 5 
minutes after the producer failure. ({_}There is no additional log statement 
explaining why the thread stopped polling; it seems the heartbeat thread got 
the same exception as the producer.{_})

{code:java}
{
"level":"WARN",
"logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
"message":"[Consumer 
clientId=xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-consumer,
 groupId=xxxxx-xxxx-lall-lio-step-executor_lio-se] consumer poll timeout has 
expired. This means the time between subsequent calls to poll() was longer than 
the configured max.poll.interval.ms, which typically implies that the poll loop 
is spending too much time processing messages. You can address this either by 
increasing max.poll.interval.ms or by reducing the maximum size of batches 
returned in poll() with max.poll.records.",
"thread_name":"kafka-coordinator-heartbeat-thread | 
xxxxx-xxxx-lall-lio-step-executor_lio-se",
"time":"2024-12-26T07:45:43.286428901Z"
} {code}
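
The roughly 5 minute gap matches the default max.poll.interval.ms of 300000 ms. 
The configs named in the warning can be tuned through the Streams consumer 
overrides; a minimal sketch, purely for illustration (the values are arbitrary, 
and this would only mask the symptom, not fix the stuck thread described here):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class PollTuning {
    // Illustrative only: raise max.poll.interval.ms and lower max.poll.records
    // for the Streams main consumer via the consumer prefix.
    static Properties pollOverrides() {
        Properties props = new Properties();
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
                "600000"); // default is 300000 (5 minutes)
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
                "100");
        return props;
    }
}
{code}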
 * In such cases, the partition gets assigned to a new thread (Thread 5); 
however, the new thread keeps throwing the following exception:

{code:java}
{
"level":"INFO",
"logger_name":"org.apache.kafka.streams.processor.internals.TaskManager",
"message":"stream-thread 
[xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5]
 Encountered lock exception. Reattempting locking the state in the next 
iteration.",
"stack_trace":"org.apache.kafka.streams.errors.LockException: stream-thread 
[xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5]
 task [8_0] Failed to lock the state directory for task 8_0\n\tat 
org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:96)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1010)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:997)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)\n",
"thread_name":"xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5",
"time":"2024-12-26T07:50:53.904374419Z"
} {code}
 * We are using an exception handler; however, in these failure cases it is not 
called for either the producer or the consumer exception. For some other 
authentication exceptions during consume/produce we do see the handler being 
called. (A sketch of the handler wiring we assume is included below.)

It seems the old thread never cleaned up its state: the producer failure is 
normally surfaced and cleaned up when the next record is processed, which never 
happened here because of the consumer exception, and the consumer failure path 
did not release the state directory lock either.
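
For reference, a minimal sketch of how the handlers mentioned above are wired 
up, assuming the standard Kafka Streams 3.8 APIs (the handler class and its 
behavior are illustrative, not our exact production code):

{code:java}
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

// Hypothetical production exception handler: log and keep processing. In the
// failures described above, neither this handler nor the uncaught exception
// handler registered below is ever invoked.
public class LoggingProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                     Exception exception) {
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    // Wiring: register the production handler via config and replace a failed
    // stream thread instead of shutting the client down.
    public static KafkaStreams create(StreamsBuilder builder, Properties props) {
        props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                LoggingProductionExceptionHandler.class);
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.setUncaughtExceptionHandler(
                exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
        return streams;
    }
}
{code}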

> Stream thread blocks indefinitely for acquiring state directory lock
> --------------------------------------------------------------------
>
>                 Key: KAFKA-18355
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18355
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.8.1
>            Reporter: Ravi Gupta
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
