BTW - i'm trying to reproduce it, but not having much luck so far...

On Wed, 5 Jul 2017 at 09:27 Damian Guy <damian....@gmail.com> wrote:

> Thans for the updates Greg. There were some minor changes around this in
> 0.11.0 to make it less likely to happen, but we've only ever seen the
> locking fail in the event of a rebalance. When everything is running state
> dirs shouldn't be deleted if they are being used as the lock will fail.
>
>
> On Wed, 5 Jul 2017 at 08:15 Greg Fodor <gfo...@gmail.com> wrote:
>
>> I can report that setting state.cleanup.delay.ms to a very large value
>> (effectively disabling it) works around the issue. It seems that the state
>> store cleanup process can somehow get out ahead of another task that still
>> thinks it should be writing to the state store/flushing it. In my test
>> runs, this does not seem to be happening during a rebalancing event, but
>> after the cluster is stable.
>>
>> On Tue, Jul 4, 2017 at 12:29 PM, Greg Fodor <gfo...@gmail.com> wrote:
>>
>> > Upon another run, I see the same error occur during a rebalance, so
>> either
>> > my log was showing a rebalance or there is a shared underlying issue
>> with
>> > state stores.
>> >
>> > On Tue, Jul 4, 2017 at 11:35 AM, Greg Fodor <gfo...@gmail.com> wrote:
>> >
>> >> Also, I am on 0.10.2.1, so poll interval was already set to MAX_VALUE.
>> >>
>> >> On Tue, Jul 4, 2017 at 11:28 AM, Greg Fodor <gfo...@gmail.com> wrote:
>> >>
>> >>> I've nuked the nodes this happened on, but the job had been running
>> for
>> >>> about 5-10 minutes across 5 nodes before this happened. Does the log
>> show a
>> >>> rebalance was happening? It looks to me like the standby task was just
>> >>> committing as part of normal operations.
>> >>>
>> >>> On Tue, Jul 4, 2017 at 7:40 AM, Damian Guy <damian....@gmail.com>
>> wrote:
>> >>>
>> >>>> Hi Greg,
>> >>>>
>> >>>> Obviously a bit difficult to read the RocksDBException, but my guess
>> is
>> >>>> it
>> >>>> is because the state directory gets deleted right before the flush
>> >>>> happens:
>> >>>> 2017-07-04 10:54:46,829 [myid:] - INFO
>> [StreamThread-21:StateDirector
>> >>>> y@213]
>> >>>> - Deleting obsolete state directory 0_10 for task 0_10
>> >>>>
>> >>>> Yes it looks like it is possibly the same bug as KAFKA-5070.
>> >>>>
>> >>>> It looks like your application is constantly rebalancing during store
>> >>>> intialization, which may be the reason this bug comes about (there
>> is a
>> >>>> chance that the state dir lock is released so when the thread tries
>> to
>> >>>> removes the stale state directory it is able to get the lock). You
>> >>>> probably
>> >>>> want to configure `max.poll.interval.ms` to be a reasonably large
>> >>>> value ( i
>> >>>> think we default to Integer.MAX_VALUE in 0.10.2.1). You can also try
>> >>>> setting `state.cleanup.delay.ms` to a higher value (default is 10
>> >>>> minutes),
>> >>>> to try and avoid it happening during a rebalance (I know this isn't a
>> >>>> fix,
>> >>>> but will make it less likely to happen).
>> >>>>
>> >>>> Thanks,
>> >>>> Damian
>> >>>>
>> >>>> On Tue, 4 Jul 2017 at 12:43 Greg Fodor <gfo...@gmail.com> wrote:
>> >>>>
>> >>>> > Hi all, we are working on upgrading our jobs from 0.10.0 to use
>> Kafka
>> >>>> > Streams 0.10.2.1 and are hitting a problem. We have an ETL job that
>> >>>> has 4
>> >>>> > state stores and runs across a few hundred partitions, and as part
>> of
>> >>>> load
>> >>>> > testing the job we are trying to reload our data out of kafka into
>> a
>> >>>> test
>> >>>> > db. The result is we are able to load about 4M tuples and then this
>> >>>> error
>> >>>> > pops up on all of the stream nodes simultaneously. There are 4
>> rocksdb
>> >>>> > stores in question and there are lots of these errors which takes
>> it
>> >>>> down.
>> >>>> > This bug *does* not seem to occur on 0.10.1.
>> >>>> >
>> >>>> > A similar error was mentioned here:
>> >>>> > https://issues.apache.org/jira/browse/KAFKA-5070
>> >>>> >
>> >>>> > Full log attached.
>> >>>> >
>> >>>> > org.apache.kafka.streams.errors.ProcessorStateException: task
>> [0_10]
>> >>>> > Failed to flush state store session-id-start-events
>> >>>> > at
>> >>>> > org.apache.kafka.streams.processor.internals.ProcessorStateM
>> >>>> anager.flush(ProcessorStateManager.java:337)
>> >>>> > at
>> >>>> > org.apache.kafka.streams.processor.internals.StandbyTask.com
>> >>>> mit(StandbyTask.java:94)
>> >>>> > at
>> >>>> > org.apache.kafka.streams.processor.internals.StreamThread.co
>> >>>> mmitOne(StreamThread.java:807)
>> >>>> > at
>> >>>> > org.apache.kafka.streams.processor.internals.StreamThread.co
>> >>>> mmitAll(StreamThread.java:797)
>> >>>> > at
>> >>>> > org.apache.kafka.streams.processor.internals.StreamThread.ma
>> >>>> ybeCommit(StreamThread.java:769)
>> >>>> > at
>> >>>> > org.apache.kafka.streams.processor.internals.StreamThread.ru
>> >>>> nLoop(StreamThread.java:647)
>> >>>> > at
>> >>>> > org.apache.kafka.streams.processor.internals.StreamThread.ru
>> >>>> n(StreamThread.java:361)
>> >>>> > Caused by: org.apache.kafka.streams.errors.ProcessorStateException:
>> >>>> Error
>> >>>> > while executing flush from store session-id-start-events
>> >>>> > at
>> >>>> > org.apache.kafka.streams.state.internals.RocksDBStore.flushI
>> >>>> nternal(RocksDBStore.java:354)
>> >>>> > at
>> >>>> > org.apache.kafka.streams.state.internals.RocksDBStore.flush(
>> >>>> RocksDBStore.java:345)
>> >>>> > at
>> >>>> > org.apache.kafka.streams.state.internals.WrappedStateStore$A
>> >>>> bstractWrappedStateStore.flush(WrappedStateStore.java:80)
>> >>>> > at
>> >>>> > org.apache.kafka.streams.state.internals.WrappedStateStore$A
>> >>>> bstractWrappedStateStore.flush(WrappedStateStore.java:80)
>> >>>> > at
>> >>>> > org.apache.kafka.streams.state.internals.MeteredKeyValueStor
>> >>>> e$6.run(MeteredKeyValueStore.java:92)
>> >>>> > at
>> >>>> > org.apache.kafka.streams.processor.internals.StreamsMetricsI
>> >>>> mpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>> >>>> > at
>> >>>> > org.apache.kafka.streams.state.internals.MeteredKeyValueStor
>> >>>> e.flush(MeteredKeyValueStore.java:186)
>> >>>> > at
>> >>>> > org.apache.kafka.streams.processor.internals.ProcessorStateM
>> >>>> anager.flush(ProcessorStateManager.java:335)
>> >>>> > ... 6 more
>> >>>> > Caused by: org.rocksdb.RocksDBException: v
>> >>>> > at org.rocksdb.RocksDB.flush(Native Method)
>> >>>> > at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
>> >>>> > at
>> >>>> > org.apache.kafka.streams.state.internals.RocksDBStore.flushI
>> >>>> nternal(RocksDBStore.java:352)
>> >>>> > ... 13 more
>> >>>> >
>> >>>> >
>> >>>>
>> >>>
>> >>>
>> >>
>> >
>>
>

Reply via email to