I have already set this configuration; the value also appears in the logs:

state.cleanup.delay.ms = 9223372036854775807
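
For reference, a minimal sketch of how a setting like this might be applied in
code. It uses only java.util.Properties so that it runs without the Kafka jars;
the class name CleanupDelayConfig and the helper withCleanupDisabled are
hypothetical and not taken from my application. The key string is the one shown
in the log line above.

```java
import java.util.Properties;

public class CleanupDelayConfig {
    // Key as it appears in the logged configuration. In Kafka Streams this is
    // exposed as StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG.
    static final String STATE_CLEANUP_DELAY_MS = "state.cleanup.delay.ms";

    // Hypothetical helper: copies the given properties and sets the cleanup
    // delay to Long.MAX_VALUE, which effectively disables the cleanup.
    static Properties withCleanupDisabled(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.put(STATE_CLEANUP_DELAY_MS, Long.MAX_VALUE);
        return props;
    }

    public static void main(String[] args) {
        Properties props = withCleanupDisabled(new Properties());
        // Prints: state.cleanup.delay.ms = 9223372036854775807
        System.out.println(STATE_CLEANUP_DELAY_MS + " = " + props.get(STATE_CLEANUP_DELAY_MS));
    }
}
```

In a real application the resulting Properties would be passed on to the
streams configuration together with the other required settings (application
id, bootstrap servers and so on), which are left out here.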

-Sameer.

On Wed, Aug 16, 2017 at 1:56 PM, Damian Guy <damian....@gmail.com> wrote:

> I believe it is related to a bug in the state directory cleanup. This has
> been fixed on trunk and also on the 0.11 branch (will be part of 0.11.0.1
> that will hopefully be released soon). The fix is in this JIRA:
> https://issues.apache.org/jira/browse/KAFKA-5562
>
> To work around it you should set
> StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG to Long.MAX_VALUE, i.e.,
> disable it.
>
> Thanks,
> Damian
>
>
> On Wed, 16 Aug 2017 at 07:39 Sameer Kumar <sam.kum.w...@gmail.com> wrote:
>
> > Hi Damian,
> >
> > Please find the relevant logs as requested. Let me know if you need any
> > more info.
> >
> > -Sameer.
> >
> > On Tue, Aug 15, 2017 at 9:33 PM, Damian Guy <damian....@gmail.com> wrote:
> >
> >> Sameer, the log you attached doesn't contain the logs *before* the
> >> exception happened.
> >>
> >> On Tue, 15 Aug 2017 at 06:13 Sameer Kumar <sam.kum.w...@gmail.com> wrote:
> >>
> >> > I have added an attachment containing the complete trace in my initial
> >> > mail.
> >> >
> >> > On Mon, Aug 14, 2017 at 9:47 PM, Damian Guy <damian....@gmail.com> wrote:
> >> >
> >> > > Do you have the logs leading up to the exception?
> >> > >
> >> > > On Mon, 14 Aug 2017 at 06:52 Sameer Kumar <sam.kum.w...@gmail.com> wrote:
> >> > >
> >> > > > Exception while doing the join; I can't decipher more from this. Has
> >> > > > anyone faced it? Complete exception trace attached.
> >> > > >
> >> > > > 2017-08-14 11:15:55 ERROR ConsumerCoordinator:269 - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group c-7-a34 failed on partition assignment
> >> > > > org.apache.kafka.streams.errors.ProcessorStateException: Error opening store KSTREAM-JOINTHIS-0000000018-store-201708140520 at location /data/streampoc/c-7-a34/0_4/KSTREAM-JOINTHIS-0000000018-store/KSTREAM-JOINTHIS-0000000018-store-201708140520
> >> > > >         at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:198)
> >> > > >         at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:165)
> >> > > >         at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:40)
> >> > > >         at org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:86)
> >> > > >         at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:81)
> >> > > >         at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore$1.restore(RocksDBSegmentedBytesStore.java:113)
> >> > > >         at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:55)
> >> > > >         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:216)
> >> > > >         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:186)
> >> > > >         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:151)
> >> > > >         at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:184)
> >> > > >         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
> >> > > >         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
> >> > > >         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
> >> > > >         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
> >> > > >         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
> >> > > >         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> >> > > >         at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
> >> > > >         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
> >> > > >         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
> >> > > > Caused by: org.rocksdb.RocksDBException:
> >> > > >         at org.rocksdb.RocksDB.open(Native Method)
> >> > > >         at org.rocksdb.RocksDB.open(RocksDB.java:231)
> >> > > >         at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:191)
> >> > > >         ... 19 more
> >> > > >
> >> > > > -sameer.
> >> > > >
> >> > >
> >> >
> >>
> >
>
