Hi Nicolas,

Thank you for reporting this issue.
As far as I understand, the issue is that bulk loading as done in Kafka Streams does not work as expected if FIFO compaction is used. I would propose that you open a bug ticket. Please make sure to include steps to reproduce the issue in the ticket. Additionally, since you already have some code that solves the issue, you could assign the ticket to yourself and open a PR for it. Would you be interested in contributing? If you cannot assign the ticket to yourself, ask to be added to the list of contributors on the dev mailing list.

Best,
Bruno

On Thu, Apr 16, 2020 at 11:06 AM Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:

> # Bump #
> So is this a bug? Should I file a ticket?
> Any idea? I don't like the idea of having to patch Kafka libraries...

On Wed, Apr 1, 2020 at 4:33 PM, Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:

> Added some nasty code in Kafka 2.4.1. It seems to work fine for now... From
> my understanding, the compaction done when restoring a store is only
> there to speed things up.
> So I guess this kind of "hack" isn't such a big deal?
>
> [image: image.png]

On Wed, Apr 1, 2020 at 10:44 AM, Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:

> Here is the full configuration of RocksDB.
>
> [image: image.png]

On Wed, Apr 1, 2020 at 10:41 AM, Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:

> It's not that I cannot turn on compaction.
> Compaction works fine.
> The issue is with the restoration process of the state store, which
> tries to compact the store to a single level with
> db.compactRange(columnFamily, true, 1, 0) before bulk loading the data.
> It always fails when I'm using FIFO compaction.

On Wed, Apr 1, 2020 at 10:26 AM, Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:

> My current workaround is to completely delete the state store and
> rebuild it from scratch.

On Tue, Mar 31, 2020 at 9:39 PM, Boyang Chen <reluctanthero...@gmail.com> wrote:

> Thanks Nicolas for the report. So are you suggesting that you couldn't
> turn on compaction for the state store? Is there a workaround?

On Tue, Mar 31, 2020 at 9:54 AM Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:

> After some more testing and debugging, it seems that it is caused by the
> compaction options I've configured for RocksDB. When they are removed,
> everything is fine...
> The options are as follows:
>
> CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
> fifoOptions.setMaxTableFilesSize(maxSize);
> fifoOptions.setAllowCompaction(true);
> options.setCompactionOptionsFIFO(fifoOptions);
> options.setCompactionStyle(CompactionStyle.FIFO);

On Tue, Mar 31, 2020 at 4:27 PM, Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:

> Hello everyone,
>
> I'm currently facing an issue with RocksDB's internal compaction process,
> which occurs when the local state stores of several of my Kafka Streams
> applications are being restored. This is sadly a huge concern, as it
> completely discards resiliency over node failure, since node failures
> often lead to a state store restoration. The only workaround I currently
> have is to delete the local store and restore it from scratch. I'm using
> version 2.4.1 of the Java libraries.
> The exception thrown by the KStream process is:
>
> org.apache.kafka.streams.errors.ProcessorStateException: Error while range compacting during restoring store merge_store
>     at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [kafka-stream-router.jar:?]
> Caused by: org.rocksdb.RocksDBException: Target level exceeds number of levels
>     at org.rocksdb.RocksDB.compactRange(Native Method) ~[kafka-stream-router.jar:?]
>     at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636) ~[kafka-stream-router.jar:?]
>     at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613) ~[kafka-stream-router.jar:?]
>     ... 11 more
>
> Here is the state of partition 0_0 of one of those stores:
>
> -rw-r--r-- 1 java j2ee 148210568 31 mars 10:49 000139.sst
> -rw-r--r-- 1 java j2ee  21620385 31 mars 11:06 000184.sst
> -rw-r--r-- 1 java j2ee         0 31 mars 11:11 000198.log
> -rw-r--r-- 1 java j2ee  31602468 31 mars 11:31 000251.sst
> -rw-r--r-- 1 java j2ee  37856549 31 mars 12:00 000324.sst
> -rw-r--r-- 1 java j2ee  33498822 31 mars 12:26 000393.sst
> -rw-r--r-- 1 java j2ee  34368461 31 mars 12:49 000450.sst
> -rw-r--r-- 1 java j2ee  11371247 31 mars 12:55 000467.sst
> -rw-r--r-- 1 java j2ee  14356435 31 mars 13:04 000489.sst
> -rw-r--r-- 1 java j2ee   5858737 31 mars 13:05 000494.sst
> -rw-r--r-- 1 java j2ee   2545952 31 mars 14:08 000659.sst
> -rw-r--r-- 1 java j2ee   3187275 31 mars 15:27 000868.sst
> -rw-r--r-- 1 java j2ee    407017 31 mars 15:34 000885.sst
> -rw-r--r-- 1 java j2ee    590190 31 mars 15:45 000914.sst
> -rw-r--r-- 1 java j2ee    154471 31 mars 15:47 000919.sst
> -rw-r--r-- 1 java j2ee    139838 31 mars 15:49 000924.sst
> -rw-r--r-- 1 java j2ee     35058 31 mars 15:49 000925.sst
> -rw-r--r-- 1 java j2ee     33987 31 mars 15:50 000926.sst
> -rw-r--r-- 1 java j2ee        16 31 mars 11:11 CURRENT
> -rw-r--r-- 1 java j2ee        37 31 mars 10:33 IDENTITY
> -rw-r--r-- 1 java j2ee         0 31 mars 10:33 LOCK
> -rw-r--r-- 1 java j2ee     15340 31 mars 11:11 LOG
> -rw-r--r-- 1 java j2ee     15046 31 mars 10:33 LOG.old.1585643630145007
> -rw-r--r-- 1 java j2ee     15290 31 mars 10:33 LOG.old.1585643826265995
> -rw-r--r-- 1 java j2ee     15384 31 mars 10:37 LOG.old.1585645861692248
> -rw-r--r-- 1 java j2ee     60767 31 mars 15:55 MANIFEST-000197
> -rw-r--r-- 1 java j2ee      4857 31 mars 10:37 OPTIONS-000107
> -rw-r--r-- 1 java j2ee      4857 31 mars 11:11 OPTIONS-000200
> I can see that Kafka Streams runs RocksDB's compaction with:
>
> public void toggleDbForBulkLoading() {
>     try {
>         db.compactRange(columnFamily, true, 1, 0);
>     } catch (final RocksDBException e) {
>         throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
>     }
> }
>
> This seems related to an issue with RocksDB itself:
> https://github.com/facebook/rocksdb/issues/2734
> But I'm not sure of this.
> Any help would be greatly appreciated here :)
>
> --
> Nicolas Carlot
> Lead dev | nicolas.car...@chronopost.fr
>
> Please note that as of May 20 the Chronopost head office is moving. The new address is: 3 boulevard Romain Rolland 75014 Paris
>
> chronopost.fr <http://www.chronopost.fr/>
> Follow us on Facebook <https://fr-fr.facebook.com/chronopost> and Twitter <https://twitter.com/chronopost>.
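For readers who want to reproduce the setup described in the thread: in Kafka Streams, RocksDB options like these are supplied through a `RocksDBConfigSetter`. Below is a minimal sketch using the exact FIFO options Nicolas posted; the class name `FifoConfigSetter` and the 512 MiB cap are made up for illustration (the thread only shows a `maxSize` variable).

```java
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.CompactionOptionsFIFO;
import org.rocksdb.CompactionStyle;
import org.rocksdb.Options;

public class FifoConfigSetter implements RocksDBConfigSetter {

    // Assumed value; in the thread this came from a variable named maxSize.
    private static final long MAX_TABLE_FILES_SIZE = 512L * 1024 * 1024; // 512 MiB

    private CompactionOptionsFIFO fifoOptions;

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        // The FIFO compaction options from the Mar 31 message.
        fifoOptions = new CompactionOptionsFIFO();
        fifoOptions.setMaxTableFilesSize(MAX_TABLE_FILES_SIZE);
        fifoOptions.setAllowCompaction(true);
        options.setCompactionOptionsFIFO(fifoOptions);
        options.setCompactionStyle(CompactionStyle.FIFO);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // RocksDB options are native resources and should be closed explicitly.
        if (fifoOptions != null) {
            fifoOptions.close();
        }
    }
}
```

A Streams application would register this with `props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, FifoConfigSetter.class);`. With this in place, restoration should hit the `Target level exceeds number of levels` error from the stack trace above, since `toggleDbForBulkLoading` calls `db.compactRange(columnFamily, true, 1, 0)` while a FIFO-compacted store keeps all files in a single level.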