Hello everyone,

I'm currently facing an issue with RocksDB's internal compaction process,
which fails when the local state stores of several of my Kafka Streams
applications are being restored. This is a serious concern, as it
completely undermines resiliency to node failures: those often lead to a
state store restoration. The only workaround I currently have is to delete
the local store so that it is restored from scratch. I'm using version
2.4.1 of the Java libraries.

The exception thrown by the KStream process is:
org.apache.kafka.streams.errors.ProcessorStateException: Error while range compacting during restoring  store merge_store
        at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [kafka-stream-router.jar:?]
Caused by: org.rocksdb.RocksDBException: Target level exceeds number of levels
        at org.rocksdb.RocksDB.compactRange(Native Method) ~[kafka-stream-router.jar:?]
        at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636) ~[kafka-stream-router.jar:?]
        at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613) ~[kafka-stream-router.jar:?]
        ... 11 more

Here is the content of the state directory for partition 0_0 of one of
those stores:
-rw-r--r-- 1 java j2ee 148210568 31 mars  10:49 000139.sst
-rw-r--r-- 1 java j2ee  21620385 31 mars  11:06 000184.sst
-rw-r--r-- 1 java j2ee         0 31 mars  11:11 000198.log
-rw-r--r-- 1 java j2ee  31602468 31 mars  11:31 000251.sst
-rw-r--r-- 1 java j2ee  37856549 31 mars  12:00 000324.sst
-rw-r--r-- 1 java j2ee  33498822 31 mars  12:26 000393.sst
-rw-r--r-- 1 java j2ee  34368461 31 mars  12:49 000450.sst
-rw-r--r-- 1 java j2ee  11371247 31 mars  12:55 000467.sst
-rw-r--r-- 1 java j2ee  14356435 31 mars  13:04 000489.sst
-rw-r--r-- 1 java j2ee   5858737 31 mars  13:05 000494.sst
-rw-r--r-- 1 java j2ee   2545952 31 mars  14:08 000659.sst
-rw-r--r-- 1 java j2ee   3187275 31 mars  15:27 000868.sst
-rw-r--r-- 1 java j2ee    407017 31 mars  15:34 000885.sst
-rw-r--r-- 1 java j2ee    590190 31 mars  15:45 000914.sst
-rw-r--r-- 1 java j2ee    154471 31 mars  15:47 000919.sst
-rw-r--r-- 1 java j2ee    139838 31 mars  15:49 000924.sst
-rw-r--r-- 1 java j2ee     35058 31 mars  15:49 000925.sst
-rw-r--r-- 1 java j2ee     33987 31 mars  15:50 000926.sst
-rw-r--r-- 1 java j2ee        16 31 mars  11:11 CURRENT
-rw-r--r-- 1 java j2ee        37 31 mars  10:33 IDENTITY
-rw-r--r-- 1 java j2ee         0 31 mars  10:33 LOCK
-rw-r--r-- 1 java j2ee     15340 31 mars  11:11 LOG
-rw-r--r-- 1 java j2ee     15046 31 mars  10:33 LOG.old.1585643630145007
-rw-r--r-- 1 java j2ee     15290 31 mars  10:33 LOG.old.1585643826265995
-rw-r--r-- 1 java j2ee     15384 31 mars  10:37 LOG.old.1585645861692248
-rw-r--r-- 1 java j2ee     60767 31 mars  15:55 MANIFEST-000197
-rw-r--r-- 1 java j2ee      4857 31 mars  10:37 OPTIONS-000107
-rw-r--r-- 1 java j2ee      4857 31 mars  11:11 OPTIONS-000200
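
Since the RocksDB error complains about the number of levels, it may help
to check how the store's column family is configured. RocksDB persists its
options in the OPTIONS-* files listed above, an INI-style file with one
[CFOptions "<name>"] section per column family. The following is a minimal
sketch, not an official tool: the class RocksOptionsInspector and its
cfOption helper are hypothetical names, and it assumes the usual
"key=value" layout of those files. It prints num_levels and
compaction_style for each column family found.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: reads a RocksDB OPTIONS-* file and extracts one
// option per column family (assumes the INI-style layout RocksDB writes).
final class RocksOptionsInspector {

    // Returns, for each [CFOptions "<name>"] section, the value of `key`
    // (e.g. "num_levels"). Sections without that key are not included.
    static Map<String, String> cfOption(List<String> lines, String key) {
        Map<String, String> result = new LinkedHashMap<>();
        String currentCf = null;
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // skip blank lines and comments
            }
            if (line.startsWith("[")) {
                // New section: track the column family name, or reset when
                // entering a non-CFOptions section such as [DBOptions].
                currentCf = null;
                String prefix = "[CFOptions \"";
                if (line.startsWith(prefix) && line.endsWith("\"]")) {
                    currentCf = line.substring(prefix.length(), line.length() - 2);
                }
                continue;
            }
            int eq = line.indexOf('=');
            if (currentCf != null && eq > 0 && line.substring(0, eq).trim().equals(key)) {
                result.put(currentCf, line.substring(eq + 1).trim());
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java RocksOptionsInspector /path/to/store/OPTIONS-000200
        Path options = Paths.get(args[0]);
        List<String> lines = Files.readAllLines(options);
        System.out.println("num_levels:       " + cfOption(lines, "num_levels"));
        System.out.println("compaction_style: " + cfOption(lines, "compaction_style"));
    }
}
```

Running it against both OPTIONS-000107 and OPTIONS-000200 would show
whether the level configuration changed between the two store openings.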

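Looking at the source, Kafka Streams triggers the RocksDB compaction in
RocksDBStore$SingleColumnFamilyAccessor when restoration starts:
public void toggleDbForBulkLoading() {
    try {
        // compactRange(columnFamily, changeLevel = true, targetLevel = 1,
        // targetPathId = 0): compact the whole column family, then move
        // the output to level 1
        db.compactRange(columnFamily, true, 1, 0);
    } catch (final RocksDBException e) {
        throw new ProcessorStateException("Error while range compacting during restoring  store " + name, e);
    }
}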
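
RocksDB numbers levels from 0 to num_levels - 1, so a request to move the
compacted data to level 1 can only succeed if the column family has at
least two levels. The sketch below is a hypothetical pure-Java mirror of
that check (CompactRangeLevelCheck is not a RocksDB class and never calls
RocksDB), assuming RocksDB rejects a change-level request whose
non-negative target level is not strictly below num_levels.

```java
// Hypothetical illustration, not RocksDB code: mirrors the assumed
// validation behind "Target level exceeds number of levels".
final class CompactRangeLevelCheck {

    // Assumption: for a non-negative targetLevel, RocksDB accepts the
    // change-level request only if targetLevel < numLevels.
    static boolean targetLevelAccepted(int targetLevel, int numLevels) {
        return targetLevel < numLevels;
    }

    public static void main(String[] args) {
        int targetLevel = 1; // value passed by toggleDbForBulkLoading() in 2.4.1
        for (int numLevels : new int[] {1, 2, 7}) {
            String outcome = targetLevelAccepted(targetLevel, numLevels)
                    ? "accepted"
                    : "rejected: Target level exceeds number of levels";
            System.out.println("num_levels=" + numLevels + " -> " + outcome);
        }
    }
}
```

Under that assumption, the exception would point to a column family
configured with a single level at the moment the restoration starts.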

This seems related to an issue in RocksDB itself:
https://github.com/facebook/rocksdb/issues/2734
but I'm not sure about this.
Any help would be greatly appreciated :)
