Hi,

Thanks for the reply.

Let me give you more information:

- we have a group by + aggregate. The hopping time window is 10 minutes but the 
maintainMs is 180 days (we’re trying to reprocess the entire data set of 
billions of records).

- Over time, after just a few hours, the aggregate slows down considerably. 
Initially it keeps up with the ingest rate just fine, but then after a few 
hours we see that a counter we have in the body of the aggregate block stops 
incrementing completely and resumes after a few minutes! It seems to me it’s 
trying to roll the rocksdb files and pauses the whole StreamThread? Is that 
possible? Is that the root cause of this issue? What’s the best way to monitor 
rocksdb rolling? Can’t find much in the LOG file. I don’t see any stalling.

- If above case is possible then what’s the solution? Much smaller maintainMs? 
Or tuning rocksdb? This is on SSD. Our records have windowed keys of 28 bytes 
and 200 bytes average value size.

Ara.

> On Feb 24, 2017, at 3:15 AM, Damian Guy <damian....@gmail.com> wrote:
>
> Hi Ara,
>
> This usually means that one, or more, of your StreamThreads is taking a
> long time during recovery or processing. So the rebalance times out and you
> get another rebalance.
> The second exception, i.e.,
> 2017-02-22 20:12:48 WARN  StreamThread:1184 - Could not create task 3_0.
> Will retry.
> org.apache.kafka.streams.errors.LockException: task [3_0] Failed to lock
> the state directory: /kafka/1/kafka-streams/argyle-streams/3_0
>
> Is because you have at least one thread that is still processing and hasn't
> given up its tasks that have been revoked yet.
> You probably need to get a thread dump to see what the threads are doing at
> the time. As the first exception suggests, you may need to increase the
> max.poll.interval.ms
>
> Thanks,
> Damian
>
> On Thu, 23 Feb 2017 at 18:26 Ara Ebrahimi <ara.ebrah...@argyledata.com>
> wrote:
>
>> Hi,
>>
>> After upgrading to 0.10.20.0 I got this:
>>
>> Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit
>> cannot be completed since the group has already rebalanced and assigned the
>> partitions to another member. This means that the time between subsequent
>> calls to poll() was longer than the configured max.poll.interval.ms,
>> which typically implies that the poll loop is spending too much time
>> message processing. You can address this either by increasing the session
>> timeout or by reducing the maximum size of batches returned in poll() with
>> max.poll.records.
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
>> at
>> org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
>> ... 10 more
>>
>> Which lead to this:
>>
>> 2017-02-22 20:12:16 ERROR StreamThread:505 - stream-thread
>> [StreamThread-9] Failed while executing StreamTask 4_6 due to commit
>> consumer offsets:
>> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
>> completed since the group has already rebalanced and assigned the
>> partitions to another member. This means that the time between subsequent
>> calls to poll() was longer than the configured max.poll.interval.ms,
>> which typically implies that the poll loop is spending too much time
>> message processing. You can address this either by increasing the session
>> timeout or by reducing the maximum size of batches returned in poll() with
>> max.poll.records.
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
>> at
>> org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:69)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:259)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:396)
>> at
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:329)
>> at
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>> 2017-02-22 20:12:16 ERROR ConsumerCoordinator:400 - User provided listener
>> org.apache.kafka.streams.processor.internals.StreamThread$1 for group
>> argyle-streams failed on partition revocation
>> org.apache.kafka.streams.errors.StreamsException: stream-thread
>> [StreamThread-9] failed to suspend stream tasks
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:488)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:69)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:259)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:396)
>> at
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:329)
>> at
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>>
>> And an infinite loop of these retries:
>>
>> 2017-02-22 20:12:48 WARN  StreamThread:1184 - Could not create task 3_0.
>> Will retry.
>> org.apache.kafka.streams.errors.LockException: task [3_0] Failed to lock
>> the state directory: /kafka/1/kafka-streams/argyle-streams/3_0
>> at
>> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
>> at
>> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>> at
>> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>> at
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>> at
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>>
>> Previously this would not retry and we had a chance to restart the process.
>>
>> After a manual restart processing resumed for everything excepts parts
>> dealing with ktables, as if it couldn’t recover rocksdb files. What’s the
>> recovery process for reading back rocksdb files and resuming processing
>> exactly?
>>
>> Aside from setting better values for kafka consumer polling, what’s the
>> best strategy for dealing with such failures? I’m worried that if such a
>> things happens recovery doesn’t take place and we can’t resume processing
>> unless we delete kafka streams state dir.
>>
>> Ara.
>>
>>
>>
>> ________________________________
>>
>> This message is for the designated recipient only and may contain
>> privileged, proprietary, or otherwise confidential information. If you have
>> received it in error, please notify the sender immediately and delete the
>> original. Any other use of the e-mail by you is prohibited. Thank you in
>> advance for your cooperation.
>>
>> ________________________________
>>
>
>
>
> ________________________________
>
> This message is for the designated recipient only and may contain privileged, 
> proprietary, or otherwise confidential information. If you have received it 
> in error, please notify the sender immediately and delete the original. Any 
> other use of the e-mail by you is prohibited. Thank you in advance for your 
> cooperation.
>
> ________________________________




________________________________

This message is for the designated recipient only and may contain privileged, 
proprietary, or otherwise confidential information. If you have received it in 
error, please notify the sender immediately and delete the original. Any other 
use of the e-mail by you is prohibited. Thank you in advance for your 
cooperation.

________________________________

Reply via email to