Thanks Greg. I'll look into it more tomorrow. I'm just finding it difficult to
reproduce in a test.
Thanks for providing the sequence; it gives me something to try to reproduce.
Appreciated.

Thanks,
Damian
On Wed, 5 Jul 2017 at 19:57, Greg Fodor <gfo...@gmail.com> wrote:

> Also, the sequence of events is:
>
> - Job starts, rebalance happens, things run along smoothly.
> - After 10 minutes (retrospectively) the cleanup task kicks on and removes
> some directories
> - Tasks immediately start failing when trying to flush their state stores
>
>
>
> On Wed, Jul 5, 2017 at 11:55 AM, Greg Fodor <gfo...@gmail.com> wrote:
>
> > The issue I am hitting is not the directory locking issues we've seen in
> > the past. The issue seems to be, as you mentioned, that the state dir is
> > getting deleted by the store cleanup process, but there are still tasks
> > running that are trying to flush the state store. It seems more than a
> > little scary given that right now it seems either a) there are tasks
> > running that should have been re-assigned or b) the cleanup job is
> removing
> > state directories for currently running + assigned tasks (perhaps during
> a
> > rebalance there is a race condition?) I'm guessing there's probably a
> more
> > benign explanation, but that is what it looks like right now.
> >
> > On Wed, Jul 5, 2017 at 7:00 AM, Damian Guy <damian....@gmail.com> wrote:
> >
> >> BTW - i'm trying to reproduce it, but not having much luck so far...
> >>
> >> On Wed, 5 Jul 2017 at 09:27 Damian Guy <damian....@gmail.com> wrote:
> >>
> >> > Thanks for the updates Greg. There were some minor changes around this
> in
> >> > 0.11.0 to make it less likely to happen, but we've only ever seen the
> >> > locking fail in the event of a rebalance. When everything is running
> >> state
> >> > dirs shouldn't be deleted if they are being used as the lock will
> fail.
> >> >
> >> >
> >> > On Wed, 5 Jul 2017 at 08:15 Greg Fodor <gfo...@gmail.com> wrote:
> >> >
> >> >> I can report that setting state.cleanup.delay.ms to a very large
> value
> >> >> (effectively disabling it) works around the issue. It seems that the
> >> state
> >> >> store cleanup process can somehow get out ahead of another task that
> >> still
> >> >> thinks it should be writing to the state store/flushing it. In my
> test
> >> >> runs, this does not seem to be happening during a rebalancing event,
> >> but
> >> >> after the cluster is stable.
> >> >>
> >> >> On Tue, Jul 4, 2017 at 12:29 PM, Greg Fodor <gfo...@gmail.com>
> wrote:
> >> >>
> >> >> > Upon another run, I see the same error occur during a rebalance, so
> >> >> either
> >> >> > my log was showing a rebalance or there is a shared underlying
> issue
> >> >> with
> >> >> > state stores.
> >> >> >
> >> >> > On Tue, Jul 4, 2017 at 11:35 AM, Greg Fodor <gfo...@gmail.com>
> >> wrote:
> >> >> >
> >> >> >> Also, I am on 0.10.2.1, so poll interval was already set to
> >> MAX_VALUE.
> >> >> >>
> >> >> >> On Tue, Jul 4, 2017 at 11:28 AM, Greg Fodor <gfo...@gmail.com>
> >> wrote:
> >> >> >>
> >> >> >>> I've nuked the nodes this happened on, but the job had been
> running
> >> >> for
> >> >> >>> about 5-10 minutes across 5 nodes before this happened. Does the
> >> log
> >> >> show a
> >> >> >>> rebalance was happening? It looks to me like the standby task was
> >> just
> >> >> >>> committing as part of normal operations.
> >> >> >>>
> >> >> >>> On Tue, Jul 4, 2017 at 7:40 AM, Damian Guy <damian....@gmail.com
> >
> >> >> wrote:
> >> >> >>>
> >> >> >>>> Hi Greg,
> >> >> >>>>
> >> >> >>>> Obviously a bit difficult to read the RocksDBException, but my
> >> guess
> >> >> is
> >> >> >>>> it
> >> >> >>>> is because the state directory gets deleted right before the
> flush
> >> >> >>>> happens:
> >> >> >>>> 2017-07-04 10:54:46,829 [myid:] - INFO [StreamThread-21:StateDirectory@213] - Deleting obsolete state directory 0_10 for task 0_10
> >> >> >>>>
> >> >> >>>> Yes it looks like it is possibly the same bug as KAFKA-5070.
> >> >> >>>>
> >> >> >>>> It looks like your application is constantly rebalancing during
> >> store
> >> >> >>>> initialization, which may be the reason this bug comes about
> (there
> >> >> is a
> >> >> >>>> chance that the state dir lock is released so when the thread
> >> tries
> >> >> to
> >> >> >>>> remove the stale state directory it is able to get the lock).
> You
> >> >> >>>> probably
> >> >> >>>> want to configure `max.poll.interval.ms` to be a reasonably
> large
> >> >> >>>> value ( i
> >> >> >>>> think we default to Integer.MAX_VALUE in 0.10.2.1). You can also
> >> try
> >> >> >>>> setting `state.cleanup.delay.ms` to a higher value (default is
> 10
> >> >> >>>> minutes),
> >> >> >>>> to try and avoid it happening during a rebalance (I know this
> >> isn't a
> >> >> >>>> fix,
> >> >> >>>> but will make it less likely to happen).
> >> >> >>>>
> >> >> >>>> Thanks,
> >> >> >>>> Damian
> >> >> >>>>
> >> >> >>>> On Tue, 4 Jul 2017 at 12:43 Greg Fodor <gfo...@gmail.com>
> wrote:
> >> >> >>>>
> >> >> >>>> > Hi all, we are working on upgrading our jobs from 0.10.0 to
> use
> >> >> Kafka
> >> >> >>>> > Streams 0.10.2.1 and are hitting a problem. We have an ETL job
> >> that
> >> >> >>>> has 4
> >> >> >>>> > state stores and runs across a few hundred partitions, and as
> >> part
> >> >> of
> >> >> >>>> load
> >> >> >>>> > testing the job we are trying to reload our data out of kafka
> >> into
> >> >> a
> >> >> >>>> test
> >> >> >>>> > db. The result is we are able to load about 4M tuples and then
> >> this
> >> >> >>>> error
> >> >> >>>> > pops up on all of the stream nodes simultaneously. There are 4
> >> >> rocksdb
> >> >> >>>> > stores in question and there are lots of these errors which
> >> takes
> >> >> it
> >> >> >>>> down.
> >> >> >>>> > This bug *does* not seem to occur on 0.10.1.
> >> >> >>>> >
> >> >> >>>> > A similar error was mentioned here:
> >> >> >>>> > https://issues.apache.org/jira/browse/KAFKA-5070
> >> >> >>>> >
> >> >> >>>> > Full log attached.
> >> >> >>>> >
> >> >> >>>> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_10] Failed to flush state store session-id-start-events
> >> >> >>>> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
> >> >> >>>> > at org.apache.kafka.streams.processor.internals.StandbyTask.commit(StandbyTask.java:94)
> >> >> >>>> > at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> >> >> >>>> > at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:797)
> >> >> >>>> > at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> >> >> >>>> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> >> >> >>>> > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >> >> >>>> > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store session-id-start-events
> >> >> >>>> > at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
> >> >> >>>> > at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
> >> >> >>>> > at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
> >> >> >>>> > at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
> >> >> >>>> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
> >> >> >>>> > at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >> >> >>>> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
> >> >> >>>> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
> >> >> >>>> > ... 6 more
> >> >> >>>> > Caused by: org.rocksdb.RocksDBException: v
> >> >> >>>> > at org.rocksdb.RocksDB.flush(Native Method)
> >> >> >>>> > at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
> >> >> >>>> > at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352)
> >> >> >>>> > ... 13 more
> >> >> >>>> >
> >> >> >>>> >
> >> >> >>>>
> >> >> >>>
> >> >> >>>
> >> >> >>
> >> >> >
> >> >>
> >> >
> >>
> >
> >
>
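
For anyone applying the workaround discussed in this thread, here is a minimal sketch of the two overrides mentioned: a large `state.cleanup.delay.ms` and a large `max.poll.interval.ms`. The class name, method name, and the 7-day value are illustrative assumptions, not taken from the thread. A finite delay is chosen rather than `Long.MAX_VALUE` on the assumption that the delay may be added to a timestamp internally, where an extreme value could overflow. The other required Streams settings (`application.id`, `bootstrap.servers`, and so on) are omitted.

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: builds only the two overrides discussed in the thread.
public class CleanupDelayWorkaround {

    static Properties workaroundOverrides() {
        Properties props = new Properties();

        // Push state directory cleanup far into the future (assumed value: 7 days).
        // This only makes the problem less likely; it is not a fix.
        props.put("state.cleanup.delay.ms",
                  Long.toString(TimeUnit.DAYS.toMillis(7)));

        // Keep the consumer poll interval very large to reduce rebalances
        // caused by slow processing or slow store initialization.
        props.put("max.poll.interval.ms",
                  Integer.toString(Integer.MAX_VALUE));

        return props;
    }

    public static void main(String[] args) {
        // Print the overrides so they can be checked before merging them
        // into the application's real Streams configuration.
        workaroundOverrides().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These properties would be merged into the `Properties` object that the application already passes when constructing its Streams instance.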
