Hi Nicolas,

Thank you for reporting this issue.

As far as I understand, the issue is that bulk loading as done in Kafka
Streams does work as expected if FIFO compaction is used.

I would propose that you open a bug ticket. Please make sure to include
steps to reproduce the issue in the ticket. Additionally, since you already
have some code that solves the issue, you could assign the ticket to
yourself and open a PR for it. Would you be interested in contributing?

If you cannot assign the ticket to yourself, ask to be added to the list of
contributors on the dev mailing list.

Best,
Bruno

On Thu, Apr 16, 2020 at 11:06 AM Nicolas Carlot <
nicolas.car...@chronopost.fr> wrote:

> # Bump #
> So is this a bug ? Should I file a ticket ?
> Any idea ? I don't like the idea of having to patch Kafka libraries...
>
> Le mer. 1 avr. 2020 à 16:33, Nicolas Carlot <nicolas.car...@chronopost.fr>
> a écrit :
>
>> Added some nasty code in kafka 2.4.1. Seems to work fine for now... From
>> my understanding, the compaction process when restoring a store is only
>> done to speed up things.
>> So I guess this kind of "hack" isn't such a big deal ?
>>
>> [image: image.png]
>>
>> Le mer. 1 avr. 2020 à 10:44, Nicolas Carlot <nicolas.car...@chronopost.fr>
>> a écrit :
>>
>>> Here is the full configuration of Rocks.
>>>
>>> [image: image.png]
>>>
>>> Le mer. 1 avr. 2020 à 10:41, Nicolas Carlot <
>>> nicolas.car...@chronopost.fr> a écrit :
>>>
>>>> It's not that I cannot turn on compaction.
>>>> Compaction works fine.
>>>> The issue is with the restoration process of the state store, which
>>>> tries to compact the store to a single level: db.compactRange(columnFamily,
>>>> true, 1, 0) before bulk loading the data.
>>>> It automatically fails when I'm using FIFO compaction.
>>>>
>>>>
>>>> Le mer. 1 avr. 2020 à 10:26, Nicolas Carlot <
>>>> nicolas.car...@chronopost.fr> a écrit :
>>>>
>>>>> My current workaround is to completely delete the state store and
>>>>> rebuild it from scratch.
>>>>>
>>>>> Le mar. 31 mars 2020 à 21:39, Boyang Chen <reluctanthero...@gmail.com>
>>>>> a écrit :
>>>>>
>>>>>> Thanks Nicolas for the report, so are you suggesting that you
>>>>>> couldn't turn
>>>>>> on compactions for the state store? Is there a workaround?
>>>>>>
>>>>>> On Tue, Mar 31, 2020 at 9:54 AM Nicolas Carlot <
>>>>>> nicolas.car...@chronopost.fr>
>>>>>> wrote:
>>>>>>
>>>>>> > After some more testing and debugging, it seems that it is caused
>>>>>> by the
>>>>>> > compaction option I've configured for RocksDB. When removed
>>>>>> everything is
>>>>>> > fine...
>>>>>> > The option is as follow:
>>>>>> >
>>>>>> > CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
>>>>>> > fifoOptions.setMaxTableFilesSize(maxSize);
>>>>>> > fifoOptions.setAllowCompaction(true);
>>>>>> > options.setCompactionOptionsFIFO(fifoOptions);
>>>>>> > options.setCompactionStyle(CompactionStyle.FIFO);
>>>>>> >
>>>>>> > Le mar. 31 mars 2020 à 16:27, Nicolas Carlot <
>>>>>> nicolas.car...@chronopost.fr
>>>>>> > >
>>>>>> > a écrit :
>>>>>> >
>>>>>> > > Hello everyone,
>>>>>> > >
>>>>>> > > I'm currently facing an issue with RocksDb internal compaction
>>>>>> process,
>>>>>> > > which occurs when the local state store of several of my
>>>>>> KafkaStream
>>>>>> > > applications are being restored. This is sadly a huge concern as
>>>>>> it
>>>>>> > > completely discard resiliency over node failure as those often
>>>>>> lead to a
>>>>>> > > state store restoration. The only workaround I currently have is
>>>>>> to
>>>>>> > delete
>>>>>> > > the local store to restore it from scratch. I'm using version
>>>>>> 2.4.1 of
>>>>>> > the
>>>>>> > > Java libraries.
>>>>>> > >
>>>>>> > > The exception thrown by the KStream process is:
>>>>>> > > org.apache.kafka.streams.errors.ProcessorStateException: Error
>>>>>> while
>>>>>> > range
>>>>>> > > compacting during restoring  store merge_store
>>>>>> > >         at
>>>>>> > >
>>>>>> >
>>>>>> org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615)
>>>>>> > > ~[kafka-stream-router.jar:?]
>>>>>> > >         at
>>>>>> > >
>>>>>> >
>>>>>> org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398)
>>>>>> > > ~[kafka-stream-router.jar:?]
>>>>>> > >         at
>>>>>> > >
>>>>>> >
>>>>>> org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644)
>>>>>> > > ~[kafka-stream-router.jar:?]
>>>>>> > >         at
>>>>>> > >
>>>>>> >
>>>>>> org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59)
>>>>>> > > ~[kafka-stream-router.jar:?]
>>>>>> > >         at
>>>>>> > >
>>>>>> >
>>>>>> org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76)
>>>>>> > > ~[kafka-stream-router.jar:?]
>>>>>> > >         at
>>>>>> > >
>>>>>> >
>>>>>> org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211)
>>>>>> > > ~[kafka-stream-router.jar:?]
>>>>>> > >         at
>>>>>> > >
>>>>>> >
>>>>>> org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185)
>>>>>> > > ~[kafka-stream-router.jar:?]
>>>>>> > >         at
>>>>>> > >
>>>>>> >
>>>>>> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81)
>>>>>> > > ~[kafka-stream-router.jar:?]
>>>>>> > >         at
>>>>>> > >
>>>>>> >
>>>>>> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389)
>>>>>> > > ~[kafka-stream-router.jar:?]
>>>>>> > >         at
>>>>>> > >
>>>>>> >
>>>>>> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
>>>>>> > > ~[kafka-stream-router.jar:?]
>>>>>> > >         at
>>>>>> > >
>>>>>> >
>>>>>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
>>>>>> > > ~[kafka-stream-router.jar:?]
>>>>>> > >         at
>>>>>> > >
>>>>>> >
>>>>>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
>>>>>> > > [kafka-stream-router.jar:?]
>>>>>> > > Caused by: org.rocksdb.RocksDBException: Target level exceeds
>>>>>> number of
>>>>>> > > levels
>>>>>> > >         at org.rocksdb.RocksDB.compactRange(Native Method)
>>>>>> > > ~[kafka-stream-router.jar:?]
>>>>>> > >         at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636)
>>>>>> > > ~[kafka-stream-router.jar:?]
>>>>>> > >         at
>>>>>> > >
>>>>>> >
>>>>>> org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613)
>>>>>> > > ~[kafka-stream-router.jar:?]
>>>>>> > >         ... 11 more
>>>>>> > >
>>>>>> > > Here is the state of partition 0_0 of one of those stores:
>>>>>> > > -rw-r--r-- 1 java j2ee 148210568 31 mars  10:49 000139.sst
>>>>>> > > -rw-r--r-- 1 java j2ee  21620385 31 mars  11:06 000184.sst
>>>>>> > > -rw-r--r-- 1 java j2ee         0 31 mars  11:11 000198.log
>>>>>> > > -rw-r--r-- 1 java j2ee  31602468 31 mars  11:31 000251.sst
>>>>>> > > -rw-r--r-- 1 java j2ee  37856549 31 mars  12:00 000324.sst
>>>>>> > > -rw-r--r-- 1 java j2ee  33498822 31 mars  12:26 000393.sst
>>>>>> > > -rw-r--r-- 1 java j2ee  34368461 31 mars  12:49 000450.sst
>>>>>> > > -rw-r--r-- 1 java j2ee  11371247 31 mars  12:55 000467.sst
>>>>>> > > -rw-r--r-- 1 java j2ee  14356435 31 mars  13:04 000489.sst
>>>>>> > > -rw-r--r-- 1 java j2ee   5858737 31 mars  13:05 000494.sst
>>>>>> > > -rw-r--r-- 1 java j2ee   2545952 31 mars  14:08 000659.sst
>>>>>> > > -rw-r--r-- 1 java j2ee   3187275 31 mars  15:27 000868.sst
>>>>>> > > -rw-r--r-- 1 java j2ee    407017 31 mars  15:34 000885.sst
>>>>>> > > -rw-r--r-- 1 java j2ee    590190 31 mars  15:45 000914.sst
>>>>>> > > -rw-r--r-- 1 java j2ee    154471 31 mars  15:47 000919.sst
>>>>>> > > -rw-r--r-- 1 java j2ee    139838 31 mars  15:49 000924.sst
>>>>>> > > -rw-r--r-- 1 java j2ee     35058 31 mars  15:49 000925.sst
>>>>>> > > -rw-r--r-- 1 java j2ee     33987 31 mars  15:50 000926.sst
>>>>>> > > -rw-r--r-- 1 java j2ee        16 31 mars  11:11 CURRENT
>>>>>> > > -rw-r--r-- 1 java j2ee        37 31 mars  10:33 IDENTITY
>>>>>> > > -rw-r--r-- 1 java j2ee         0 31 mars  10:33 LOCK
>>>>>> > > -rw-r--r-- 1 java j2ee     15340 31 mars  11:11 LOG
>>>>>> > > -rw-r--r-- 1 java j2ee     15046 31 mars  10:33
>>>>>> LOG.old.1585643630145007
>>>>>> > > -rw-r--r-- 1 java j2ee     15290 31 mars  10:33
>>>>>> LOG.old.1585643826265995
>>>>>> > > -rw-r--r-- 1 java j2ee     15384 31 mars  10:37
>>>>>> LOG.old.1585645861692248
>>>>>> > > -rw-r--r-- 1 java j2ee     60767 31 mars  15:55 MANIFEST-000197
>>>>>> > > -rw-r--r-- 1 java j2ee      4857 31 mars  10:37 OPTIONS-000107
>>>>>> > > -rw-r--r-- 1 java j2ee      4857 31 mars  11:11 OPTIONS-000200
>>>>>> > >
>>>>>> > > I can see that Kafka is running RocksDB's compaction with:
>>>>>> > > public void toggleDbForBulkLoading() {
>>>>>> > >             try {
>>>>>> > >                 db.compactRange(columnFamily, true, 1, 0);
>>>>>> > >             } catch (final RocksDBException e) {
>>>>>> > >                 throw new ProcessorStateException("Error while
>>>>>> range
>>>>>> > > compacting during restoring  store " + name, e);
>>>>>> > >             }
>>>>>> > >         }
>>>>>> > >
>>>>>> > > Seems related to an issue with RocksDB itself:
>>>>>> > > https://github.com/facebook/rocksdb/issues/2734
>>>>>> > > But i'm not sure of this.
>>>>>> > > Any help would be greatly appreciated here :)
>>>>>> > >
>>>>>> >
>>>>>> >
>>>>>> > --
>>>>>> > *Nicolas Carlot*
>>>>>> >
>>>>>> > Lead dev
>>>>>> > |  | nicolas.car...@chronopost.fr
>>>>>> >
>>>>>> >
>>>>>> > *Veuillez noter qu'à partir du 20 mai, le siège Chronopost
>>>>>> déménage. La
>>>>>> > nouvelle adresse est : 3 boulevard Romain Rolland 75014 Paris*
>>>>>> >
>>>>>> > [image: Logo Chronopost]
>>>>>> > | chronopost.fr <http://www.chronopost.fr/>
>>>>>> > Suivez nous sur Facebook <https://fr-fr.facebook.com/chronopost> et
>>>>>> > Twitter
>>>>>> > <https://twitter.com/chronopost>.
>>>>>> >
>>>>>> > [image: DPD Group]
>>>>>> >
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> *Nicolas Carlot*
>>>>>
>>>>> Lead dev
>>>>> |  | nicolas.car...@chronopost.fr
>>>>>
>>>>>
>>>>> *Veuillez noter qu'à partir du 20 mai, le siège Chronopost déménage.
>>>>> La nouvelle adresse est : 3 boulevard Romain Rolland 75014 Paris*
>>>>>
>>>>> [image: Logo Chronopost]
>>>>> | chronopost.fr <http://www.chronopost.fr/>
>>>>> Suivez nous sur Facebook <https://fr-fr.facebook.com/chronopost> et
>>>>> Twitter <https://twitter.com/chronopost>.
>>>>>
>>>>> [image: DPD Group]
>>>>>
>>>>
>>>>
>>>> --
>>>> *Nicolas Carlot*
>>>>
>>>> Lead dev
>>>> |  | nicolas.car...@chronopost.fr
>>>>
>>>>
>>>> *Veuillez noter qu'à partir du 20 mai, le siège Chronopost déménage. La
>>>> nouvelle adresse est : 3 boulevard Romain Rolland 75014 Paris*
>>>>
>>>> [image: Logo Chronopost]
>>>> | chronopost.fr <http://www.chronopost.fr/>
>>>> Suivez nous sur Facebook <https://fr-fr.facebook.com/chronopost> et
>>>> Twitter <https://twitter.com/chronopost>.
>>>>
>>>> [image: DPD Group]
>>>>
>>>
>>>
>>> --
>>> *Nicolas Carlot*
>>>
>>> Lead dev
>>> |  | nicolas.car...@chronopost.fr
>>>
>>>
>>> *Veuillez noter qu'à partir du 20 mai, le siège Chronopost déménage. La
>>> nouvelle adresse est : 3 boulevard Romain Rolland 75014 Paris*
>>>
>>> [image: Logo Chronopost]
>>> | chronopost.fr <http://www.chronopost.fr/>
>>> Suivez nous sur Facebook <https://fr-fr.facebook.com/chronopost> et
>>> Twitter <https://twitter.com/chronopost>.
>>>
>>> [image: DPD Group]
>>>
>>
>>
>> --
>> *Nicolas Carlot*
>>
>> Lead dev
>> |  | nicolas.car...@chronopost.fr
>>
>>
>> *Veuillez noter qu'à partir du 20 mai, le siège Chronopost déménage. La
>> nouvelle adresse est : 3 boulevard Romain Rolland 75014 Paris*
>>
>> [image: Logo Chronopost]
>> | chronopost.fr <http://www.chronopost.fr/>
>> Suivez nous sur Facebook <https://fr-fr.facebook.com/chronopost> et
>> Twitter <https://twitter.com/chronopost>.
>>
>> [image: DPD Group]
>>
>
>
> --
> *Nicolas Carlot*
>
> Lead dev
> |  | nicolas.car...@chronopost.fr
>
>
> *Veuillez noter qu'à partir du 20 mai, le siège Chronopost déménage. La
> nouvelle adresse est : 3 boulevard Romain Rolland 75014 Paris*
>
> [image: Logo Chronopost]
> | chronopost.fr <http://www.chronopost.fr/>
> Suivez nous sur Facebook <https://fr-fr.facebook.com/chronopost> et
> Twitter <https://twitter.com/chronopost>.
>
> [image: DPD Group]
>

Reply via email to