Hi Nicolas,

Yeah, I meant "doesn't". Sorry for that!
Best,
Bruno

On Thu, Apr 16, 2020 at 1:02 PM Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:
>
> Hi Bruno,
>
> "As far as I understand, the issue is that bulk loading as done in Kafka Streams does work as expected if FIFO compaction is used."
>
> You meant "doesn't", right?
>
> OK, I will open a ticket, but I don't think my "fix" is the correct one. Just ignoring the issue doesn't seem to be a correct solution :)
>
> On Thu, Apr 16, 2020 at 11:49 AM Bruno Cadonna <br...@confluent.io> wrote:
> >
> > Hi Nicolas,
> >
> > Thank you for reporting this issue.
> >
> > As far as I understand, the issue is that bulk loading as done in Kafka Streams does work as expected if FIFO compaction is used.
> >
> > I would propose that you open a bug ticket. Please make sure to include steps to reproduce the issue in the ticket. Additionally, since you already have some code that solves the issue, you could assign the ticket to yourself and open a PR for it. Would you be interested in contributing?
> >
> > If you cannot assign the ticket to yourself, ask to be added to the list of contributors on the dev mailing list.
> >
> > Best,
> > Bruno
> >
> > On Thu, Apr 16, 2020 at 11:06 AM Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:
> > >
> > > # Bump #
> > > So is this a bug? Should I file a ticket?
> > > Any idea? I don't like the idea of having to patch Kafka libraries...
> > >
> > > On Wed, Apr 1, 2020 at 4:33 PM Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:
> > > >
> > > > Added some nasty code in Kafka 2.4.1. Seems to work fine for now... From my understanding, the compaction process when restoring a store is only done to speed things up.
> > > > So I guess this kind of "hack" isn't such a big deal?
> > > >
> > > > [image: image.png]
> > > >
> > > > On Wed, Apr 1, 2020 at 10:44 AM Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:
> > > > >
> > > > > Here is the full configuration of RocksDB.
> > > > >
> > > > > [image: image.png]
> > > > >
> > > > > On Wed, Apr 1, 2020 at 10:41 AM Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:
> > > > > >
> > > > > > It's not that I cannot turn on compaction. Compaction works fine.
> > > > > > The issue is with the restoration process of the state store, which tries to compact the store to a single level with db.compactRange(columnFamily, true, 1, 0) before bulk loading the data.
> > > > > > It automatically fails when I'm using FIFO compaction.
> > > > > >
> > > > > > On Wed, Apr 1, 2020 at 10:26 AM Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:
> > > > > > >
> > > > > > > My current workaround is to completely delete the state store and rebuild it from scratch.
> > > > > > >
> > > > > > > On Tue, Mar 31, 2020 at 9:39 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Thanks Nicolas for the report, so are you suggesting that you couldn't turn on compactions for the state store? Is there a workaround?
> > > > > > > >
> > > > > > > > On Tue, Mar 31, 2020 at 9:54 AM Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:
> > > > > > > > >
> > > > > > > > > After some more testing and debugging, it seems that it is caused by the compaction option I've configured for RocksDB. When it is removed, everything is fine...
> > > > > > > > > The option is as follows:
> > > > > > > > >
> > > > > > > > > CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
> > > > > > > > > fifoOptions.setMaxTableFilesSize(maxSize);
> > > > > > > > > fifoOptions.setAllowCompaction(true);
> > > > > > > > > options.setCompactionOptionsFIFO(fifoOptions);
> > > > > > > > > options.setCompactionStyle(CompactionStyle.FIFO);
> > > > > > > > >
> > > > > > > > > On Tue, Mar 31, 2020 at 4:27 PM Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:
> > > > > > > > > >
> > > > > > > > > > Hello everyone,
> > > > > > > > > >
> > > > > > > > > > I'm currently facing an issue with RocksDB's internal compaction process, which occurs when the local state stores of several of my Kafka Streams applications are being restored. This is sadly a huge concern, as it completely discards resilience to node failure, since node failures often lead to a state store restoration. The only workaround I currently have is to delete the local store to restore it from scratch. I'm using version 2.4.1 of the Java libraries.
> > > > > > > > > >
> > > > > > > > > > The exception thrown by the KStream process is:
> > > > > > > > > > org.apache.kafka.streams.errors.ProcessorStateException: Error while range compacting during restoring store merge_store
> > > > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615) ~[kafka-stream-router.jar:?]
> > > > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398) ~[kafka-stream-router.jar:?]
> > > > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644) ~[kafka-stream-router.jar:?]
> > > > > > > > > >     at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59) ~[kafka-stream-router.jar:?]
> > > > > > > > > >     at org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76) ~[kafka-stream-router.jar:?]
> > > > > > > > > >     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211) ~[kafka-stream-router.jar:?]
> > > > > > > > > >     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185) ~[kafka-stream-router.jar:?]
> > > > > > > > > >     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81) ~[kafka-stream-router.jar:?]
> > > > > > > > > >     at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389) ~[kafka-stream-router.jar:?]
> > > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769) ~[kafka-stream-router.jar:?]
> > > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) ~[kafka-stream-router.jar:?]
> > > > > > > > > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [kafka-stream-router.jar:?]
> > > > > > > > > > Caused by: org.rocksdb.RocksDBException: Target level exceeds number of levels
> > > > > > > > > >     at org.rocksdb.RocksDB.compactRange(Native Method) ~[kafka-stream-router.jar:?]
> > > > > > > > > >     at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636) ~[kafka-stream-router.jar:?]
> > > > > > > > > >     at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613) ~[kafka-stream-router.jar:?]
> > > > > > > > > >     ... 11 more
> > > > > > > > > >
> > > > > > > > > > Here is the state of partition 0_0 of one of those stores:
> > > > > > > > > > -rw-r--r-- 1 java j2ee 148210568 31 mars 10:49 000139.sst
> > > > > > > > > > -rw-r--r-- 1 java j2ee  21620385 31 mars 11:06 000184.sst
> > > > > > > > > > -rw-r--r-- 1 java j2ee         0 31 mars 11:11 000198.log
> > > > > > > > > > -rw-r--r-- 1 java j2ee  31602468 31 mars 11:31 000251.sst
> > > > > > > > > > -rw-r--r-- 1 java j2ee  37856549 31 mars 12:00 000324.sst
> > > > > > > > > > -rw-r--r-- 1 java j2ee  33498822 31 mars 12:26 000393.sst
> > > > > > > > > > -rw-r--r-- 1 java j2ee  34368461 31 mars 12:49 000450.sst
> > > > > > > > > > -rw-r--r-- 1 java j2ee  11371247 31 mars 12:55 000467.sst
> > > > > > > > > > -rw-r--r-- 1 java j2ee  14356435 31 mars 13:04 000489.sst
> > > > > > > > > > -rw-r--r-- 1 java j2ee   5858737 31 mars 13:05 000494.sst
> > > > > > > > > > -rw-r--r-- 1 java j2ee   2545952 31 mars 14:08 000659.sst
> > > > > > > > > > -rw-r--r-- 1 java j2ee   3187275 31 mars 15:27 000868.sst
> > > > > > > > > > -rw-r--r-- 1 java j2ee    407017 31 mars 15:34 000885.sst
> > > > > > > > > > -rw-r--r-- 1 java j2ee    590190 31 mars 15:45 000914.sst
> > > > > > > > > > -rw-r--r-- 1 java j2ee    154471 31 mars 15:47 000919.sst
> > > > > > > > > > -rw-r--r-- 1 java j2ee    139838 31 mars 15:49 000924.sst
> > > > > > > > > > -rw-r--r-- 1 java j2ee     35058 31 mars 15:49 000925.sst
> > > > > > > > > > -rw-r--r-- 1 java j2ee     33987 31 mars 15:50 000926.sst
> > > > > > > > > > -rw-r--r-- 1 java j2ee        16 31 mars 11:11 CURRENT
> > > > > > > > > > -rw-r--r-- 1 java j2ee        37 31 mars 10:33 IDENTITY
> > > > > > > > > > -rw-r--r-- 1 java j2ee         0 31 mars 10:33 LOCK
> > > > > > > > > > -rw-r--r-- 1 java j2ee     15340 31 mars 11:11 LOG
> > > > > > > > > > -rw-r--r-- 1 java j2ee     15046 31 mars 10:33 LOG.old.1585643630145007
> > > > > > > > > > -rw-r--r-- 1 java j2ee     15290 31 mars 10:33 LOG.old.1585643826265995
> > > > > > > > > > -rw-r--r-- 1 java j2ee     15384 31 mars 10:37 LOG.old.1585645861692248
> > > > > > > > > > -rw-r--r-- 1 java j2ee     60767 31 mars 15:55 MANIFEST-000197
> > > > > > > > > > -rw-r--r-- 1 java j2ee      4857 31 mars 10:37 OPTIONS-000107
> > > > > > > > > > -rw-r--r-- 1 java j2ee      4857 31 mars 11:11 OPTIONS-000200
> > > > > > > > > >
> > > > > > > > > > I can see that Kafka is running RocksDB's compaction with:
> > > > > > > > > > public void toggleDbForBulkLoading() {
> > > > > > > > > >     try {
> > > > > > > > > >         db.compactRange(columnFamily, true, 1, 0);
> > > > > > > > > >     } catch (final RocksDBException e) {
> > > > > > > > > >         throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
> > > > > > > > > >     }
> > > > > > > > > > }
> > > > > > > > > >
> > > > > > > > > > This seems related to an issue with RocksDB itself: https://github.com/facebook/rocksdb/issues/2734
> > > > > > > > > > But I'm not sure of this.
> > > > > > > > > > Any help would be greatly appreciated here :)
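
For anyone who wants to reproduce the failure outside of Kafka Streams, below is a minimal standalone sketch against the plain RocksJava API. The database path and the 64 MB size limit are made-up example values, and it uses the default column family rather than an explicit handle, but otherwise it mirrors the FIFO options and the compactRange(true, 1, 0) call quoted above. Since FIFO compaction keeps all SST files in a single level (level 0), asking RocksDB to move them to level 1 should fail with "Target level exceeds number of levels":

    import org.rocksdb.CompactionOptionsFIFO;
    import org.rocksdb.CompactionStyle;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    public class FifoCompactRangeRepro {

        public static void main(final String[] args) throws RocksDBException {
            RocksDB.loadLibrary();

            // Same FIFO settings as in the thread; 64 MB is an arbitrary example limit.
            final CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
            fifoOptions.setMaxTableFilesSize(64 * 1024 * 1024L);
            fifoOptions.setAllowCompaction(true);

            final Options options = new Options();
            options.setCreateIfMissing(true);
            options.setCompactionOptionsFIFO(fifoOptions);
            options.setCompactionStyle(CompactionStyle.FIFO);

            // "/tmp/fifo-compact-range-repro" is just an example path.
            try (final RocksDB db = RocksDB.open(options, "/tmp/fifo-compact-range-repro")) {
                db.put("key".getBytes(), "value".getBytes());

                // Mirrors the call made during state store restoration:
                // change_level = true, target_level = 1, target_path_id = 0.
                // With FIFO compaction everything lives in level 0, so moving
                // files to level 1 is expected to be rejected by RocksDB.
                db.compactRange(true, 1, 0);
            } finally {
                options.close();
                fifoOptions.close();
            }
        }
    }

If that is indeed the mechanism, the fix belongs on the Kafka Streams side, for example by skipping or tolerating the failed range compaction when the FIFO style is configured, which fits the suggestion above to open a bug ticket and a PR.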
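
As for the interim workaround of deleting the local store and rebuilding it from scratch, one way to do that without touching the file system by hand is the public KafkaStreams#cleanUp() API, which deletes the local state directory for the application id so the store is restored into a fresh RocksDB instance; whether that is how Nicolas does it is an assumption on my part. A minimal sketch, with made-up application id, broker address and topic name (only the store name "merge_store" comes from the thread):

    import java.util.Properties;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Materialized;

    public class CleanLocalStateBeforeStart {

        public static void main(final String[] args) {
            final Properties props = new Properties();
            // Placeholder values; use the real application id and brokers.
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "merge-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            final StreamsBuilder builder = new StreamsBuilder();
            // Placeholder topology: a table backed by a store named "merge_store".
            builder.table("input-topic", Materialized.as("merge_store"));

            final KafkaStreams streams = new KafkaStreams(builder.build(), props);

            // Deletes the local state directory for this application id, so the
            // store is rebuilt from the changelog into a fresh RocksDB instance.
            // Must be called while the instance is not running.
            streams.cleanUp();
            streams.start();

            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        }
    }

Note that cleanUp() only sidesteps the error by re-reading the whole changelog, so it is a stopgap rather than a fix.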