[ https://issues.apache.org/jira/browse/KAFKA-18355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ravi Gupta updated KAFKA-18355:
-------------------------------
Description: 
We are running a Kafka Streams based application in production and have noticed a couple of times that the lag on our source topic partitions starts increasing. Based on our investigation, we found the following happening:
 * The thread responsible for the partition task gets an authentication exception (MSK IAM authentication throws a transient exception) while producing a record to the sink:
{code:java}
{
    "level":"ERROR",
    "logger_name":"org.apache.kafka.clients.NetworkClient",
    "message":"[Producer clientId=xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer, transactionalId=xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-3] Connection to node 1 (b-1.xxxxxxx.yyyyyy.c2.kafka.ap-southeast-1.amazonaws.com/xx.xx.xxx.xxxx:yyyy) failed authentication due to: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Failed to find AWS IAM Credentials [Caused by com.amazonaws.AmazonServiceException: Unauthorized (Service: null; Status Code: 401; Error Code: null; Request ID: null; Proxy: null)]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.",
    "thread_name":"kafka-producer-network-thread | xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer",
    "time":"2024-12-26T07:40:45.113067247Z"
} {code}
 * In some cases the system recovers: when the next record is polled, the sink node (RecordCollectorImpl) throws the exception from the previous message while processing.
 * However, in a couple of cases the following log appears approximately 5 minutes after the producer failure (there is no additional log statement explaining why the thread stopped polling, but it seems the heartbeat thread got the same exception as the producer):
{code:java}
{
    "level":"WARN",
    "logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
    "message":"[Consumer clientId=xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-consumer, groupId=xxxxx-xxxx-lall-lio-step-executor_lio-se] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.",
    "thread_name":"kafka-coordinator-heartbeat-thread | xxxxx-xxxx-lall-lio-step-executor_lio-se",
    "time":"2024-12-26T07:45:43.286428901Z"
} {code}
 * In such cases, the partition gets assigned to a new thread (StreamThread-5); however, the new thread keeps throwing the following exception:
{code:java}
{
    "level":"INFO",
    "logger_name":"org.apache.kafka.streams.processor.internals.TaskManager",
    "message":"stream-thread [xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5] Encountered lock exception. Reattempting locking the state in the next iteration.",
    "stack_trace":"org.apache.kafka.streams.errors.LockException: stream-thread [xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5] task [8_0] Failed to lock the state directory for task 8_0\n\tat org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:96)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1010)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:997)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)\n",
    "thread_name":"xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5",
    "time":"2024-12-26T07:50:53.904374419Z"
} {code}
 * We are using exception handlers; however, in these failure cases our handler is not called for either the producer or the consumer exception. For some other authentication exceptions during consume/produce we do see the handler being called (a simplified sketch of the handler wiring is shown below).
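For context on the last bullet, the following is a minimal, hypothetical sketch of the kind of handler wiring we assume in the application; the handler class, application id, and bootstrap servers shown here are placeholders, not the real production values:
{code:java}
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class HandlerWiringSketch {

    // Hypothetical wiring only; the real application registers its own handler classes.
    public static KafkaStreams build(Topology topology, String bootstrapServers) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "lio-step-executor"); // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        // Handler invoked for exceptions raised while producing to sink topics.
        // A custom implementation is used in production; the built-in default is shown here.
        props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  DefaultProductionExceptionHandler.class);

        KafkaStreams streams = new KafkaStreams(topology, props);

        // Handler invoked when an exception escapes a stream thread.
        // REPLACE_THREAD is shown only as an example response.
        streams.setUncaughtExceptionHandler(exception ->
                StreamThreadExceptionResponse.REPLACE_THREAD);
        return streams;
    }
} {code}
With this kind of setup we would expect at least one of these hooks to fire when StreamThread-3 stops, but in the failures described above neither the production exception handler nor the uncaught exception handler is invoked.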
> Stream thread blocks indefinitely for acquiring state directory lock
> --------------------------------------------------------------------
>
>                 Key: KAFKA-18355
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18355
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.8.1
>            Reporter: Ravi Gupta
>            Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)