On another run, I see the same error occur during a rebalance, so either my earlier log was also showing a rebalance or there is a shared underlying issue with the state stores.
On Tue, Jul 4, 2017 at 11:35 AM, Greg Fodor <gfo...@gmail.com> wrote:

> Also, I am on 0.10.2.1, so poll interval was already set to MAX_VALUE.
>
> On Tue, Jul 4, 2017 at 11:28 AM, Greg Fodor <gfo...@gmail.com> wrote:
>
>> I've nuked the nodes this happened on, but the job had been running for
>> about 5-10 minutes across 5 nodes before this happened. Does the log show
>> that a rebalance was happening? It looks to me like the standby task was
>> just committing as part of normal operations.
>>
>> On Tue, Jul 4, 2017 at 7:40 AM, Damian Guy <damian....@gmail.com> wrote:
>>
>>> Hi Greg,
>>>
>>> Obviously a bit difficult to read the RocksDBException, but my guess is
>>> that it is because the state directory gets deleted right before the
>>> flush happens:
>>>
>>> 2017-07-04 10:54:46,829 [myid:] - INFO [StreamThread-21:StateDirectory@213]
>>> - Deleting obsolete state directory 0_10 for task 0_10
>>>
>>> Yes, it looks like it is possibly the same bug as KAFKA-5070.
>>>
>>> It looks like your application is constantly rebalancing during store
>>> initialization, which may be the reason this bug comes about (there is a
>>> chance that the state dir lock is released, so when the thread tries to
>>> remove the stale state directory it is able to get the lock). You probably
>>> want to configure `max.poll.interval.ms` to be a reasonably large value
>>> (I think we default to Integer.MAX_VALUE in 0.10.2.1). You can also try
>>> setting `state.cleanup.delay.ms` to a higher value (the default is 10
>>> minutes) to try to avoid it happening during a rebalance (I know this
>>> isn't a fix, but it will make it less likely to happen).
>>>
>>> Thanks,
>>> Damian
>>>
>>> On Tue, 4 Jul 2017 at 12:43 Greg Fodor <gfo...@gmail.com> wrote:
>>>
>>> > Hi all, we are working on upgrading our jobs from 0.10.0 to Kafka
>>> > Streams 0.10.2.1 and are hitting a problem. We have an ETL job that has
>>> > 4 state stores and runs across a few hundred partitions, and as part of
>>> > load testing the job we are trying to reload our data out of Kafka into
>>> > a test db. The result is that we are able to load about 4M tuples and
>>> > then this error pops up on all of the stream nodes simultaneously. There
>>> > are 4 RocksDB stores in question, and there are lots of these errors,
>>> > which takes the job down. This bug *does* not seem to occur on 0.10.1.
>>> >
>>> > A similar error was mentioned here:
>>> > https://issues.apache.org/jira/browse/KAFKA-5070
>>> >
>>> > Full log attached.
>>> >
>>> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_10]
>>> > Failed to flush state store session-id-start-events
>>> >   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>>> >   at org.apache.kafka.streams.processor.internals.StandbyTask.commit(StandbyTask.java:94)
>>> >   at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>>> >   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:797)
>>> >   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>>> >   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>>> >   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>> > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error
>>> > while executing flush from store session-id-start-events
>>> >   at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
>>> >   at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
>>> >   at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>>> >   at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>>> >   at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
>>> >   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>> >   at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
>>> >   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
>>> >   ... 6 more
>>> > Caused by: org.rocksdb.RocksDBException: v
>>> >   at org.rocksdb.RocksDB.flush(Native Method)
>>> >   at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
>>> >   at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352)
>>> >   ... 13 more
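For reference, a minimal sketch of how the two settings Damian mentions could be applied to a Streams application's configuration. This is illustrative only: the application id, bootstrap servers, and the one-hour cleanup delay below are placeholder values, not anything taken from this thread.

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class StreamsTuningSketch {

        public static Properties buildConfig() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "etl-job");           // hypothetical name
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

            // max.poll.interval.ms is a consumer config; Streams forwards it to its
            // internal consumers. 0.10.2.1 already defaults this to Integer.MAX_VALUE,
            // so setting it explicitly mainly matters on 0.10.1.x.
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);

            // Raise state.cleanup.delay.ms (default 10 minutes) so obsolete state
            // directories are removed less aggressively. Per the discussion above,
            // this only makes the race with the flush less likely; it is not a fix.
            props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, TimeUnit.HOURS.toMillis(1));

            return props;
        }
    }

The returned Properties would then be passed to the KafkaStreams constructor as usual when building the topology.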