[ 
https://issues.apache.org/jira/browse/KAFKA-5167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Narendra Kumar updated KAFKA-5167:
----------------------------------
     Attachment: logs.txt
    Description: 
During a rebalance, the processor node's close() method gets called twice: once 
from StreamThread.suspendTasksAndState() and once from 
StreamThread.closeNonAssignedSuspendedTasks(). I have an instance field that 
I close in the processor's close() method. This instance's close() method 
throws an exception if it is called more than once. Because of this exception, 
Kafka Streams does not attempt to close the state manager, i.e. 
task.closeStateManager(true) is never called. When a task moves from one thread 
to another on the same machine, the task blocks trying to acquire the lock on 
the state directory, which is still held by the unclosed state manager, and 
keeps throwing the following exception:

2017-04-30 12:34:17 WARN  StreamThread:1214 - Could not create task 0_1. Will retry.
org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the state directory for task 0_1
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
        at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
        at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
        at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
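
Until the double close() is addressed in Kafka Streams itself, one possible 
workaround on the application side is to make the field's close idempotent. 
The following is only a minimal sketch under that assumption: StrictResource 
and CloseOnce are hypothetical names that stand in for the field described 
above and a guard around it. Neither is part of the Kafka Streams API, and the 
sketch does not change how Kafka Streams handles exceptions thrown from close().

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a field whose close() fails when called twice. */
class StrictResource implements AutoCloseable {
    private boolean closed = false;

    @Override
    public void close() {
        if (closed) {
            throw new IllegalStateException("already closed");
        }
        closed = true;
    }
}

/** Hypothetical guard that runs the wrapped close action at most once. */
class CloseOnce implements AutoCloseable {
    private final Runnable closeAction;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    CloseOnce(Runnable closeAction) {
        this.closeAction = closeAction;
    }

    @Override
    public void close() {
        // compareAndSet succeeds only for the first caller; later calls are no-ops.
        if (closed.compareAndSet(false, true)) {
            closeAction.run();
        }
    }

    boolean isClosed() {
        return closed.get();
    }
}
```

A processor could hold `new CloseOnce(resource::close)` and call its close() 
from the processor's own close() method, so that a second invocation during a 
rebalance no longer throws. If the processor can be initialized again after 
being suspended, the guard would presumably need to be recreated in init(); 
that is an assumption about the lifecycle, not something established in this 
report.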


  was:
During a rebalance, the processor node's close() method gets called twice. I 
have an instance field that I close in the processor's close() method. This 
instance's close() method throws an exception if it is called more than once. 
Because of this exception, Kafka Streams does not attempt to close the 
state manager, i.e. task.closeStateManager(true) is never called. When a task 
moves from one thread to another on the same machine, the task blocks trying to 
acquire the lock on the state directory, which is still held by the unclosed 
state manager, and keeps throwing the following exception:

2017-04-30 12:34:17 WARN  StreamThread:1214 - Could not create task 0_1. Will retry.
org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the state directory for task 0_1
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
        at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
        at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
        at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)



> streams task gets stuck after re-balance due to LockException
> -------------------------------------------------------------
>
>                 Key: KAFKA-5167
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5167
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Narendra Kumar
>         Attachments: logs.txt



