Hi,

After upgrading to 0.10.2.0 I got this:
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
  at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
  at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
  at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
  at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
  at org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
  at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
  ... 10 more

Which led to this:

2017-02-22 20:12:16 ERROR StreamThread:505 - stream-thread [StreamThread-9] Failed while executing StreamTask 4_6 due to commit consumer offsets:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
  at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
  at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
  at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
  at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
  at org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
  at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
  at org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:69)
  at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:259)
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:396)
  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:329)
  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
  at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)

2017-02-22 20:12:16 ERROR ConsumerCoordinator:400 - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group argyle-streams failed on partition revocation
org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-9] failed to suspend stream tasks
  at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:488)
  at org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:69)
  at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:259)
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:396)
  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:329)
  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
  at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)

And an infinite loop of these retries:

2017-02-22 20:12:48 WARN StreamThread:1184 - Could not create task 3_0. Will retry.
org.apache.kafka.streams.errors.LockException: task [3_0] Failed to lock the state directory: /kafka/1/kafka-streams/argyle-streams/3_0
  at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
  at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
  at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
  at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
  at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
  at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
  at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
  at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
  at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
  at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)

Previously this would not retry and we had a chance to restart the process.
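For reference, the tuning the exception message itself suggests would look something like the sketch below in our setup. This is only a sketch: the broker address and the numeric values are placeholders, and I'm assuming the standard consumer config names (max.poll.interval.ms, max.poll.records) are passed through to the embedded consumer that Streams creates.

```java
import java.util.Properties;

public class StreamsPollConfig {
    public static Properties baseProps() {
        Properties props = new Properties();
        props.put("application.id", "argyle-streams");    // the group id seen in the logs
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        // Give record processing more headroom before the consumer is kicked
        // out of the group: more time allowed between poll() calls, and fewer
        // records per poll() so each loop iteration finishes sooner.
        props.put("max.poll.interval.ms", "600000"); // example: 10 minutes
        props.put("max.poll.records", "100");        // example: smaller batches
        return props;
    }

    public static void main(String[] args) {
        Properties p = baseProps();
        System.out.println(p.getProperty("max.poll.interval.ms"));
        System.out.println(p.getProperty("max.poll.records"));
    }
}
```

These Properties would then be handed to the KafkaStreams constructor as usual; the trade-off is that a larger max.poll.interval.ms also delays detection of genuinely hung instances.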
After a manual restart, processing resumed for everything except the parts dealing with KTables, as if it couldn't recover the RocksDB files. What exactly is the recovery process for reading back RocksDB files and resuming processing? And aside from setting better values for consumer polling, what's the best strategy for dealing with such failures? I'm worried that if such a thing happens again, recovery won't take place and we can't resume processing unless we delete the Kafka Streams state dir.

Ara.