ok.. got it.. Thanks...changed it, and it works. -Sameer.
On Wed, Aug 16, 2017 at 4:06 PM, Damian Guy <damian....@gmail.com> wrote: > I see. It is the same issue, though. The problem is that Long.MAX_VALUE is > actually too large, it causes an overflow so the task will still run, i.e, > in this bit of code: > > if (now > lastCleanMs + cleanTimeMs) { > stateDirectory.cleanRemovedTasks(cleanTimeMs); > lastCleanMs = now; > } > > So, you will need to set it to a large enough value to disable it but not > Long.MAX_VALUE (sorry) > > On Wed, 16 Aug 2017 at 10:21 Sameer Kumar <sam.kum.w...@gmail.com> wrote: > > > I have already set this configuration. This info is there in logs as > well. > > > > state.cleanup.delay.ms = 9223372036854775807 > > > > -Sameer. > > > > On Wed, Aug 16, 2017 at 1:56 PM, Damian Guy <damian....@gmail.com> > wrote: > > > > > I believe it is related to a bug in the state directory cleanup. This > has > > > been fixed on trunk and also on the 0.11 branch (will be part of > 0.11.0.1 > > > that will hopefully be released soon). The fix is in this JIRA: > > > https://issues.apache.org/jira/browse/KAFKA-5562 > > > > > > To work around it you should set > > > StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG to Long.MAX_VALUE, i.e., > > > disable it. > > > > > > Thanks, > > > Damian > > > > > > > > > On Wed, 16 Aug 2017 at 07:39 Sameer Kumar <sam.kum.w...@gmail.com> > > wrote: > > > > > > > Hi Damian, > > > > > > > > Please find the relevant logs as requested. Let me know if you need > any > > > > more info. > > > > > > > > -Sameer. > > > > > > > > On Tue, Aug 15, 2017 at 9:33 PM, Damian Guy <damian....@gmail.com> > > > wrote: > > > > > > > >> Sameer, the log you attached doesn't contain the logs *before* the > > > > > > > > > > > >> exception happened. > > > >> > > > >> On Tue, 15 Aug 2017 at 06:13 Sameer Kumar <sam.kum.w...@gmail.com> > > > wrote: > > > >> > > > >> > I have added a attachement containing complete trace in my initial > > > mail. > > > >> > > > > >> > On Mon, Aug 14, 2017 at 9:47 PM, Damian Guy <damian....@gmail.com > > > > > >> wrote: > > > >> > > > > >> > > Do you have the logs leading up to the exception? > > > >> > > > > > >> > > On Mon, 14 Aug 2017 at 06:52 Sameer Kumar < > sam.kum.w...@gmail.com > > > > > > >> > wrote: > > > >> > > > > > >> > > > Exception while doing the join, cant decipher more on this. > Has > > > >> anyone > > > >> > > > faced it. complete exception trace attached. > > > >> > > > > > > >> > > > 2017-08-14 11:15:55 ERROR ConsumerCoordinator:269 - User > > provided > > > >> > > listener > > > >> > > > org.apache.kafka.streams.processor.internals. > > > >> > > StreamThread$RebalanceListener > > > >> > > > for group c-7-a34 failed on partition assignment > > > >> > > > org.apache.kafka.streams.errors.ProcessorStateException: > Error > > > >> opening > > > >> > > > store KSTREAM-JOINTHIS-0000000018-store-201708140520 at > location > > > >> > > > /data/streampoc/c-7-a34/0_4/KSTREAM-JOINTHIS-0000000018- > > > >> > > store/KSTREAM-JOINTHIS-0000000018-store-201708140520 > > > >> > > > at > > > >> > > > org.apache.kafka.streams.state.internals.RocksDBStore. > > > >> > > openDB(RocksDBStore.java:198) > > > >> > > > at > > > >> > > > org.apache.kafka.streams.state.internals.RocksDBStore. > > > >> > > openDB(RocksDBStore.java:165) > > > >> > > > at > > > >> > > > > > > >> > org.apache.kafka.streams.state.internals.Segment. > > > openDB(Segment.java:40) > > > >> > > > at > > > >> > > > org.apache.kafka.streams.state.internals.Segments. > > > >> > > getOrCreateSegment(Segments.java:86) > > > >> > > > at > > > >> > > > > > > >> > org.apache.kafka.streams.state.internals. > RocksDBSegmentedBytesStore. > > > put( > > > >> > > RocksDBSegmentedBytesStore.java:81) > > > >> > > > at > > > >> > > > > > > >> org.apache.kafka.streams.state.internals. > RocksDBSegmentedBytesStore$1. > > > >> > > restore(RocksDBSegmentedBytesStore.java:113) > > > >> > > > at > > > >> > > > org.apache.kafka.streams.processor.internals. > > > StateRestorer.restore( > > > >> > > StateRestorer.java:55) > > > >> > > > at > > > >> > > > org.apache.kafka.streams.processor.internals. > > > StoreChangelogReader. > > > >> > > processNext(StoreChangelogReader.java:216) > > > >> > > > at > > > >> > > > org.apache.kafka.streams.processor.internals. > > > StoreChangelogReader. > > > >> > > restorePartition(StoreChangelogReader.java:186) > > > >> > > > at > > > >> > > > org.apache.kafka.streams.processor.internals. > > > >> > > StoreChangelogReader.restore(StoreChangelogReader.java:151) > > > >> > > > at > > > >> > > > org.apache.kafka.streams.processor.internals.StreamThread$ > > > >> > > RebalanceListener.onPartitionsAssigned(StreamThread.java:184) > > > >> > > > at > > > >> > > > org.apache.kafka.clients.consumer.internals. > ConsumerCoordinator. > > > >> > > onJoinComplete(ConsumerCoordinator.java:265) > > > >> > > > at > > > >> > > > org.apache.kafka.clients.consumer.internals. > AbstractCoordinator. > > > >> > > joinGroupIfNeeded(AbstractCoordinator.java:363) > > > >> > > > at > > > >> > > > org.apache.kafka.clients.consumer.internals. > AbstractCoordinator. > > > >> > > ensureActiveGroup(AbstractCoordinator.java:310) > > > >> > > > at > > > >> > > > > > > >> org.apache.kafka.clients.consumer.internals. > ConsumerCoordinator.poll( > > > >> > > ConsumerCoordinator.java:297) > > > >> > > > at > > > >> > > > org.apache.kafka.clients.consumer.KafkaConsumer. > > > >> > > pollOnce(KafkaConsumer.java:1078) > > > >> > > > at > > > >> > > > org.apache.kafka.clients.consumer.KafkaConsumer.poll( > > > >> > > KafkaConsumer.java:1043) > > > >> > > > at > > > >> > > > > > > >> > > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests( > > > >> > > StreamThread.java:582) > > > >> > > > at > > > >> > > > org.apache.kafka.streams.processor.internals. > > > StreamThread.runLoop( > > > >> > > StreamThread.java:553) > > > >> > > > at > > > >> > > > org.apache.kafka.streams.processor.internals. > > > >> > > StreamThread.run(StreamThread.java:527) > > > >> > > > Caused by: org.rocksdb.RocksDBException: > > > >> > > > at org.rocksdb.RocksDB.open(Native Method) > > > >> > > > at org.rocksdb.RocksDB.open(RocksDB.java:231) > > > >> > > > at > > > >> > > > org.apache.kafka.streams.state.internals.RocksDBStore. > > > >> > > openDB(RocksDBStore.java:191) > > > >> > > > ... 19 more > > > >> > > > > > > >> > > > -sameer. > > > >> > > > > > > >> > > > > > >> > > > > >> > > > > > > > > > >