BTW - I'm trying to reproduce it, but not having much luck so far...

On Wed, 5 Jul 2017 at 09:27 Damian Guy <damian....@gmail.com> wrote:
> Thanks for the updates Greg. There were some minor changes around this in
> 0.11.0 to make it less likely to happen, but we've only ever seen the
> locking fail in the event of a rebalance. When everything is running,
> state dirs shouldn't be deleted if they are being used, as the lock will
> fail.
>
> On Wed, 5 Jul 2017 at 08:15 Greg Fodor <gfo...@gmail.com> wrote:
>
>> I can report that setting state.cleanup.delay.ms to a very large value
>> (effectively disabling it) works around the issue. It seems that the
>> state store cleanup process can somehow get out ahead of another task
>> that still thinks it should be writing to the state store/flushing it.
>> In my test runs, this does not seem to be happening during a rebalancing
>> event, but after the cluster is stable.
>>
>> On Tue, Jul 4, 2017 at 12:29 PM, Greg Fodor <gfo...@gmail.com> wrote:
>>
>>> Upon another run, I see the same error occur during a rebalance, so
>>> either my log was showing a rebalance or there is a shared underlying
>>> issue with state stores.
>>>
>>> On Tue, Jul 4, 2017 at 11:35 AM, Greg Fodor <gfo...@gmail.com> wrote:
>>>
>>>> Also, I am on 0.10.2.1, so the poll interval was already set to
>>>> MAX_VALUE.
>>>>
>>>> On Tue, Jul 4, 2017 at 11:28 AM, Greg Fodor <gfo...@gmail.com> wrote:
>>>>
>>>>> I've nuked the nodes this happened on, but the job had been running
>>>>> for about 5-10 minutes across 5 nodes before this happened. Does the
>>>>> log show a rebalance was happening? It looks to me like the standby
>>>>> task was just committing as part of normal operations.
>>>>>
>>>>> On Tue, Jul 4, 2017 at 7:40 AM, Damian Guy <damian....@gmail.com> wrote:
>>>>>
>>>>>> Hi Greg,
>>>>>>
>>>>>> Obviously a bit difficult to read the RocksDBException, but my guess
>>>>>> is it is because the state directory gets deleted right before the
>>>>>> flush happens:
>>>>>>
>>>>>> 2017-07-04 10:54:46,829 [myid:] - INFO [StreamThread-21:StateDirectory@213] - Deleting obsolete state directory 0_10 for task 0_10
>>>>>>
>>>>>> Yes, it looks like it is possibly the same bug as KAFKA-5070.
>>>>>>
>>>>>> It looks like your application is constantly rebalancing during store
>>>>>> initialization, which may be the reason this bug comes about (there
>>>>>> is a chance that the state dir lock is released, so when the thread
>>>>>> tries to remove the stale state directory it is able to get the
>>>>>> lock). You probably want to configure max.poll.interval.ms to be a
>>>>>> reasonably large value (I think we default it to Integer.MAX_VALUE in
>>>>>> 0.10.2.1). You can also try setting state.cleanup.delay.ms to a
>>>>>> higher value (the default is 10 minutes) to try to avoid it happening
>>>>>> during a rebalance (I know this isn't a fix, but it will make it less
>>>>>> likely to happen).
>>>>>>
>>>>>> Thanks,
>>>>>> Damian
>>>>>>
>>>>>> On Tue, 4 Jul 2017 at 12:43 Greg Fodor <gfo...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi all, we are working on upgrading our jobs from 0.10.0 to Kafka
>>>>>>> Streams 0.10.2.1 and are hitting a problem. We have an ETL job that
>>>>>>> has 4 state stores and runs across a few hundred partitions, and as
>>>>>>> part of load testing the job we are trying to reload our data out of
>>>>>>> Kafka into a test db. The result is we are able to load about 4M
>>>>>>> tuples and then this error pops up on all of the stream nodes
>>>>>>> simultaneously.
>>>>>>> There are 4 RocksDB stores in question and there are lots of these
>>>>>>> errors, which take it down. This bug does *not* seem to occur on
>>>>>>> 0.10.1.
>>>>>>>
>>>>>>> A similar error was mentioned here:
>>>>>>> https://issues.apache.org/jira/browse/KAFKA-5070
>>>>>>>
>>>>>>> Full log attached.
>>>>>>>
>>>>>>> org.apache.kafka.streams.errors.ProcessorStateException: task [0_10] Failed to flush state store session-id-start-events
>>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StandbyTask.commit(StandbyTask.java:94)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:797)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>>>>> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store session-id-start-events
>>>>>>>     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
>>>>>>>     at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
>>>>>>>     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>>>>>>>     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>>>>>>>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>>>>>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
>>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
>>>>>>>     ... 6 more
>>>>>>> Caused by: org.rocksdb.RocksDBException: v
>>>>>>>     at org.rocksdb.RocksDB.flush(Native Method)
>>>>>>>     at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
>>>>>>>     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352)
>>>>>>>     ... 13 more
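The workaround discussed in the thread comes down to two settings. A minimal sketch in Java against the 0.10.2 API; the application id, bootstrap servers, and topology here are placeholders, and passing max.poll.interval.ms through the Streams properties assumes Streams forwards known consumer configs to its internal consumer:

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class CleanupDelayWorkaround {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "etl-job");           // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

            // Greg's workaround: push state.cleanup.delay.ms (default 10 minutes)
            // so high that the background cleaner effectively never runs.
            props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.MAX_VALUE);

            // Damian's suggestion: a large max.poll.interval.ms avoids rebalances
            // triggered by slow store initialization (0.10.2.1 already defaults
            // this to Integer.MAX_VALUE).
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);

            KStreamBuilder builder = new KStreamBuilder();
            // ... topology definition elided ...

            KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start();
        }
    }

As the thread notes, disabling the cleaner is a mitigation, not a fix: obsolete task directories then accumulate on disk until they are removed manually.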
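And a simplified sketch of the locking idea Damian describes. This is not the actual StateDirectory code in Kafka Streams (the lock-file name and helper below are hypothetical); it only illustrates why cleanup normally cannot delete a directory an active task holds, and why a briefly released lock opens the race:

    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import java.nio.channels.FileLock;
    import java.nio.channels.OverlappingFileLockException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;
    import java.util.Comparator;

    public class StateDirCleanerSketch {
        // Try to delete an obsolete task directory such as ".../0_10".
        // The cleaner proceeds only if it can acquire the directory's lock,
        // so a task still holding the lock keeps its RocksDB files safe.
        // The failure in this thread occurs when the lock is briefly
        // released (e.g. around a rebalance) while a flush is still pending:
        // the cleaner wins the lock and deletes files out from under the flush.
        static boolean tryCleanup(Path taskDir) throws IOException {
            Path lockFile = taskDir.resolve(".lock"); // hypothetical lock-file name
            try (FileChannel channel = FileChannel.open(lockFile,
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
                FileLock lock;
                try {
                    lock = channel.tryLock();
                } catch (OverlappingFileLockException e) {
                    return false; // a thread in this JVM holds the lock
                }
                if (lock == null) {
                    return false; // another process holds the lock: directory is in use
                }
                try {
                    deleteRecursively(taskDir);
                    return true;
                } finally {
                    lock.release();
                }
            }
        }

        // Hypothetical helper: depth-first delete (children before parents).
        static void deleteRecursively(Path dir) throws IOException {
            try (java.util.stream.Stream<Path> paths = Files.walk(dir)) {
                paths.sorted(Comparator.reverseOrder())
                     .forEach(p -> p.toFile().delete());
            }
        }
    }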