Added some nasty code to Kafka 2.4.1. It seems to work fine for now... From my understanding, the compaction performed when restoring a store is only done to speed things up, so I guess this kind of "hack" isn't such a big deal?

[image: image.png]
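One way such a hack could look (a sketch only, not necessarily the actual change; it assumes an SLF4J logger is in scope in RocksDBStore.SingleColumnFamilyAccessor) is to treat a failed pre-restore compaction as a warning rather than a fatal error, since the compaction only speeds up bulk loading and the store is restored correctly either way:

// Sketch of a tolerant variant of RocksDBStore.SingleColumnFamilyAccessor#toggleDbForBulkLoading
// (Kafka Streams 2.4.1). If the range compaction is rejected, e.g. with
// "Target level exceeds number of levels" under FIFO compaction, restoration
// continues without it instead of failing.
public void toggleDbForBulkLoading() {
    try {
        db.compactRange(columnFamily, true, 1, 0);
    } catch (final RocksDBException e) {
        // Assumed logger; the stock implementation rethrows a ProcessorStateException here.
        log.warn("Skipping range compaction before restoring store {}", name, e);
    }
}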
On Wed, Apr 1, 2020 at 10:44 AM, Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:

Here is the full configuration of RocksDB:

[image: image.png]

On Wed, Apr 1, 2020 at 10:41 AM, Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:

It's not that I cannot turn on compaction. Compaction works fine. The issue is with the restoration process of the state store, which tries to compact the store to a single level (db.compactRange(columnFamily, true, 1, 0)) before bulk loading the data. That call systematically fails when I'm using FIFO compaction.

On Wed, Apr 1, 2020 at 10:26 AM, Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:

My current workaround is to completely delete the state store and rebuild it from scratch.

On Tue, Mar 31, 2020 at 9:39 PM, Boyang Chen <reluctanthero...@gmail.com> wrote:

Thanks Nicolas for the report. Are you suggesting that you couldn't turn on compaction for the state store? Is there a workaround?

On Tue, Mar 31, 2020 at 9:54 AM Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:

After some more testing and debugging, it seems to be caused by the compaction options I've configured for RocksDB. When they are removed, everything is fine...
The options are as follows:

CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
fifoOptions.setMaxTableFilesSize(maxSize);
fifoOptions.setAllowCompaction(true);
options.setCompactionOptionsFIFO(fifoOptions);
options.setCompactionStyle(CompactionStyle.FIFO);
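For reference, these options are applied to Kafka Streams through a RocksDBConfigSetter. A minimal sketch (the class name and the concrete size value are illustrative, not taken from this thread) could look like this:

import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.CompactionOptionsFIFO;
import org.rocksdb.CompactionStyle;
import org.rocksdb.Options;

// Illustrative config setter applying the FIFO compaction options quoted above.
public class FifoCompactionConfigSetter implements RocksDBConfigSetter {

    // Hypothetical cap on total SST file size (1 GiB); the thread only refers to it as "maxSize".
    private static final long MAX_TABLE_FILES_SIZE = 1024L * 1024L * 1024L;

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
        fifoOptions.setMaxTableFilesSize(MAX_TABLE_FILES_SIZE);
        fifoOptions.setAllowCompaction(true);
        options.setCompactionOptionsFIFO(fifoOptions);
        options.setCompactionStyle(CompactionStyle.FIFO);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Nothing to release in this sketch; a production version might track and
        // close the CompactionOptionsFIFO instances created in setConfig().
    }
}

The setter is then registered in the Streams configuration with props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, FifoCompactionConfigSetter.class).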
On Tue, Mar 31, 2020 at 4:27 PM, Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:

Hello everyone,

I'm currently facing an issue with RocksDB's internal compaction process, which occurs when the local state stores of several of my Kafka Streams applications are being restored. This is sadly a huge concern, as it defeats resiliency to node failures, which often lead to a state store restoration. The only workaround I currently have is to delete the local store and restore it from scratch. I'm using version 2.4.1 of the Java libraries.

The exception thrown by the KStream process is:

org.apache.kafka.streams.errors.ProcessorStateException: Error while range compacting during restoring store merge_store
    at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [kafka-stream-router.jar:?]
Caused by: org.rocksdb.RocksDBException: Target level exceeds number of levels
    at org.rocksdb.RocksDB.compactRange(Native Method) ~[kafka-stream-router.jar:?]
    at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636) ~[kafka-stream-router.jar:?]
    at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613) ~[kafka-stream-router.jar:?]
    ... 11 more

Here is the state of partition 0_0 of one of those stores:

-rw-r--r-- 1 java j2ee 148210568 31 mars 10:49 000139.sst
-rw-r--r-- 1 java j2ee  21620385 31 mars 11:06 000184.sst
-rw-r--r-- 1 java j2ee         0 31 mars 11:11 000198.log
-rw-r--r-- 1 java j2ee  31602468 31 mars 11:31 000251.sst
-rw-r--r-- 1 java j2ee  37856549 31 mars 12:00 000324.sst
-rw-r--r-- 1 java j2ee  33498822 31 mars 12:26 000393.sst
-rw-r--r-- 1 java j2ee  34368461 31 mars 12:49 000450.sst
-rw-r--r-- 1 java j2ee  11371247 31 mars 12:55 000467.sst
-rw-r--r-- 1 java j2ee  14356435 31 mars 13:04 000489.sst
-rw-r--r-- 1 java j2ee   5858737 31 mars 13:05 000494.sst
-rw-r--r-- 1 java j2ee   2545952 31 mars 14:08 000659.sst
-rw-r--r-- 1 java j2ee   3187275 31 mars 15:27 000868.sst
-rw-r--r-- 1 java j2ee    407017 31 mars 15:34 000885.sst
-rw-r--r-- 1 java j2ee    590190 31 mars 15:45 000914.sst
-rw-r--r-- 1 java j2ee    154471 31 mars 15:47 000919.sst
-rw-r--r-- 1 java j2ee    139838 31 mars 15:49 000924.sst
-rw-r--r-- 1 java j2ee     35058 31 mars 15:49 000925.sst
-rw-r--r-- 1 java j2ee     33987 31 mars 15:50 000926.sst
-rw-r--r-- 1 java j2ee        16 31 mars 11:11 CURRENT
-rw-r--r-- 1 java j2ee        37 31 mars 10:33 IDENTITY
-rw-r--r-- 1 java j2ee         0 31 mars 10:33 LOCK
-rw-r--r-- 1 java j2ee     15340 31 mars 11:11 LOG
-rw-r--r-- 1 java j2ee     15046 31 mars 10:33 LOG.old.1585643630145007
-rw-r--r-- 1 java j2ee     15290 31 mars 10:33 LOG.old.1585643826265995
-rw-r--r-- 1 java j2ee     15384 31 mars 10:37 LOG.old.1585645861692248
-rw-r--r-- 1 java j2ee     60767 31 mars 15:55 MANIFEST-000197
-rw-r--r-- 1 java j2ee      4857 31 mars 10:37 OPTIONS-000107
-rw-r--r-- 1 java j2ee      4857 31 mars 11:11 OPTIONS-000200
I can see that Kafka Streams runs RocksDB's compaction with:

public void toggleDbForBulkLoading() {
    try {
        db.compactRange(columnFamily, true, 1, 0);
    } catch (final RocksDBException e) {
        throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
    }
}

This seems related to an issue in RocksDB itself: https://github.com/facebook/rocksdb/issues/2734
But I'm not sure about that. Any help would be greatly appreciated here :)
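For completeness, the underlying RocksDB error can likely be reproduced outside Kafka Streams with a few lines against the RocksDB Java API. This is a sketch of the expected failure, not a verified test case; the path and size below are arbitrary:

import org.rocksdb.CompactionOptionsFIFO;
import org.rocksdb.CompactionStyle;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

// Opens a database with FIFO compaction and issues the same compactRange call that
// Kafka Streams makes before bulk loading (changeLevel=true, targetLevel=1, targetPathId=0).
// FIFO compaction keeps everything in a single level, so the call is expected to be
// rejected with "Target level exceeds number of levels".
public class FifoCompactRangeRepro {
    public static void main(final String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (final CompactionOptionsFIFO fifo = new CompactionOptionsFIFO()
                 .setMaxTableFilesSize(64L * 1024 * 1024)
                 .setAllowCompaction(true);
             final Options options = new Options()
                 .setCreateIfMissing(true)
                 .setCompactionStyle(CompactionStyle.FIFO)
                 .setCompactionOptionsFIFO(fifo);
             final RocksDB db = RocksDB.open(options, "/tmp/fifo-compact-range-repro")) {
            db.put("key".getBytes(), "value".getBytes());
            db.compactRange(db.getDefaultColumnFamily(), true, 1, 0);
        }
    }
}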