[ https://issues.apache.org/jira/browse/KAFKA-5167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16021786#comment-16021786 ]
Joao commented on KAFKA-5167:
-----------------------------

I just came across this very same exception (logs below); however, I'm not closing anything explicitly. I use one state store configured as follows:

{code}
StateStoreSupplier aggregationStore = Stores.create(storeTopic)
    .withKeys(MyKey.SERDE)
    .withValues(MyVal.SERDE)
    .persistent()
    .build();
{code}

My problem now is that my instance never recovered properly and is still throwing this exception. The health checks say the stream is running, which to some extent it is, but not very well. I only caught this issue because I'm monitoring the latency of the stream topology, which went from ~100ms total to 40s+.

Questions:
1. Is there a way to effectively check the health of the stream, accounting for these cases?
2. Is there some configuration I can make in my topology to avoid/circumvent this problem? Maybe try an in-memory store rather than RocksDB?

{code}
2017-05-23 16:06:38.933 ERROR 46 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Failed while executing StreamTask 0_11 due to commit consumer offsets:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702) [kafka-clients-0.10.2.1.jar!/:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581) [kafka-clients-0.10.2.1.jar!/:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124) [kafka-clients-0.10.2.1.jar!/:na]
	at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296) ~[kafka-streams-0.10.2.1.jar!/:na]
	at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:545) ~[kafka-streams-0.10.2.1.jar!/:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.performOnTasks(StreamThread.java:501) [kafka-streams-0.10.2.1.jar!/:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:541) [kafka-streams-0.10.2.1.jar!/:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:479) [kafka-streams-0.10.2.1.jar!/:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.access$1300(StreamThread.java:69) [kafka-streams-0.10.2.1.jar!/:na]
	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:252) [kafka-streams-0.10.2.1.jar!/:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:400) [kafka-clients-0.10.2.1.jar!/:na]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:342) [kafka-clients-0.10.2.1.jar!/:na]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.1.jar!/:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [kafka-clients-0.10.2.1.jar!/:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [kafka-clients-0.10.2.1.jar!/:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.1.jar!/:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [kafka-streams-0.10.2.1.jar!/:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [kafka-streams-0.10.2.1.jar!/:na]
2017-05-23 16:06:38.933 INFO 46 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Updating suspended tasks to contain active tasks [0_1, 0_6, 0_11]
2017-05-23 16:06:38.933 INFO 46 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Removing all active tasks [0_1, 0_6, 0_11]
2017-05-23 16:06:38.933 INFO 46 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Removing all standby tasks []
2017-05-23 16:06:38.933 INFO 46 --- [ StreamThread-5] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group accountingKsUsageCollection_network
2017-05-23 16:06:39.094 INFO 46 --- [ StreamThread-4] o.a.k.s.processor.internals.StreamTask : task [0_7] Initializing processor nodes of the topology
2017-05-23 16:06:39.097 INFO 46 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Creating active task 0_11 with assigned partitions [topicA-11, topicB-11]
2017-05-23 16:06:39.264 WARN 46 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread : Could not create task 0_1. Will retry.
org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the state directory for task 0_1
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[kafka-streams-0.10.2.1.jar!/:na]
	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[kafka-streams-0.10.2.1.jar!/:na]
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[kafka-streams-0.10.2.1.jar!/:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [kafka-streams-0.10.2.1.jar!/:na]
	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[kafka-streams-0.10.2.1.jar!/:na]
	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[kafka-streams-0.10.2.1.jar!/:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [kafka-streams-0.10.2.1.jar!/:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [kafka-streams-0.10.2.1.jar!/:na]
	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [kafka-streams-0.10.2.1.jar!/:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [kafka-clients-0.10.2.1.jar!/:na]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [kafka-clients-0.10.2.1.jar!/:na]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.1.jar!/:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [kafka-clients-0.10.2.1.jar!/:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [kafka-clients-0.10.2.1.jar!/:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.1.jar!/:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [kafka-streams-0.10.2.1.jar!/:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [kafka-streams-0.10.2.1.jar!/:na]
{code}

> streams task gets stuck after re-balance due to LockException
> -------------------------------------------------------------
>
>                 Key: KAFKA-5167
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5167
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0, 0.10.2.1
>            Reporter: Narendra Kumar
>            Assignee: Matthias J. Sax
>             Fix For: 0.11.0.0, 0.10.2.2
>
>         Attachments: BugTest.java, DebugTransformer.java, logs.txt
>
> During rebalance, the processor node's close() method gets called two times: once
> from StreamThread.suspendTasksAndState() and once from
> StreamThread.closeNonAssignedSuspendedTasks(). I have an instance field
> which I am closing in the processor's close() method. This instance's close method
> throws an exception if I call close more than once. Because of this
> exception, Kafka Streams does not attempt to close the state manager, i.e.,
> task.closeStateManager(true) is never called. When a task moves from one
> thread to another within the same machine, the task blocks trying to get a lock on the
> state directory, which is still held by the unclosed state manager, and keeps
> throwing the below warning message:
> 2017-04-30 12:34:17 WARN StreamThread:1214 - Could not create task 0_1. Will
> retry.
> org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the state directory for task 0_1
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
> 	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
> 	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
> 	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
> 	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
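A note on the root cause described above: the bug is triggered by a non-idempotent close(). The processor's close() is invoked twice during a rebalance, the second call throws, and task.closeStateManager(true) is skipped, leaving the state-directory lock held. Until the fix version is available, an application-side workaround is to guard any user-managed resource so a repeated close() is a no-op. A minimal, dependency-free sketch of that guard (GuardedResource is a hypothetical stand-in for whatever field the processor closes, not a Kafka class):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical resource wrapper illustrating an idempotent close() guard.
// Kafka Streams 0.10.2 may call Processor#close() twice during a rebalance;
// guarding the close makes the second call a no-op instead of throwing.
class GuardedResource {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private int closeCalls = 0; // counts invocations, for illustration only

    void close() {
        closeCalls++;
        // compareAndSet flips false -> true exactly once; later calls return early.
        if (!closed.compareAndSet(false, true)) {
            return; // already closed: swallow the repeat instead of throwing
        }
        // ... release the real underlying resource here (client, file handle, ...) ...
    }

    boolean isClosed() { return closed.get(); }
    int closeCallCount() { return closeCalls; }
}

public class Main {
    public static void main(String[] args) {
        GuardedResource r = new GuardedResource();
        r.close();
        r.close(); // second call is harmless
        System.out.println(r.isClosed() + " " + r.closeCallCount());
    }
}
```

With this guard inside the processor's close(), the second invocation from closeNonAssignedSuspendedTasks() returns silently, so the state manager can still be closed and the state-directory lock released, avoiding the LockException retry loop.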