The issue I am hitting is not the directory locking issues we've seen in the past. The problem seems to be, as you mentioned, that the state dir is getting deleted by the store cleanup process while there are still tasks running that are trying to flush the state store. That seems more than a little scary, because right now it looks like either a) there are tasks running that should have been re-assigned, or b) the cleanup job is removing state directories for currently running and assigned tasks (perhaps there is a race condition during a rebalance?). I'm guessing there's probably a more benign explanation, but that is what it looks like right now.
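For reference, here is roughly the workaround I mention further down the thread, as a minimal sketch. The application id and broker address are placeholders, not our real values, and the property constants are the 0.10.2.1 names as far as I know:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class CleanupDisabledConfig {
    // Minimal sketch: push state.cleanup.delay.ms out so far that the obsolete
    // state-directory cleanup effectively never runs while the job is up.
    // "etl-load-test" and "broker1:9092" are placeholders.
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "etl-load-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        // Default is 10 minutes; Long.MAX_VALUE effectively disables the cleaner.
        props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.MAX_VALUE);
        return props;
    }
}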
On Wed, Jul 5, 2017 at 7:00 AM, Damian Guy <damian....@gmail.com> wrote:

> BTW - I'm trying to reproduce it, but not having much luck so far...
>
> On Wed, 5 Jul 2017 at 09:27 Damian Guy <damian....@gmail.com> wrote:
>
> > Thanks for the updates Greg. There were some minor changes around this
> > in 0.11.0 to make it less likely to happen, but we've only ever seen the
> > locking fail in the event of a rebalance. When everything is running,
> > state dirs shouldn't be deleted if they are being used, as the lock will
> > fail.
> >
> > On Wed, 5 Jul 2017 at 08:15 Greg Fodor <gfo...@gmail.com> wrote:
> >
> >> I can report that setting state.cleanup.delay.ms to a very large value
> >> (effectively disabling it) works around the issue. It seems that the
> >> state store cleanup process can somehow get out ahead of another task
> >> that still thinks it should be writing to the state store/flushing it.
> >> In my test runs, this does not seem to be happening during a
> >> rebalancing event, but after the cluster is stable.
> >>
> >> On Tue, Jul 4, 2017 at 12:29 PM, Greg Fodor <gfo...@gmail.com> wrote:
> >>
> >> > Upon another run, I see the same error occur during a rebalance, so
> >> > either my log was showing a rebalance or there is a shared underlying
> >> > issue with state stores.
> >> >
> >> > On Tue, Jul 4, 2017 at 11:35 AM, Greg Fodor <gfo...@gmail.com> wrote:
> >> >
> >> >> Also, I am on 0.10.2.1, so poll interval was already set to MAX_VALUE.
> >> >>
> >> >> On Tue, Jul 4, 2017 at 11:28 AM, Greg Fodor <gfo...@gmail.com> wrote:
> >> >>
> >> >>> I've nuked the nodes this happened on, but the job had been running
> >> >>> for about 5-10 minutes across 5 nodes before this happened. Does
> >> >>> the log show a rebalance was happening? It looks to me like the
> >> >>> standby task was just committing as part of normal operations.
> >> >>>
> >> >>> On Tue, Jul 4, 2017 at 7:40 AM, Damian Guy <damian....@gmail.com> wrote:
> >> >>>
> >> >>>> Hi Greg,
> >> >>>>
> >> >>>> Obviously a bit difficult to read the RocksDBException, but my
> >> >>>> guess is it is because the state directory gets deleted right
> >> >>>> before the flush happens:
> >> >>>> 2017-07-04 10:54:46,829 [myid:] - INFO [StreamThread-21:StateDirectory@213]
> >> >>>> - Deleting obsolete state directory 0_10 for task 0_10
> >> >>>>
> >> >>>> Yes, it looks like it is possibly the same bug as KAFKA-5070.
> >> >>>>
> >> >>>> It looks like your application is constantly rebalancing during
> >> >>>> store initialization, which may be the reason this bug comes about
> >> >>>> (there is a chance that the state dir lock is released, so when the
> >> >>>> thread tries to remove the stale state directory it is able to get
> >> >>>> the lock). You probably want to configure `max.poll.interval.ms`
> >> >>>> to be a reasonably large value (I think we default to
> >> >>>> Integer.MAX_VALUE in 0.10.2.1). You can also try setting
> >> >>>> `state.cleanup.delay.ms` to a higher value (default is 10 minutes)
> >> >>>> to try and avoid it happening during a rebalance (I know this isn't
> >> >>>> a fix, but it will make it less likely to happen).
> >> >>>>
> >> >>>> Thanks,
> >> >>>> Damian
> >> >>>>
> >> >>>> On Tue, 4 Jul 2017 at 12:43 Greg Fodor <gfo...@gmail.com> wrote:
> >> >>>>
> >> >>>> > Hi all, we are working on upgrading our jobs from 0.10.0 to use
> >> >>>> > Kafka Streams 0.10.2.1 and are hitting a problem. We have an ETL
> >> >>>> > job that has 4 state stores and runs across a few hundred
> >> >>>> > partitions, and as part of load testing the job we are trying to
> >> >>>> > reload our data out of kafka into a test db. The result is we
> >> >>>> > are able to load about 4M tuples and then this error pops up on
> >> >>>> > all of the stream nodes simultaneously. There are 4 rocksdb
> >> >>>> > stores in question and there are lots of these errors, which
> >> >>>> > takes it down. This bug *does* not seem to occur on 0.10.1.
> >> >>>> >
> >> >>>> > A similar error was mentioned here:
> >> >>>> > https://issues.apache.org/jira/browse/KAFKA-5070
> >> >>>> >
> >> >>>> > Full log attached.
> >> >>>> >
> >> >>>> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_10]
> >> >>>> > Failed to flush state store session-id-start-events
> >> >>>> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
> >> >>>> >     at org.apache.kafka.streams.processor.internals.StandbyTask.commit(StandbyTask.java:94)
> >> >>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> >> >>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:797)
> >> >>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> >> >>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> >> >>>> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >> >>>> > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store session-id-start-events
> >> >>>> >     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
> >> >>>> >     at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
> >> >>>> >     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
> >> >>>> >     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
> >> >>>> >     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
> >> >>>> >     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >> >>>> >     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
> >> >>>> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
> >> >>>> >     ... 6 more
> >> >>>> > Caused by: org.rocksdb.RocksDBException: v
> >> >>>> >     at org.rocksdb.RocksDB.flush(Native Method)
> >> >>>> >     at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
> >> >>>> >     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352)
> >> >>>> >     ... 13 more
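(For anyone skimming the thread: a minimal sketch of the two settings Damian suggests above. The numbers are illustrative only; nobody in the thread has recommended specific values beyond "large".)

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class RebalanceMitigationConfig {
    // Sketch of the mitigations from Damian's reply: a large max.poll.interval.ms so
    // slow store initialization doesn't trigger a rebalance, and a longer
    // state.cleanup.delay.ms so cleanup is less likely to overlap with one.
    public static Properties apply(Properties props) {
        // Already Integer.MAX_VALUE by default in 0.10.2.1 per Damian; set explicitly on older versions.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
        // Raise from the 10-minute default; one hour here is an arbitrary illustrative value.
        props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 60 * 60 * 1000L);
        return props;
    }
}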