I've nuked the nodes this happened on, but the job had been running for about 5-10 minutes across 5 nodes before the errors appeared. Does the log show that a rebalance was happening? It looks to me like the standby task was just committing as part of normal operations.
On Tue, Jul 4, 2017 at 7:40 AM, Damian Guy <damian....@gmail.com> wrote:

> Hi Greg,
>
> Obviously a bit difficult to read the RocksDBException, but my guess is it
> is because the state directory gets deleted right before the flush happens:
>
> 2017-07-04 10:54:46,829 [myid:] - INFO [StreamThread-21:StateDirectory@213]
> - Deleting obsolete state directory 0_10 for task 0_10
>
> Yes, it looks like it is possibly the same bug as KAFKA-5070.
>
> It looks like your application is constantly rebalancing during store
> initialization, which may be the reason this bug comes about (there is a
> chance that the state dir lock is released, so when the thread tries to
> remove the stale state directory it is able to get the lock). You probably
> want to configure `max.poll.interval.ms` to be a reasonably large value (I
> think we default to Integer.MAX_VALUE in 0.10.2.1). You can also try
> setting `state.cleanup.delay.ms` to a higher value (the default is 10
> minutes) to try and avoid it happening during a rebalance (I know this
> isn't a fix, but it will make it less likely to happen).
>
> Thanks,
> Damian
>
> On Tue, 4 Jul 2017 at 12:43 Greg Fodor <gfo...@gmail.com> wrote:
>
> > Hi all, we are working on upgrading our jobs from 0.10.0 to Kafka
> > Streams 0.10.2.1 and are hitting a problem. We have an ETL job that has
> > 4 state stores and runs across a few hundred partitions, and as part of
> > load testing the job we are trying to reload our data out of Kafka into
> > a test db. The result is that we are able to load about 4M tuples and
> > then this error pops up on all of the stream nodes simultaneously.
> > There are 4 RocksDB stores in question, and there are lots of these
> > errors, which takes the job down. This bug does *not* seem to occur on
> > 0.10.1.
> >
> > A similar error was mentioned here:
> > https://issues.apache.org/jira/browse/KAFKA-5070
> >
> > Full log attached.
> >
> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_10]
> > Failed to flush state store session-id-start-events
> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
> >     at org.apache.kafka.streams.processor.internals.StandbyTask.commit(StandbyTask.java:94)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:797)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error
> > while executing flush from store session-id-start-events
> >     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
> >     at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
> >     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
> >     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
> >     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
> >     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
> >     ... 6 more
> > Caused by: org.rocksdb.RocksDBException: v
> >     at org.rocksdb.RocksDB.flush(Native Method)
> >     at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
> >     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352)
> >     ... 13 more
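[Editor's note: for reference, a minimal sketch of how the two settings Damian mentions above could be wired into a Streams configuration. The application id, bootstrap servers, and the one-hour cleanup delay are illustrative placeholders, not values from this thread; the config keys are the standard StreamsConfig/ConsumerConfig constants.]

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class StreamsTuningExample {

        public static Properties tunedConfig() {
            Properties props = new Properties();
            // Placeholder application id and brokers -- substitute your own.
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "etl-job");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // max.poll.interval.ms: keep the consumer in the group through long
            // store initialization/restoration so slow polls don't trigger
            // additional rebalances.
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);

            // state.cleanup.delay.ms: push obsolete state-directory cleanup well
            // past the 10 minute default so it is less likely to coincide with a
            // rebalance (illustrative value; not a fix for the underlying bug).
            props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, TimeUnit.HOURS.toMillis(1));

            return props;
        }
    }

The resulting Properties can then be passed to the KafkaStreams constructor as usual.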