I've nuked the nodes this happened on, but the job had been running for
about 5-10 minutes across 5 nodes before the failure. Does the log show
that a rebalance was happening? It looks to me like the standby task was
just committing as part of normal operations.
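For reference, Damian's suggestion below boils down to two Streams config overrides. A minimal sketch of how those might be set (the specific override values here are illustrative assumptions, not values from this thread):

```java
import java.util.Properties;

public class StreamsTuning {
    // Sketch of the two config changes suggested in the reply below.
    // The chosen values are hypothetical; tune them for your workload.
    static Properties tunedConfig() {
        Properties props = new Properties();
        // Allow long gaps between poll() calls during store initialization
        // so the consumer isn't kicked from the group mid-restore.
        // (Per the thread, 0.10.2.1 already defaults this to Integer.MAX_VALUE.)
        props.put("max.poll.interval.ms", String.valueOf(Integer.MAX_VALUE));
        // Push state-directory cleanup well past the 10-minute default so it
        // is less likely to race a rebalance (hypothetical 60 minutes here).
        props.put("state.cleanup.delay.ms", String.valueOf(60 * 60 * 1000L));
        return props;
    }

    public static void main(String[] args) {
        Properties p = tunedConfig();
        System.out.println("max.poll.interval.ms=" + p.getProperty("max.poll.interval.ms"));
        System.out.println("state.cleanup.delay.ms=" + p.getProperty("state.cleanup.delay.ms"));
    }
}
```

These properties would be merged into the same Properties object passed to KafkaStreams, alongside the application id and bootstrap servers.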

On Tue, Jul 4, 2017 at 7:40 AM, Damian Guy <damian....@gmail.com> wrote:

> Hi Greg,
>
> Obviously a bit difficult to read the RocksDBException, but my guess is it
> is because the state directory gets deleted right before the flush happens:
> 2017-07-04 10:54:46,829 [myid:] - INFO  [StreamThread-21:StateDirectory@213] - Deleting obsolete state directory 0_10 for task 0_10
>
> Yes it looks like it is possibly the same bug as KAFKA-5070.
>
> It looks like your application is constantly rebalancing during store
> initialization, which may be the reason this bug comes about (there is a
> chance that the state dir lock is released, so when the thread tries to
> remove the stale state directory it is able to get the lock). You probably
> want to configure `max.poll.interval.ms` to be a reasonably large value (I
> think we default to Integer.MAX_VALUE in 0.10.2.1). You can also try
> setting `state.cleanup.delay.ms` to a higher value (the default is 10
> minutes) to try and avoid it happening during a rebalance (I know this
> isn't a fix, but it will make it less likely to happen).
>
> Thanks,
> Damian
>
> On Tue, 4 Jul 2017 at 12:43 Greg Fodor <gfo...@gmail.com> wrote:
>
> > Hi all, we are working on upgrading our jobs from 0.10.0 to Kafka
> > Streams 0.10.2.1 and are hitting a problem. We have an ETL job that has
> > 4 state stores and runs across a few hundred partitions, and as part of
> > load testing the job we are trying to reload our data out of Kafka into
> > a test db. The result is that we are able to load about 4M tuples, and
> > then this error pops up on all of the stream nodes simultaneously.
> > There are 4 RocksDB stores in question, and there are lots of these
> > errors, which take the job down. This bug does *not* seem to occur on
> > 0.10.1.
> >
> > A similar error was mentioned here:
> > https://issues.apache.org/jira/browse/KAFKA-5070
> >
> > Full log attached.
> >
> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_10] Failed to flush state store session-id-start-events
> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
> > at org.apache.kafka.streams.processor.internals.StandbyTask.commit(StandbyTask.java:94)
> > at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
> > at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:797)
> > at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
> > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store session-id-start-events
> > at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
> > at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
> > at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
> > at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
> > at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
> > ... 6 more
> > Caused by: org.rocksdb.RocksDBException: v
> > at org.rocksdb.RocksDB.flush(Native Method)
> > at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
> > at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352)
> > ... 13 more
> >
> >
>
