[ https://issues.apache.org/jira/browse/KAFKA-5167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16021786#comment-16021786 ]

Joao commented on KAFKA-5167:
-----------------------------

I just came across this very same exception (logs below); however, I'm not 
closing anything explicitly. I use a single state store, configured as follows:

{code}
// Persistent (RocksDB-backed) key-value store using my own key/value serdes.
StateStoreSupplier aggregationStore = Stores.create(storeTopic)
                                            .withKeys(MyKey.SERDE)
                                            .withValues(MyVal.SERDE)
                                            .persistent()
                                            .build();
{code}

My problem now is that my instance never recovered properly and is still 
throwing this exception. The health checks report the stream as running, which 
to some extent it is, just not well. I only caught the issue because I'm 
monitoring the latency of the stream topology, which went from ~100ms total to 
40s+.
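
For context, this is roughly how I sample that latency from 
KafkaStreams#metrics(); a simplified sketch of my monitoring code, and the 
metric-name filter is an assumption based on what my 0.10.2.1 instance exposes:

{code}
import java.util.Map;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class LatencyProbe {

    // Logs every metric whose name mentions "process-latency" for the given
    // running KafkaStreams instance.
    public static void logProcessLatency(KafkaStreams streams) {
        Map<MetricName, ? extends Metric> metrics = streams.metrics();
        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            if (entry.getKey().name().contains("process-latency")) {
                // Metric#value() returns the current value as a double in 0.10.2.
                System.out.printf("%s = %f%n", entry.getKey().name(), entry.getValue().value());
            }
        }
    }
}
{code}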

Questions:
1. Is there a way to effectively check the health of the stream, accounting for 
these cases?
2. Is there some configuration I can make in my topology to avoid/circumvent 
this problem? Maybe try an in-memory store rather than RocksDB?
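
To make these concrete, below is the kind of thing I have in mind; just a 
sketch, and I'm not sure a state listener actually catches this situation. The 
in-memory variant only swaps persistent() for inMemory() on the same store 
definition (MyKey.SERDE, MyVal.SERDE and storeTopic are from my snippet above).

{code}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;

public class StreamsHealth {

    // Question 1: instead of only asking "is it running", react to state
    // changes via KafkaStreams#setStateListener.
    public static void registerHealthListener(KafkaStreams streams) {
        streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.REBALANCING
                    || newState == KafkaStreams.State.NOT_RUNNING) {
                // flip a flag here that the health check endpoint reads
            }
        });
    }

    // Question 2: the same store definition, but in-memory instead of RocksDB.
    public static StateStoreSupplier inMemoryStore(String storeTopic) {
        return Stores.create(storeTopic)
                     .withKeys(MyKey.SERDE)
                     .withValues(MyVal.SERDE)
                     .inMemory()
                     .build();
    }
}
{code}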

{code}
2017-05-23 16:06:38.933 ERROR 46 --- [ StreamThread-5] 
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5] 
Failed while executing StreamTask 0_11 due to commit consumer offsets:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be 
completed since the group has already rebalanced and assigned the partitions to 
another member. This means that the time between subsequent calls to poll() was 
longer than the configured max.poll.interval.ms, which typically implies that 
the poll loop is spending too much time message processing. You can address 
this either by increasing the session timeout or by reducing the maximum size 
of batches returned in poll() with max.poll.records.
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
 [kafka-clients-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
 [kafka-clients-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
 [kafka-clients-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
 ~[kafka-streams-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:545)
 ~[kafka-streams-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.performOnTasks(StreamThread.java:501)
 [kafka-streams-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:541)
 [kafka-streams-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:479)
 [kafka-streams-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.access$1300(StreamThread.java:69)
 [kafka-streams-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:252)
 [kafka-streams-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:400)
 [kafka-clients-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:342)
 [kafka-clients-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
 [kafka-clients-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
 [kafka-clients-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
 [kafka-clients-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
[kafka-clients-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
 [kafka-streams-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
 [kafka-streams-0.10.2.1.jar!/:na]

2017-05-23 16:06:38.933  INFO 46 --- [ StreamThread-5] 
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5] 
Updating suspended tasks to contain active tasks [0_1, 0_6, 0_11]
2017-05-23 16:06:38.933  INFO 46 --- [ StreamThread-5] 
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5] 
Removing all active tasks [0_1, 0_6, 0_11]
2017-05-23 16:06:38.933  INFO 46 --- [ StreamThread-5] 
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5] 
Removing all standby tasks []
2017-05-23 16:06:38.933  INFO 46 --- [ StreamThread-5] 
o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group 
accountingKsUsageCollection_network
2017-05-23 16:06:39.094  INFO 46 --- [ StreamThread-4] 
o.a.k.s.processor.internals.StreamTask   : task [0_7] Initializing processor 
nodes of the topology
2017-05-23 16:06:39.097  INFO 46 --- [ StreamThread-4] 
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] 
Creating active task 0_11 with assigned partitions [topicA-11, topicB-11]
2017-05-23 16:06:39.264  WARN 46 --- [ StreamThread-3] 
o.a.k.s.p.internals.StreamThread         : Could not create task 0_1. Will 
retry.

org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the 
state directory for task 0_1
        at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
 ~[kafka-streams-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
 ~[kafka-streams-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
 ~[kafka-streams-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
 [kafka-streams-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
 ~[kafka-streams-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
 ~[kafka-streams-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
 [kafka-streams-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
 [kafka-streams-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
 [kafka-streams-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
 [kafka-clients-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
 [kafka-clients-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
 [kafka-clients-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
 [kafka-clients-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
 [kafka-clients-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 
[kafka-clients-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
 [kafka-streams-0.10.2.1.jar!/:na]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
 [kafka-streams-0.10.2.1.jar!/:na]
{code}
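
In the meantime, since the ticket description (quoted below) says close() gets 
invoked twice during rebalance, a processor that does close resources could 
presumably guard against the second call along these lines. This is only a 
sketch; MyResource stands in for whatever closeable the processor actually owns.

{code}
import org.apache.kafka.streams.processor.AbstractProcessor;

// Sketch of an idempotent close(): the second invocation during rebalance
// becomes a no-op instead of throwing, so the state manager can still be
// closed afterwards. MyResource is a placeholder for the closeable field
// described in the ticket.
public class MyProcessor extends AbstractProcessor<MyKey, MyVal> {

    private final MyResource resource = new MyResource();
    private boolean closed = false;

    @Override
    public void process(MyKey key, MyVal value) {
        // ... normal processing, then forward downstream ...
        context().forward(key, value);
    }

    @Override
    public void close() {
        if (closed) {
            return;  // close() can be called twice during rebalance (KAFKA-5167)
        }
        closed = true;
        resource.close();
    }
}
{code}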

> streams task gets stuck after re-balance due to LockException
> -------------------------------------------------------------
>
>                 Key: KAFKA-5167
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5167
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0, 0.10.2.1
>            Reporter: Narendra Kumar
>            Assignee: Matthias J. Sax
>             Fix For: 0.11.0.0, 0.10.2.2
>
>         Attachments: BugTest.java, DebugTransformer.java, logs.txt
>
>
> During rebalance, a processor node's close() method gets called twice: once 
> from StreamThread.suspendTasksAndState() and once from 
> StreamThread.closeNonAssignedSuspendedTasks(). I have an instance field which 
> I close in the processor's close() method. This instance's close() method 
> throws an exception if it is called more than once. Because of that 
> exception, Kafka Streams never attempts to close the state manager, i.e. 
> task.closeStateManager(true) is never called. When a task moves from one 
> thread to another within the same machine, the task blocks trying to acquire 
> the lock on the state directory, which is still held by the unclosed state 
> manager, and keeps throwing the warning message below:
> 2017-04-30 12:34:17 WARN  StreamThread:1214 - Could not create task 0_1. Will 
> retry.
> org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the 
> state directory for task 0_1
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>       at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)


