[ https://issues.apache.org/jira/browse/KAFKA-5242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lukas Gemela updated KAFKA-5242:
--------------------------------
    Description: 
From time to time, during a rebalance, we are getting a lot of exceptions saying 
{code}
org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock the state directory: /app/db/clio/0_0
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102) ~[kafka-streams-0.10.2.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[kafka-streams-0.10.2.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[kafka-streams-0.10.2.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) ~[kafka-streams-0.10.2.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) ~[kafka-streams-0.10.2.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) [kafka-streams-0.10.2.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) [kafka-streams-0.10.2.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) [kafka-streams-0.10.2.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) [kafka-streams-0.10.2.0.jar!/:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) [kafka-clients-0.10.2.0.jar!/:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) [kafka-clients-0.10.2.0.jar!/:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.0.jar!/:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) [kafka-clients-0.10.2.0.jar!/:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) [kafka-clients-0.10.2.0.jar!/:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) [kafka-streams-0.10.2.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) [kafka-streams-0.10.2.0.jar!/:?]
{code}

(see attached logfile)

From looking at the code, it looks like some old tasks are never closed and the lock is never released.

Also, the backoff strategy in StreamThread$AbstractTaskCreator.retryWithBackoff can run endlessly: after 20 iterations it takes 6 hours until the next attempt to start a task.

I've noticed that the latest code contains a check for rebalanceTimeoutMs, but that still does not solve the problem, especially when MAX_POLL_INTERVAL_MS_CONFIG is set to Integer.MAX_VALUE. At this stage Kafka Streams just hangs indefinitely.

I would personally make the backoff strategy a bit more configurable, with a maximum number of retries; once the retry count exceeds the configured value, the exception is propagated, like any other exception, to the custom client exception handler.

(I can provide a patch)


  was:
From time to time, during a rebalance, we are getting a lot of exceptions saying 
{code}
org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock the state directory: /app/db/clio/0_0
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102) ~[kafka-streams-0.10.2.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[kafka-streams-0.10.2.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[kafka-streams-0.10.2.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) ~[kafka-streams-0.10.2.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) ~[kafka-streams-0.10.2.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) [kafka-streams-0.10.2.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) [kafka-streams-0.10.2.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) [kafka-streams-0.10.2.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) [kafka-streams-0.10.2.0.jar!/:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) [kafka-clients-0.10.2.0.jar!/:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) [kafka-clients-0.10.2.0.jar!/:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.0.jar!/:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) [kafka-clients-0.10.2.0.jar!/:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) [kafka-clients-0.10.2.0.jar!/:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) [kafka-streams-0.10.2.0.jar!/:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) [kafka-streams-0.10.2.0.jar!/:?]
{code}

(see attached logfile)

From looking at the code, it looks like some old tasks are never closed and the lock is never released.

Also, the backoff strategy in StreamThread$AbstractTaskCreator.retryWithBackoff can run endlessly: after 20 iterations it takes 6 hours until the next attempt to start a task.

I've noticed that the latest code contains a check for rebalanceTimeoutMs, but that still does not solve the problem, especially when MAX_POLL_INTERVAL_MS_CONFIG is set to Integer.MAX_VALUE.

I would personally make the backoff strategy a bit more configurable, with a maximum number of retries; once the retry count exceeds the configured value, the exception is propagated, like any other exception, to the custom client exception handler.

(I can provide a patch)


> state task directory locked forever
> -----------------------------------
>
>                 Key: KAFKA-5242
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5242
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Lukas Gemela
>         Attachments: clio_170511.log
>
>
> From time to time, during a rebalance, we are getting a lot of exceptions saying 
> {code}
> org.apache.kafka.streams.errors.LockException: task [0_0] Failed to lock the state directory: /app/db/clio/0_0
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102) ~[kafka-streams-0.10.2.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[kafka-streams-0.10.2.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[kafka-streams-0.10.2.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) ~[kafka-streams-0.10.2.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) ~[kafka-streams-0.10.2.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) [kafka-streams-0.10.2.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) [kafka-streams-0.10.2.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) [kafka-streams-0.10.2.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) [kafka-streams-0.10.2.0.jar!/:?]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) [kafka-clients-0.10.2.0.jar!/:?]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) [kafka-clients-0.10.2.0.jar!/:?]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.0.jar!/:?]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) [kafka-clients-0.10.2.0.jar!/:?]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) [kafka-clients-0.10.2.0.jar!/:?]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) [kafka-streams-0.10.2.0.jar!/:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) [kafka-streams-0.10.2.0.jar!/:?]
> {code}
> (see attached logfile)
> From looking at the code, it looks like some old tasks are never closed and the lock is never released.
> Also, the backoff strategy in StreamThread$AbstractTaskCreator.retryWithBackoff can run endlessly: after 20 iterations it takes 6 hours until the next attempt to start a task.
> I've noticed that the latest code contains a check for rebalanceTimeoutMs, but that still does not solve the problem, especially when MAX_POLL_INTERVAL_MS_CONFIG is set to Integer.MAX_VALUE. At this stage Kafka Streams just hangs indefinitely.
> I would personally make the backoff strategy a bit more configurable, with a maximum number of retries; once the retry count exceeds the configured value, the exception is propagated, like any other exception, to the custom client exception handler.
> (I can provide a patch)
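
To make the proposal in the description concrete, here is a rough sketch of a retry loop with a capped exponential backoff and a configurable attempt limit. It is only an illustration and not Kafka Streams code: BoundedRetry, RetriesExhaustedException, and the parameters maxAttempts, initialBackoffMs and maxBackoffMs are all hypothetical names, and the simulated failure in main stands in for a LockException.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper (not part of Kafka Streams): bounded retry with capped exponential backoff. */
final class BoundedRetry {

    /** Thrown once the attempt limit is reached, so a caller's exception handler can see the failure. */
    static final class RetriesExhaustedException extends RuntimeException {
        RetriesExhaustedException(int attempts, Throwable cause) {
            super("gave up after " + attempts + " attempts", cause);
        }
    }

    /** Delay before retry number 'retry' (1-based): initial * 2^(retry-1), never above maxBackoffMs. */
    static long backoffMs(long initialBackoffMs, long maxBackoffMs, int retry) {
        long delay = Math.min(initialBackoffMs, maxBackoffMs);
        for (int i = 1; i < retry; i++) {
            if (delay > maxBackoffMs / 2) {
                return maxBackoffMs; // doubling would pass the cap (this also avoids overflow)
            }
            delay *= 2;
        }
        return delay;
    }

    /**
     * Runs the action until it succeeds or maxAttempts is reached.
     * For simplicity this retries on any exception; a real patch would
     * presumably retry only on the lock failure.
     */
    static <T> T retry(Callable<T> action, int maxAttempts, long initialBackoffMs, long maxBackoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e; // remember the failure and fall through to the backoff
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs(initialBackoffMs, maxBackoffMs, attempt));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                    throw new RetriesExhaustedException(attempt, ie);
                }
            }
        }
        // Propagate instead of sleeping ever longer.
        throw new RetriesExhaustedException(maxAttempts, last);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated task creation: fails twice, as if the state directory were locked, then succeeds.
        String result = BoundedRetry.<String>retry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("state directory locked");
            }
            return "task created";
        }, 5, 1L, 8L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

With a cap on the delay and a limit on attempts, the worst-case wait is bounded and known up front, and the final failure surfaces as an ordinary exception rather than a thread that sleeps for hours.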