Hi Bruno,

"As far as I understand, the issue is that bulk loading as done in Kafka
Streams does work as expected if FIFO compaction is used."

You meant "doesn't", right?

Ok, I will open a ticket, but I don't think my "fix" is the correct one.
Just ignoring the issue doesn't seem to be a correct solution :)
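
One possible direction, instead of swallowing the exception, would be to
decide up front whether the pre-restore compaction makes sense for the
configured compaction style. The sketch below is only an illustration of that
idea, not the patch mentioned further down in this thread. BulkLoadGuard, its
CompactionStyle enum and the Compactor interface are hypothetical stand-ins
(so that it runs without the Kafka or RocksDB jars), and it assumes the
failure comes from FIFO compaction keeping all files in level 0.

```java
import java.util.ArrayList;
import java.util.List;

final class BulkLoadGuard {

    // Hypothetical stand-in for org.rocksdb.CompactionStyle, so the sketch has no dependencies.
    enum CompactionStyle { LEVEL, UNIVERSAL, FIFO, NONE }

    // Hypothetical stand-in for the single compaction call made before bulk loading.
    // (The real RocksDB call throws the checked RocksDBException; omitted here for brevity.)
    interface Compactor {
        void compactRange(boolean changeLevel, int targetLevel, int targetPathId);
    }

    // Assumption: with FIFO compaction all SST files stay in level 0, so asking
    // for target level 1 can only fail with "Target level exceeds number of levels".
    static boolean shouldCompactBeforeBulkLoad(final CompactionStyle style) {
        return style != CompactionStyle.FIFO;
    }

    // Runs the pre-restore compaction only when it can succeed.
    // Returns true if compactRange was called, false if it was skipped.
    static boolean prepareForBulkLoad(final CompactionStyle style, final Compactor db) {
        if (!shouldCompactBeforeBulkLoad(style)) {
            return false;
        }
        db.compactRange(true, 1, 0); // same arguments as the call in toggleDbForBulkLoading()
        return true;
    }

    public static void main(final String[] args) {
        final List<String> calls = new ArrayList<>();
        // Records each call instead of touching a real database.
        final Compactor recording = (changeLevel, targetLevel, targetPathId) ->
            calls.add("compactRange(" + changeLevel + ", " + targetLevel + ", " + targetPathId + ")");

        System.out.println("FIFO compacted:  " + prepareForBulkLoad(CompactionStyle.FIFO, recording));
        System.out.println("LEVEL compacted: " + prepareForBulkLoad(CompactionStyle.LEVEL, recording));
        System.out.println("recorded calls:  " + calls);
    }
}
```

Whether skipping the compaction is acceptable for restoration performance is
exactly the open question for the ticket; the sketch only makes the decision
explicit rather than hiding it in a catch block.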

On Thu, Apr 16, 2020 at 11:49 AM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Nicolas,
>
> Thank you for reporting this issue.
>
> As far as I understand, the issue is that bulk loading as done in Kafka
> Streams does work as expected if FIFO compaction is used.
>
> I would propose that you open a bug ticket. Please make sure to include
> steps to reproduce the issue in the ticket. Additionally, since you already
> have some code that solves the issue, you could assign the ticket to
> yourself and open a PR for it. Would you be interested in contributing?
>
> If you cannot assign the ticket to yourself, ask to be added to the list of
> contributors on the dev mailing list.
>
> Best,
> Bruno
>
> On Thu, Apr 16, 2020 at 11:06 AM Nicolas Carlot <
> nicolas.car...@chronopost.fr> wrote:
>
> > # Bump #
> > So is this a bug? Should I file a ticket?
> > Any ideas? I don't like the idea of having to patch the Kafka libraries...
> >
> > On Wed, Apr 1, 2020 at 4:33 PM Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:
> >
> >> I added some nasty code to Kafka 2.4.1. It seems to work fine for now...
> >> From my understanding, the compaction done when restoring a store is
> >> only there to speed things up.
> >> So I guess this kind of "hack" isn't such a big deal?
> >>
> >> [image: image.png]
> >>
> >> On Wed, Apr 1, 2020 at 10:44 AM Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:
> >>
> >>> Here is the full RocksDB configuration.
> >>>
> >>> [image: image.png]
> >>>
> >>> On Wed, Apr 1, 2020 at 10:41 AM Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:
> >>>
> >>>> It's not that I cannot turn on compaction.
> >>>> Compaction works fine.
> >>>> The issue is with the restoration process of the state store, which
> >>>> tries to compact the store to a single level with
> >>>> db.compactRange(columnFamily, true, 1, 0) before bulk loading the data.
> >>>> That call always fails when I'm using FIFO compaction.
> >>>>
> >>>>
> >>>> On Wed, Apr 1, 2020 at 10:26 AM Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:
> >>>>
> >>>>> My current workaround is to completely delete the state store and
> >>>>> rebuild it from scratch.
> >>>>>
> >>>>> On Tue, Mar 31, 2020 at 9:39 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> >>>>>
> >>>>>> Thanks Nicolas for the report. So are you suggesting that you couldn't
> >>>>>> turn on compaction for the state store? Is there a workaround?
> >>>>>>
> >>>>>> On Tue, Mar 31, 2020 at 9:54 AM Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:
> >>>>>>
> >>>>>> > After some more testing and debugging, it seems that this is caused by
> >>>>>> > the compaction option I've configured for RocksDB. When it is removed,
> >>>>>> > everything is fine...
> >>>>>> > The option is as follows:
> >>>>>> >
> >>>>>> > CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
> >>>>>> > fifoOptions.setMaxTableFilesSize(maxSize);
> >>>>>> > fifoOptions.setAllowCompaction(true);
> >>>>>> > options.setCompactionOptionsFIFO(fifoOptions);
> >>>>>> > options.setCompactionStyle(CompactionStyle.FIFO);
> >>>>>> >
> >>>>>> > On Tue, Mar 31, 2020 at 4:27 PM Nicolas Carlot <nicolas.car...@chronopost.fr> wrote:
> >>>>>> >
> >>>>>> > > Hello everyone,
> >>>>>> > >
> >>>>>> > > I'm currently facing an issue with RocksDB's internal compaction
> >>>>>> > > process, which occurs when the local state stores of several of my
> >>>>>> > > Kafka Streams applications are being restored. This is sadly a huge
> >>>>>> > > concern, as it completely defeats resiliency to node failures, since
> >>>>>> > > those often lead to a state store restoration. The only workaround I
> >>>>>> > > currently have is to delete the local store and restore it from
> >>>>>> > > scratch. I'm using version 2.4.1 of the Java libraries.
> >>>>>> > >
> >>>>>> > > The exception thrown by the KStream process is:
> >>>>>> > > org.apache.kafka.streams.errors.ProcessorStateException: Error while range compacting during restoring  store merge_store
> >>>>>> > >         at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615) ~[kafka-stream-router.jar:?]
> >>>>>> > >         at org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398) ~[kafka-stream-router.jar:?]
> >>>>>> > >         at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644) ~[kafka-stream-router.jar:?]
> >>>>>> > >         at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59) ~[kafka-stream-router.jar:?]
> >>>>>> > >         at org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76) ~[kafka-stream-router.jar:?]
> >>>>>> > >         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211) ~[kafka-stream-router.jar:?]
> >>>>>> > >         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185) ~[kafka-stream-router.jar:?]
> >>>>>> > >         at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81) ~[kafka-stream-router.jar:?]
> >>>>>> > >         at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389) ~[kafka-stream-router.jar:?]
> >>>>>> > >         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769) ~[kafka-stream-router.jar:?]
> >>>>>> > >         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) ~[kafka-stream-router.jar:?]
> >>>>>> > >         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [kafka-stream-router.jar:?]
> >>>>>> > > Caused by: org.rocksdb.RocksDBException: Target level exceeds number of levels
> >>>>>> > >         at org.rocksdb.RocksDB.compactRange(Native Method) ~[kafka-stream-router.jar:?]
> >>>>>> > >         at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636) ~[kafka-stream-router.jar:?]
> >>>>>> > >         at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613) ~[kafka-stream-router.jar:?]
> >>>>>> > >         ... 11 more
> >>>>>> > >
> >>>>>> > > Here is the state of partition 0_0 of one of those stores:
> >>>>>> > > -rw-r--r-- 1 java j2ee 148210568 31 mars  10:49 000139.sst
> >>>>>> > > -rw-r--r-- 1 java j2ee  21620385 31 mars  11:06 000184.sst
> >>>>>> > > -rw-r--r-- 1 java j2ee         0 31 mars  11:11 000198.log
> >>>>>> > > -rw-r--r-- 1 java j2ee  31602468 31 mars  11:31 000251.sst
> >>>>>> > > -rw-r--r-- 1 java j2ee  37856549 31 mars  12:00 000324.sst
> >>>>>> > > -rw-r--r-- 1 java j2ee  33498822 31 mars  12:26 000393.sst
> >>>>>> > > -rw-r--r-- 1 java j2ee  34368461 31 mars  12:49 000450.sst
> >>>>>> > > -rw-r--r-- 1 java j2ee  11371247 31 mars  12:55 000467.sst
> >>>>>> > > -rw-r--r-- 1 java j2ee  14356435 31 mars  13:04 000489.sst
> >>>>>> > > -rw-r--r-- 1 java j2ee   5858737 31 mars  13:05 000494.sst
> >>>>>> > > -rw-r--r-- 1 java j2ee   2545952 31 mars  14:08 000659.sst
> >>>>>> > > -rw-r--r-- 1 java j2ee   3187275 31 mars  15:27 000868.sst
> >>>>>> > > -rw-r--r-- 1 java j2ee    407017 31 mars  15:34 000885.sst
> >>>>>> > > -rw-r--r-- 1 java j2ee    590190 31 mars  15:45 000914.sst
> >>>>>> > > -rw-r--r-- 1 java j2ee    154471 31 mars  15:47 000919.sst
> >>>>>> > > -rw-r--r-- 1 java j2ee    139838 31 mars  15:49 000924.sst
> >>>>>> > > -rw-r--r-- 1 java j2ee     35058 31 mars  15:49 000925.sst
> >>>>>> > > -rw-r--r-- 1 java j2ee     33987 31 mars  15:50 000926.sst
> >>>>>> > > -rw-r--r-- 1 java j2ee        16 31 mars  11:11 CURRENT
> >>>>>> > > -rw-r--r-- 1 java j2ee        37 31 mars  10:33 IDENTITY
> >>>>>> > > -rw-r--r-- 1 java j2ee         0 31 mars  10:33 LOCK
> >>>>>> > > -rw-r--r-- 1 java j2ee     15340 31 mars  11:11 LOG
> >>>>>> > > -rw-r--r-- 1 java j2ee     15046 31 mars  10:33 LOG.old.1585643630145007
> >>>>>> > > -rw-r--r-- 1 java j2ee     15290 31 mars  10:33 LOG.old.1585643826265995
> >>>>>> > > -rw-r--r-- 1 java j2ee     15384 31 mars  10:37 LOG.old.1585645861692248
> >>>>>> > > -rw-r--r-- 1 java j2ee     60767 31 mars  15:55 MANIFEST-000197
> >>>>>> > > -rw-r--r-- 1 java j2ee      4857 31 mars  10:37 OPTIONS-000107
> >>>>>> > > -rw-r--r-- 1 java j2ee      4857 31 mars  11:11 OPTIONS-000200
> >>>>>> > >
> >>>>>> > > I can see that Kafka is running RocksDB's compaction with:
> >>>>>> > > public void toggleDbForBulkLoading() {
> >>>>>> > >     try {
> >>>>>> > >         db.compactRange(columnFamily, true, 1, 0);
> >>>>>> > >     } catch (final RocksDBException e) {
> >>>>>> > >         throw new ProcessorStateException(
> >>>>>> > >             "Error while range compacting during restoring  store " + name, e);
> >>>>>> > >     }
> >>>>>> > > }
> >>>>>> > >
> >>>>>> > > This seems related to an issue in RocksDB itself:
> >>>>>> > > https://github.com/facebook/rocksdb/issues/2734
> >>>>>> > > But I'm not sure about this.
> >>>>>> > > Any help would be greatly appreciated here :)
> >>>>>> > >
> >>>>>> >
> >>>>>> >
> >>>>>> > --
> >>>>>> > *Nicolas Carlot*
> >>>>>> >
> >>>>>> > Lead dev
> >>>>>> > |  | nicolas.car...@chronopost.fr
> >>>>>> >
> >>>>>> >
> >>>>>> > *Please note that as of May 20, the Chronopost head office is moving.
> >>>>>> > The new address is: 3 boulevard Romain Rolland 75014 Paris*
> >>>>>> >
> >>>>>> > [image: Logo Chronopost]
> >>>>>> > | chronopost.fr <http://www.chronopost.fr/>
> >>>>>> > Follow us on Facebook <https://fr-fr.facebook.com/chronopost> and
> >>>>>> > Twitter <https://twitter.com/chronopost>.
> >>>>>> >
> >>>>>> > [image: DPD Group]
> >>>>>> >
> >>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>
> >>>>
> >>>>
> >>>
> >>>
> >>>
> >>
> >>
> >>
> >
> >
> >
>


