Hi,

After upgrading to 0.10.2.0 I got this:

Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
    at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
    at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
    at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
    at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
    ... 10 more

Which led to this:

2017-02-22 20:12:16 ERROR StreamThread:505 - stream-thread [StreamThread-9] Failed while executing StreamTask 4_6 due to commit consumer offsets:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
    at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
    at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
    at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
    at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:259)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:396)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:329)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
2017-02-22 20:12:16 ERROR ConsumerCoordinator:400 - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group argyle-streams failed on partition revocation
org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-9] failed to suspend stream tasks
    at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:488)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:259)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:396)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:329)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)

And an infinite loop of these retries:

2017-02-22 20:12:48 WARN  StreamThread:1184 - Could not create task 3_0. Will retry.
org.apache.kafka.streams.errors.LockException: task [3_0] Failed to lock the state directory: /kafka/1/kafka-streams/argyle-streams/3_0
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)

Previously this would not retry, so the thread died and we had a chance to restart the process.
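
For context, here's roughly how we turn a dead stream thread into a full process restart (a sketch: the exception handler is the stock KafkaStreams hook, but exiting and letting an external supervisor restart us is our own convention, and the broker address is a placeholder):

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class FailFastStreams {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "argyle-streams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder

        KStreamBuilder builder = new KStreamBuilder();
        // ... topology elided ...

        KafkaStreams streams = new KafkaStreams(builder, props);

        // If a StreamThread dies from an unhandled exception, exit the JVM so
        // the supervisor restarts the whole process, rather than the app
        // limping along with fewer threads. Register this before start().
        streams.setUncaughtExceptionHandler((thread, throwable) -> {
            System.err.println("Stream thread " + thread.getName() + " died: " + throwable);
            System.exit(1);
        });

        streams.start();
    }
}

With the new internal retry loop, though, the thread never dies, so this hook never fires.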

After a manual restart, processing resumed for everything except the parts dealing with KTables, as if it couldn't recover the RocksDB files. What exactly is the recovery process for reading back RocksDB files and resuming processing?

Aside from setting better values for the Kafka consumer polling configs, what's the best strategy for dealing with such failures? I'm worried that if such a thing happens, recovery doesn't take place and we can't resume processing unless we delete the Kafka Streams state dir.
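
For completeness, the knobs the exception message points at would look something like this in our Streams config (illustrative values only, we haven't settled on real numbers; plain consumer config keys in the Properties should be forwarded as well):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class PollTuning {
    // Consumer settings pass through StreamsConfig via the consumer prefix.
    // The numbers below are guesses, not recommendations.
    static Properties withPollTuning(Properties props) {
        // Allow more wall-clock time between poll() calls before the group
        // coordinator evicts the member and rebalances.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600000);
        // Hand back smaller batches so each iteration of the poll loop finishes sooner.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
        return props;
    }
}

But that only makes the rebalance less likely; it doesn't answer what to do once the state dir lock is wedged.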

Ara.


