Hi Ara,

This usually means that one or more of your StreamThreads is taking a long
time during recovery or processing, so the rebalance times out and triggers
yet another rebalance.
The second exception, i.e.,
2017-02-22 20:12:48 WARN  StreamThread:1184 - Could not create task 3_0.
Will retry.
org.apache.kafka.streams.errors.LockException: task [3_0] Failed to lock
the state directory: /kafka/1/kafka-streams/argyle-streams/3_0

is because you have at least one thread that is still processing and hasn't
yet given up the tasks that were revoked from it.
You probably need to get a thread dump to see what the threads are doing at
the time. And, as the first exception suggests, you may need to increase
max.poll.interval.ms.
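
For example (just a rough sketch, not tested; the application id and broker
list below are placeholders), Streams passes plain consumer configs through
to its embedded consumers, so you can set them on the same Properties object:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "argyle-streams"); // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
    // Allow more time between poll() calls before the consumer is kicked
    // out of the group (10 minutes here; tune to your processing time):
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
    // And/or fetch fewer records per poll() so each loop iteration is shorter:
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

For the thread dump, plain jstack (jstack <pid>) against the JVM while it is
stuck in the rebalance loop should be enough to see where each StreamThread
is blocked.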

Thanks,
Damian

On Thu, 23 Feb 2017 at 18:26 Ara Ebrahimi <ara.ebrah...@argyledata.com>
wrote:

> Hi,
>
> After upgrading to 0.10.2.0 I got this:
>
> Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit
> cannot be completed since the group has already rebalanced and assigned the
> partitions to another member. This means that the time between subsequent
> calls to poll() was longer than the configured max.poll.interval.ms,
> which typically implies that the poll loop is spending too much time
> message processing. You can address this either by increasing the session
> timeout or by reducing the maximum size of batches returned in poll() with
> max.poll.records.
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
> at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
> at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
> at org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
> at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
> ... 10 more
>
> Which led to this:
>
> 2017-02-22 20:12:16 ERROR StreamThread:505 - stream-thread
> [StreamThread-9] Failed while executing StreamTask 4_6 due to commit
> consumer offsets:
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> completed since the group has already rebalanced and assigned the
> partitions to another member. This means that the time between subsequent
> calls to poll() was longer than the configured max.poll.interval.ms,
> which typically implies that the poll loop is spending too much time
> message processing. You can address this either by increasing the session
> timeout or by reducing the maximum size of batches returned in poll() with
> max.poll.records.
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
> at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
> at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
> at org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
> at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
> at org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:69)
> at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:259)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:396)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:329)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> 2017-02-22 20:12:16 ERROR ConsumerCoordinator:400 - User provided listener
> org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> argyle-streams failed on partition revocation
> org.apache.kafka.streams.errors.StreamsException: stream-thread
> [StreamThread-9] failed to suspend stream tasks
> at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:488)
> at org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:69)
> at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:259)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:396)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:329)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>
> And an infinite loop of these retries:
>
> 2017-02-22 20:12:48 WARN  StreamThread:1184 - Could not create task 3_0.
> Will retry.
> org.apache.kafka.streams.errors.LockException: task [3_0] Failed to lock
> the state directory: /kafka/1/kafka-streams/argyle-streams/3_0
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
> at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
> at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
> at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
> at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
> at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
> at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>
> Previously this would not retry and we had a chance to restart the process.
>
> After a manual restart, processing resumed for everything except the parts
> dealing with KTables, as if it couldn’t recover the RocksDB files. What
> exactly is the recovery process for reading back RocksDB files and
> resuming processing?
>
> Aside from setting better values for the consumer polling configs, what’s
> the best strategy for dealing with such failures? I’m worried that if such
> a thing happens, recovery doesn’t take place and we can’t resume processing
> unless we delete the Kafka Streams state dir.
>
> Ara.
>
