Ravi Gupta created KAFKA-18355:
----------------------------------

             Summary: Stream thread blocks indefinitely for acquiring state 
directory lock
                 Key: KAFKA-18355
                 URL: https://issues.apache.org/jira/browse/KAFKA-18355
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.8.1
            Reporter: Ravi Gupta


We are running a Kafka Streams based application in production and have noticed a 
couple of times that the lag on a source topic partition starts increasing.

Based on our investigation, we found the following happening:
 # The thread responsible for the partition's task gets an authentication 
exception (MSK IAM authentication throws a transient exception) while producing a 
record to the Sink; an illustrative client configuration is sketched after the 
log below:

{code:java}
{
"level":"ERROR",
"logger_name":"org.apache.kafka.clients.NetworkClient",
"message":"[Producer 
clientId=xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer,
 
transactionalId=xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-3]
 Connection to node 1 
(b-1.xxxxxxx.yyyyyy.c2.kafka.ap-southeast-1.amazonaws.com/xx.xx.xxx.xxxx:yyyy) 
failed authentication due to: An error: 
(java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
Failed to find AWS IAM Credentials [Caused by 
com.amazonaws.AmazonServiceException: Unauthorized (Service: null; Status Code: 
401; Error Code: null; Request ID: null; Proxy: null)]) occurred when 
evaluating SASL token received from the Kafka Broker. Kafka Client will go to 
AUTHENTICATION_FAILED state.",
"thread_name":"kafka-producer-network-thread | 
xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer",
"time":"2024-12-26T07:40:45.113067247Z"
} {code}
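
For context, the clients authenticate to MSK via IAM. Below is a minimal sketch 
of a typical aws-msk-iam-auth client configuration (illustrative only, not our 
exact deployment values), showing where the SASL/IAM credential lookup that fails 
above is plugged in:

{code:java}
import java.util.Properties;

// Illustrative MSK IAM client properties (typical aws-msk-iam-auth setup).
// The credential lookup performed by this callback handler is what fails with
// "Failed to find AWS IAM Credentials" in the log above.
Properties props = new Properties();
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "AWS_MSK_IAM");
props.put("sasl.jaas.config",
        "software.amazon.msk.auth.iam.IAMLoginModule required;");
props.put("sasl.client.callback.handler.class",
        "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
{code}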

 # In some cases, the system recovers when the next record is polled and the 
Sink node (RecordCollectorImpl) rethrows the exception from the last message 
while processing it.
 # However, in a couple of cases the following log appears approximately 5 
minutes after the producer failure (no additional log statements explain why the 
thread stopped polling; it seems the heartbeat thread got the same exception as 
the producer). The configuration knobs named in the log are sketched right after 
it:

{code:java}
{
"level":"WARN",
"logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
"message":"[Consumer 
clientId=xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-consumer,
 groupId=xxxxx-xxxx-lall-lio-step-executor_lio-se] consumer poll timeout has 
expired. This means the time between subsequent calls to poll() was longer than 
the configured max.poll.interval.ms, which typically implies that the poll loop 
is spending too much time processing messages. You can address this either by 
increasing max.poll.interval.ms or by reducing the maximum size of batches 
returned in poll() with max.poll.records.",
"thread_name":"kafka-coordinator-heartbeat-thread | 
lioprodapsevc-papse-lall-lio-step-executor_lio-se",
"time":"2024-12-26T07:45:43.286428901Z"
} {code}
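
As the log itself suggests, the relevant knobs are max.poll.interval.ms and 
max.poll.records. A minimal sketch of tuning them in the Streams configuration 
(values are arbitrary examples; raising them would only mask the symptom, since 
the poll loop here stopped for another reason):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

// Illustrative tuning of the poll-loop settings named in the WARN log above.
Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600_000);
streamsProps.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
{code}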

 # In such cases, the partition gets assigned to a new thread (StreamThread-5); 
however, the new thread keeps throwing the following exception. A simplified 
sketch of the retry behaviour we believe is happening follows the stack trace:

{code:java}
{
"level":"INFO",
"logger_name":"org.apache.kafka.streams.processor.internals.TaskManager",
"message":"stream-thread 
[xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5]
 Encountered lock exception. Reattempting locking the state in the next 
iteration.",
"stack_trace":"org.apache.kafka.streams.errors.LockException: stream-thread 
[xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5]
 task [8_0] Failed to lock the state directory for task 8_0\n\tat 
org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:96)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1010)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:997)\n\tat
 
org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)\n\tat
 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)\n",
"thread_name":"xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5",
"time":"2024-12-26T07:50:53.904374419Z"
} {code}
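
Our reading of the above: the old thread still holds the task's state directory 
lock, so the new thread fails to lock it on every runLoop iteration and simply 
retries forever; the task never initializes and the lag keeps growing. A 
simplified, self-contained sketch of that retry behaviour (hypothetical names, 
not the real StateDirectory/StateManagerUtil API):

{code:java}
import java.util.concurrent.TimeUnit;

// Simplified sketch of the behaviour we believe we are seeing; tryLockStateDir
// is a hypothetical stand-in for the real state directory locking call.
public class LockRetrySketch {
    static boolean tryLockStateDir(String taskId) {
        // stand-in: always fails because the stuck StreamThread-3 never releases the lock
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        String taskId = "8_0";
        while (!tryLockStateDir(taskId)) {
            // TaskManager logs "Encountered lock exception. Reattempting locking
            // the state in the next iteration." and retries on the next runLoop pass
            TimeUnit.MILLISECONDS.sleep(100);
        }
        // restoration would begin here; in our case this point is never reached
    }
}
{code}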

 # We are using exception handlers; however, in these failure cases our 
exception handler is not called for either the producer or the consumer 
exception, whereas for some other authentication exceptions during 
consume/produce we do see the handler being called. A sketch of how the handlers 
are wired follows:
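
For reference, this is roughly how the handlers are registered 
(MyProductionExceptionHandler, MyDeserializationExceptionHandler, and topology 
are placeholders for our own classes and topology; the config keys are the 
standard Kafka Streams 3.8 ones):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

// Handlers registered via StreamsConfig; handler class names are placeholders.
Properties props = new Properties();
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
        MyProductionExceptionHandler.class);
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
        MyDeserializationExceptionHandler.class);

KafkaStreams streams = new KafkaStreams(topology, props);
// The uncaught exception handler decides whether to replace the failed thread
// or shut down the client.
streams.setUncaughtExceptionHandler(
        exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
{code}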

 


