My current workaround is to completely delete the state store and rebuild it from scratch.
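
For anyone hitting the same error, here is a minimal sketch of that workaround using only the JDK. The class name StateStoreWipe and the method deleteRecursively are hypothetical names chosen for illustration. It assumes the application is stopped and that the path you pass is the local directory of the affected task or store under your configured state.dir (for example the 0_0 directory listed in the quoted message below). Once the directory is gone, the store is rebuilt from its changelog topic on the next start. KafkaStreams#cleanUp() is the built-in alternative, but it removes all local state for the application rather than a single task directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of Kafka Streams.
final class StateStoreWipe {

    private StateStoreWipe() {
    }

    /**
     * Recursively deletes the given directory if it exists.
     * Only call this while the Streams application is stopped.
     *
     * @return the number of files and directories removed
     */
    static int deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return 0;
        }
        List<Path> entries;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order puts children before their parent directories.
            entries = walk.sorted(Comparator.reverseOrder())
                          .collect(Collectors.toList());
        }
        for (Path entry : entries) {
            Files.delete(entry);
        }
        return entries.size();
    }
}
```

Deleting a single task directory keeps the other partitions' local state intact, so only the affected store has to be replayed from the changelog.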

On Tue, Mar 31, 2020 at 21:39, Boyang Chen <reluctanthero...@gmail.com> wrote:

> Thanks Nicolas for the report, so are you suggesting that you couldn't turn
> on compactions for the state store? Is there a workaround?
>
> On Tue, Mar 31, 2020 at 9:54 AM Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:
>
> > After some more testing and debugging, it seems that it is caused by the
> > compaction option I've configured for RocksDB. When it is removed,
> > everything is fine...
> > The option is as follows:
> >
> >     CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
> >     fifoOptions.setMaxTableFilesSize(maxSize);
> >     fifoOptions.setAllowCompaction(true);
> >     options.setCompactionOptionsFIFO(fifoOptions);
> >     options.setCompactionStyle(CompactionStyle.FIFO);
> >
> > On Tue, Mar 31, 2020 at 16:27, Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:
> >
> > > Hello everyone,
> > >
> > > I'm currently facing an issue with RocksDB's internal compaction process,
> > > which occurs when the local state stores of several of my Kafka Streams
> > > applications are being restored. This is sadly a huge concern, as it
> > > completely defeats resiliency to node failure, since node failures often
> > > lead to a state store restoration. The only workaround I currently have
> > > is to delete the local store and restore it from scratch. I'm using
> > > version 2.4.1 of the Java libraries.
> > >
> > > The exception thrown by the KStream process is:
> > >
> > > org.apache.kafka.streams.errors.ProcessorStateException: Error while range compacting during restoring store merge_store
> > >     at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615) ~[kafka-stream-router.jar:?]
> > >     at org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398) ~[kafka-stream-router.jar:?]
> > >     at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644) ~[kafka-stream-router.jar:?]
> > >     at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59) ~[kafka-stream-router.jar:?]
> > >     at org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76) ~[kafka-stream-router.jar:?]
> > >     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211) ~[kafka-stream-router.jar:?]
> > >     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185) ~[kafka-stream-router.jar:?]
> > >     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81) ~[kafka-stream-router.jar:?]
> > >     at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389) ~[kafka-stream-router.jar:?]
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769) ~[kafka-stream-router.jar:?]
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) ~[kafka-stream-router.jar:?]
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [kafka-stream-router.jar:?]
> > > Caused by: org.rocksdb.RocksDBException: Target level exceeds number of levels
> > >     at org.rocksdb.RocksDB.compactRange(Native Method) ~[kafka-stream-router.jar:?]
> > >     at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636) ~[kafka-stream-router.jar:?]
> > >     at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613) ~[kafka-stream-router.jar:?]
> > >     ... 11 more
> > >
> > > Here is the state of partition 0_0 of one of those stores:
> > >
> > > -rw-r--r-- 1 java j2ee 148210568 31 mars  10:49 000139.sst
> > > -rw-r--r-- 1 java j2ee  21620385 31 mars  11:06 000184.sst
> > > -rw-r--r-- 1 java j2ee         0 31 mars  11:11 000198.log
> > > -rw-r--r-- 1 java j2ee  31602468 31 mars  11:31 000251.sst
> > > -rw-r--r-- 1 java j2ee  37856549 31 mars  12:00 000324.sst
> > > -rw-r--r-- 1 java j2ee  33498822 31 mars  12:26 000393.sst
> > > -rw-r--r-- 1 java j2ee  34368461 31 mars  12:49 000450.sst
> > > -rw-r--r-- 1 java j2ee  11371247 31 mars  12:55 000467.sst
> > > -rw-r--r-- 1 java j2ee  14356435 31 mars  13:04 000489.sst
> > > -rw-r--r-- 1 java j2ee   5858737 31 mars  13:05 000494.sst
> > > -rw-r--r-- 1 java j2ee   2545952 31 mars  14:08 000659.sst
> > > -rw-r--r-- 1 java j2ee   3187275 31 mars  15:27 000868.sst
> > > -rw-r--r-- 1 java j2ee    407017 31 mars  15:34 000885.sst
> > > -rw-r--r-- 1 java j2ee    590190 31 mars  15:45 000914.sst
> > > -rw-r--r-- 1 java j2ee    154471 31 mars  15:47 000919.sst
> > > -rw-r--r-- 1 java j2ee    139838 31 mars  15:49 000924.sst
> > > -rw-r--r-- 1 java j2ee     35058 31 mars  15:49 000925.sst
> > > -rw-r--r-- 1 java j2ee     33987 31 mars  15:50 000926.sst
> > > -rw-r--r-- 1 java j2ee        16 31 mars  11:11 CURRENT
> > > -rw-r--r-- 1 java j2ee        37 31 mars  10:33 IDENTITY
> > > -rw-r--r-- 1 java j2ee         0 31 mars  10:33 LOCK
> > > -rw-r--r-- 1 java j2ee     15340 31 mars  11:11 LOG
> > > -rw-r--r-- 1 java j2ee     15046 31 mars  10:33 LOG.old.1585643630145007
> > > -rw-r--r-- 1 java j2ee     15290 31 mars  10:33 LOG.old.1585643826265995
> > > -rw-r--r-- 1 java j2ee     15384 31 mars  10:37 LOG.old.1585645861692248
> > > -rw-r--r-- 1 java j2ee     60767 31 mars  15:55 MANIFEST-000197
> > > -rw-r--r-- 1 java j2ee      4857 31 mars  10:37 OPTIONS-000107
> > > -rw-r--r-- 1 java j2ee      4857 31 mars  11:11 OPTIONS-000200
> > >
> > > I can see that Kafka is running RocksDB's compaction with:
> > >
> > >     public void toggleDbForBulkLoading() {
> > >         try {
> > >             db.compactRange(columnFamily, true, 1, 0);
> > >         } catch (final RocksDBException e) {
> > >             throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
> > >         }
> > >     }
> > >
> > > This seems related to an issue with RocksDB itself:
> > > https://github.com/facebook/rocksdb/issues/2734
> > > But I'm not sure of this.
> > > Any help would be greatly appreciated here :)
> >
> > --
> > Nicolas Carlot
> > Lead dev | nicolas.car...@chronopost.fr
> >
> > Please note that as of May 20, Chronopost's headquarters is moving. The
> > new address is: 3 boulevard Romain Rolland 75014 Paris
> >
> > chronopost.fr <http://www.chronopost.fr/>
> > Follow us on Facebook <https://fr-fr.facebook.com/chronopost> and
> > Twitter <https://twitter.com/chronopost>.

--
Nicolas Carlot
Lead dev | nicolas.car...@chronopost.fr