[
https://issues.apache.org/jira/browse/KAFKA-5167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16021786#comment-16021786
]
Joao commented on KAFKA-5167:
-----------------------------
I just came across this very same exception (logs below); however, I'm not
closing anything explicitly. I use one state store configured as follows:
{code}
StateStoreSupplier aggregationStore = Stores.create(storeTopic)
.withKeys(MyKey.SERDE)
.withValues(MyVal.SERDE)
.persistent()
.build();
{code}
My problem now is that my instance never recovered properly and is still
throwing this exception. The health checks say the stream is running, which to
some extent it is, but not very well. I happened to have caught this issue
because I'm monitoring the latency of the stream topology, which went from
~100ms total to 40s+.
Questions:
1. Is there a way to effectively check the health of the stream, accounting for
these cases?
2. Is there some configuration I can make in my topology to avoid/circumvent
this problem? Maybe try an in-memory store rather than RocksDB?
{code}
2017-05-23 16:06:38.933 ERROR 46 --- [ StreamThread-5]
o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5]
Failed while executing StreamTask 0_11 due to commit consumer offsets:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed since the group has already rebalanced and assigned the partitions to
another member. This means that the time between subsequent calls to poll() was
longer than the configured max.poll.interval.ms, which typically implies that
the poll loop is spending too
much time message processing. You can address this either by increasing the
session timeout or by reducing the maximum size of batches returned in poll()
with max.poll.records.
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
~[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:545)
~[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.performOnTasks(StreamThread.java:501)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:541)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:479)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.access$1300(StreamThread.java:69)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:252)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:400)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:342)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
[kafka-streams-0.10.2.1.jar!/:na]
2017-05-23 16:06:38.933 INFO 46 --- [ StreamThread-5]
o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5]
Updating suspended tasks to contain active tasks [0_1, 0_6, 0_11]
2017-05-23 16:06:38.933 INFO 46 --- [ StreamThread-5]
o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5]
Removing all active tasks [0_1, 0_6, 0_11]
2017-05-23 16:06:38.933 INFO 46 --- [ StreamThread-5]
o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5]
Removing all standby tasks []
2017-05-23 16:06:38.933 INFO 46 --- [ StreamThread-5]
o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group
accountingKsUsageCollection_network
2017-05-23 16:06:39.094 INFO 46 --- [ StreamThread-4]
o.a.k.s.processor.internals.StreamTask : task [0_7] Initializing processor
nodes of the topology
2017-05-23 16:06:39.097 INFO 46 --- [ StreamThread-4]
o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4]
Creating active task 0_11 with assigned partitions [topicA-11, topicB-11]
2017-05-23 16:06:39.264 WARN 46 --- [ StreamThread-3]
o.a.k.s.p.internals.StreamThread : Could not create task 0_1. Will
retry.
org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the
state directory for task 0_1
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
~[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
~[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
~[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
~[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
~[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
[kafka-clients-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
[kafka-streams-0.10.2.1.jar!/:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
[kafka-streams-0.10.2.1.jar!/:na]
{code}
> streams task gets stuck after re-balance due to LockException
> -------------------------------------------------------------
>
> Key: KAFKA-5167
> URL: https://issues.apache.org/jira/browse/KAFKA-5167
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.0, 0.10.2.1
> Reporter: Narendra Kumar
> Assignee: Matthias J. Sax
> Fix For: 0.11.0.0, 0.10.2.2
>
> Attachments: BugTest.java, DebugTransformer.java, logs.txt
>
>
> During rebalance processor node's close() method gets called two times once
> from StreamThread.suspendTasksAndState() and once from
> StreamThread.closeNonAssignedSuspendedTasks(). I have some instance filed
> which I am closing in processor's close method. This instance's close method
> throws some exception if I call close more than once. Because of this
> exception, the Kafka streams does not attempt to close the statemanager ie.
> task.closeStateManager(true) is never called. When a task moves from one
> thread to another within same machine the task blocks trying to get lock on
> state directory which is still held by unclosed statemanager and keep
> throwing the below warning message:
> 2017-04-30 12:34:17 WARN StreamThread:1214 - Could not create task 0_1. Will
> retry.
> org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the
> state directory for task 0_1
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
> at
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
> at
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
> at
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
> at
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)