[ https://issues.apache.org/jira/browse/KAFKA-18355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17909575#comment-17909575 ]
Bruno Cadonna commented on KAFKA-18355:
---------------------------------------

[~ravigupta] Thanks for the ticket! This ticket contains multiple effects whose causes might be unrelated.

First of all, we need to understand why your Streams application exceeds the consumer poll timeout. The consumer poll timeout is 5 minutes by default. You can increase it by setting {{max.poll.interval.ms}} to a higher value.

The lock exception happens because a rebalance is triggered after your application exceeds the consumer poll timeout. The lock exception is not really an error. Note that it is logged at INFO level, not at ERROR level. Streams will retry to acquire the lock and will NOT re-throw the lock exception; that is also the reason your exception handler is not called. As far as we know, the lock exception is harmless: unless we have overlooked a bug, it does not block the application indefinitely and it does not increase the lag.

The increased number of log messages about the lock exception is due to the state updater, which is enabled by default in 3.8. The retries to acquire the lock run in quite a tight loop, flooding the logs with that INFO message about the non-acquired lock. We have seen the lock exception often in our test infrastructure, and each time it turned out to be harmless and the actual error was somewhere else.

Since the increased logging is really annoying, as you discovered, we introduced a backoff mechanism on {{trunk}} for retrying to acquire the lock: https://github.com/apache/kafka/pull/17209 This will be released with 4.0. We also want to cherry-pick this change to the 3.9 and 3.8 patch releases.

> Stream thread blocks indefinitely for acquiring state directory lock
> --------------------------------------------------------------------
>
>                 Key: KAFKA-18355
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18355
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.8.1
>            Reporter: Ravi Gupta
>            Priority: Major
>
> We are running a Kafka Streams based application in production and have noticed a couple of times that the {*}lag on a source topic partition starts increasing{*}. Based on investigation, we found the following happening:
> * The thread responsible for the partition task gets an authentication exception (MSK IAM authentication gives this transient exception) while producing a record in the sink:
> {code:java}
> {
>     "level":"ERROR",
>     "logger_name":"org.apache.kafka.clients.NetworkClient",
>     "message":"[Producer clientId=xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer, transactionalId=xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-3] Connection to node 1 (b-1.xxxxxxx.yyyyyy.c2.kafka.xx-yyyyy.amazonaws.com/xx.xx.xxx.xxxx:yyyy) failed authentication due to: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Failed to find AWS IAM Credentials [Caused by com.amazonaws.AmazonServiceException: Unauthorized (Service: null; Status Code: 401; Error Code: null; Request ID: null; Proxy: null)]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.",
>     "thread_name":"kafka-producer-network-thread | xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-producer",
>     "time":"2024-12-26T07:40:45.113067247Z"
> } {code}
> * In some cases, the system recovers when the next record is polled and the sink node (RecordCollectorImpl) throws the exception from the last message while processing it.
> * However, in a couple of cases the following log appears approximately 5 minutes after the producer failure ({_}no additional log statement explains why the thread stopped polling; it seems the heartbeat thread got the same exception as the producer{_}):
> {code:java}
> {
>     "level":"WARN",
>     "logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator",
>     "message":"[Consumer clientId=xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-3-consumer, groupId=xxxxx-xxxx-lall-lio-step-executor_lio-se] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.",
>     "thread_name":"kafka-coordinator-heartbeat-thread | xxxxx-xxxx-lall-lio-step-executor_lio-se",
>     "time":"2024-12-26T07:45:43.286428901Z"
> } {code}
> * In such cases, the partition gets assigned to a new thread (Thread 5); however, the new thread keeps throwing the following exception:
> {code:java}
> {
>     "level":"INFO",
>     "logger_name":"org.apache.kafka.streams.processor.internals.TaskManager",
>     "message":"stream-thread [xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5] Encountered lock exception. Reattempting locking the state in the next iteration.",
>     "stack_trace":"org.apache.kafka.streams.errors.LockException: stream-thread [xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5] task [8_0] Failed to lock the state directory for task 8_0\n\tat org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:96)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1010)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:997)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)\n",
>     "thread_name":"xxxxx-xxxx-lall-lio-step-executor_lio-se-d0ee0299-9fd5-4fd0-8a42-cd49f49de845-StreamThread-5",
>     "time":"2024-12-26T07:50:53.904374419Z"
> } {code}
> * We are using an exception handler; however, in these failure cases it is not called for either the producer or the consumer exception. For some other authentication exceptions during consume/produce we do see the handler being called.
>
> It seems the old thread didn't clean up its state: the producer failure is only cleaned up when processing the next event (which never happened due to the consumer exception), and the consumer failure didn't release the lock either.
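
As a follow-up to the suggestion above about raising the poll timeout: below is a minimal sketch of how {{max.poll.interval.ms}} (and optionally {{max.poll.records}}, mentioned in the ConsumerCoordinator warning) could be set for a Streams application. The configuration keys come from the public Kafka API; the application id, bootstrap servers, and the concrete values are placeholders, not recommendations.

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsPollTimeoutConfig {

    public static Properties buildConfig() {
        final Properties props = new Properties();
        // Placeholder application id and bootstrap servers.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");

        // Give the poll loop more time before the consumer is considered failed.
        // Default is 300000 ms (5 minutes); 600000 ms here is only an example.
        props.put(
            StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
            600000);

        // Optionally reduce the number of records returned per poll(), as the
        // ConsumerCoordinator warning suggests.
        props.put(
            StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
            100);

        return props;
    }
}
{code}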
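On the exception-handler point in the report: as explained above, the LockException is caught and retried internally, so no handler is expected to fire for it. For completeness, here is a minimal sketch of how the two handlers in the public Kafka Streams API are typically wired up; the topology/properties wiring and the chosen responses are illustrative only.

{code:java}
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

public class StreamsHandlerSetup {

    public static KafkaStreams start(final Topology topology, final Properties props) {
        // Production exception handler: invoked for errors while producing to a sink
        // (here simply the default handler, which fails on error).
        props.put(
            StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
            DefaultProductionExceptionHandler.class);

        final KafkaStreams streams = new KafkaStreams(topology, props);

        // Uncaught exception handler: invoked only for exceptions that actually kill
        // a stream thread. The LockException discussed above is retried internally,
        // so it never reaches this handler.
        streams.setUncaughtExceptionHandler(
            exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);

        streams.start();
        return streams;
    }
}
{code}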