I believe it is related to a bug in the state directory cleanup. This has
been fixed on trunk and also on the 0.11 branch (will be part of 0.11.0.1
that will hopefully be released soon). The fix is in this JIRA:
https://issues.apache.org/jira/browse/KAFKA-5562
To work around it you should set
StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG to Long.MAX_VALUE, i.e.,
disable it.

Thanks,
Damian


On Wed, 16 Aug 2017 at 07:39 Sameer Kumar <sam.kum.w...@gmail.com> wrote:

> Hi Damian,
>
> Please find the relevant logs as requested. Let me know if you need any
> more info.
>
> -Sameer.
>
> On Tue, Aug 15, 2017 at 9:33 PM, Damian Guy <damian....@gmail.com> wrote:
>
>> Sameer, the log you attached doesn't contain the logs *before* the
>
>
>> exception happened.
>>
>> On Tue, 15 Aug 2017 at 06:13 Sameer Kumar <sam.kum.w...@gmail.com> wrote:
>>
>> > I have added a attachement containing complete trace in my initial mail.
>> >
>> > On Mon, Aug 14, 2017 at 9:47 PM, Damian Guy <damian....@gmail.com>
>> wrote:
>> >
>> > > Do you have the logs leading up to the exception?
>> > >
>> > > On Mon, 14 Aug 2017 at 06:52 Sameer Kumar <sam.kum.w...@gmail.com>
>> > wrote:
>> > >
>> > > > Exception while doing the join, cant decipher more on this. Has
>> anyone
>> > > > faced it. complete exception trace attached.
>> > > >
>> > > > 2017-08-14 11:15:55 ERROR ConsumerCoordinator:269 - User provided
>> > > listener
>> > > > org.apache.kafka.streams.processor.internals.
>> > > StreamThread$RebalanceListener
>> > > > for group c-7-a34 failed on partition assignment
>> > > > org.apache.kafka.streams.errors.ProcessorStateException: Error
>> opening
>> > > > store KSTREAM-JOINTHIS-0000000018-store-201708140520 at location
>> > > > /data/streampoc/c-7-a34/0_4/KSTREAM-JOINTHIS-0000000018-
>> > > store/KSTREAM-JOINTHIS-0000000018-store-201708140520
>> > > >         at
>> > > > org.apache.kafka.streams.state.internals.RocksDBStore.
>> > > openDB(RocksDBStore.java:198)
>> > > >         at
>> > > > org.apache.kafka.streams.state.internals.RocksDBStore.
>> > > openDB(RocksDBStore.java:165)
>> > > >         at
>> > > >
>> > org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:40)
>> > > >         at
>> > > > org.apache.kafka.streams.state.internals.Segments.
>> > > getOrCreateSegment(Segments.java:86)
>> > > >         at
>> > > >
>> > org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(
>> > > RocksDBSegmentedBytesStore.java:81)
>> > > >         at
>> > > >
>> org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore$1.
>> > > restore(RocksDBSegmentedBytesStore.java:113)
>> > > >         at
>> > > > org.apache.kafka.streams.processor.internals.StateRestorer.restore(
>> > > StateRestorer.java:55)
>> > > >         at
>> > > > org.apache.kafka.streams.processor.internals.StoreChangelogReader.
>> > > processNext(StoreChangelogReader.java:216)
>> > > >         at
>> > > > org.apache.kafka.streams.processor.internals.StoreChangelogReader.
>> > > restorePartition(StoreChangelogReader.java:186)
>> > > >         at
>> > > > org.apache.kafka.streams.processor.internals.
>> > > StoreChangelogReader.restore(StoreChangelogReader.java:151)
>> > > >         at
>> > > > org.apache.kafka.streams.processor.internals.StreamThread$
>> > > RebalanceListener.onPartitionsAssigned(StreamThread.java:184)
>> > > >         at
>> > > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
>> > > onJoinComplete(ConsumerCoordinator.java:265)
>> > > >         at
>> > > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
>> > > joinGroupIfNeeded(AbstractCoordinator.java:363)
>> > > >         at
>> > > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
>> > > ensureActiveGroup(AbstractCoordinator.java:310)
>> > > >         at
>> > > >
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
>> > > ConsumerCoordinator.java:297)
>> > > >         at
>> > > > org.apache.kafka.clients.consumer.KafkaConsumer.
>> > > pollOnce(KafkaConsumer.java:1078)
>> > > >         at
>> > > > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
>> > > KafkaConsumer.java:1043)
>> > > >         at
>> > > >
>> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(
>> > > StreamThread.java:582)
>> > > >         at
>> > > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
>> > > StreamThread.java:553)
>> > > >         at
>> > > > org.apache.kafka.streams.processor.internals.
>> > > StreamThread.run(StreamThread.java:527)
>> > > > Caused by: org.rocksdb.RocksDBException:
>> > > >         at org.rocksdb.RocksDB.open(Native Method)
>> > > >         at org.rocksdb.RocksDB.open(RocksDB.java:231)
>> > > >         at
>> > > > org.apache.kafka.streams.state.internals.RocksDBStore.
>> > > openDB(RocksDBStore.java:191)
>> > > >         ... 19 more
>> > > >
>> > > > -sameer.
>> > > >
>> > >
>> >
>>
>

Reply via email to