[
https://issues.apache.org/jira/browse/FLINK-39753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Keith Lee updated FLINK-39753:
------------------------------
Description:
h3. Summary
In
[Compactor.java#L54|https://github.com/apache/flink/blob/f3ffe4e1e1bc8e833e86509c9f22d45290beb6a6/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java#L54],
ColumnFamilyHandle.getDescriptor() allocates a new native ColumnFamilyOptions
on every JNI call (similar class of bug as FLINK-21986)
This causes two leaks:
1. Descriptor native objects
2. Whole LRUCache leak during even when all tasks are stopped on task manager.
If compactor has ran, the native memory used by LRUCache is never freed even
after all jobs have stopped on the TM. This is because these descriptors
transitively increments reference count on shared pointer and prevent LRUCache
from being freed. This leak is significant and has contributed to OOMKills in
our observations.
Here's the full chain with file paths and line numbers on how LRUCache leaks:
{quote}Compactor.compact()
flink-state-backends/.../sstmerge/Compactor.java:58
│ cfName.getDescriptor().getOptions()
▼
ColumnFamilyHandle.getDescriptor() → JNI call
frocksdb/java/src/main/java/org/rocksdb/ColumnFamilyHandle.java:91
│
▼
Java_org_rocksdb_ColumnFamilyHandle_getDescriptor()
frocksdb/java/rocksjni/columnfamilyhandle.cc:52
│ cfh->GetDescriptor(&desc)
│ → ColumnFamilyDescriptorJni::construct(env, &desc)
▼
ColumnFamilyDescriptorJni::construct()
frocksdb/java/rocksjni/portal.h:6602
│ ColumnFamilyOptionsJni::construct(env, &(cfd->options))
▼
ColumnFamilyOptionsJni::construct()
frocksdb/java/rocksjni/portal.h:2884
│ auto* cfo = new ColumnFamilyOptions(*cfoptions); ← C++ COPY CONSTRUCTOR
▼
ColumnFamilyOptions (the copy)
frocksdb/include/rocksdb/options.h:303
│ std::shared_ptr<TableFactory> table_factory; ← shared_ptr copied,
refcount++
▼
BlockBasedTableFactory (same instance, not a copy)
frocksdb/table/block_based/block_based_table_factory.h:93
│ BlockBasedTableOptions table_options_; ← value member of
the factory
▼
BlockBasedTableOptions
frocksdb/include/rocksdb/table.h:266
│ std::shared_ptr<Cache> block_cache; ← the LRUCache
▼
LRUCache (the single shared instance)
flink-state-backends/.../rocksdb/RocksDBMemoryControllerUtils.java:127
new LRUCache(cacheCapacity, -1, false, highPriorityPoolRatio);
{quote}
h3. Verification of OOMKill and fix
The fix for this issue is to close the options object as follows:
{{{}---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java{}}}{{{}+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java{}}}
{{@@ -19,6 +19,7 @@}}
{{ package org.apache.flink.state.rocksdb.sstmerge;}}
{{ }}
{{ import org.rocksdb.ColumnFamilyHandle;}}
{{+import org.rocksdb.ColumnFamilyOptions;}}
{{ import org.rocksdb.CompactionJobInfo;}}
{{ import org.rocksdb.CompactionOptions;}}
{{ import org.rocksdb.RocksDB;}}
{{@@ -51,7 +52,15 @@ class Compactor {}}
{{ }}}
{{ }}
{{ void compact(ColumnFamilyHandle cfName, int level, List<String> files)
throws RocksDBException {}}
{{- int outputLevel = Math.min(level + 1,
cfName.getDescriptor().getOptions().numLevels() - 1);}}
{{+ // ColumnFamilyHandle.getDescriptor() allocates a new native
ColumnFamilyOptions on every}}
{{+ // call (it copies the column family's options across JNI) and does
not close it. Leaking}}
{{+ // it also bumps the reference count on the shared block cache's
shared_ptr, preventing the}}
{{+ // cache from ever being freed (same class of leak as FLINK-21986).
Close it once we have}}
{{+ // read numLevels().}}
{{+ final int outputLevel;}}
{{+ try (ColumnFamilyOptions cfOptions =
cfName.getDescriptor().getOptions()) {}}
{{+ outputLevel = Math.min(level + 1, cfOptions.numLevels() - 1);}}
{{+ }}}
{{ LOG.debug(}}
{{ "Manually compacting {} files from level {} to {}: {}",}}
{{ files.size(),}}
Without closing of ColumnFamilyOptions, TM running in session mode reached
~1.54GB in ReadBlockContents after all job was stopped and resubmitted,
exceeding configured 833MB capacity.
{quote}— Cumulative View —
Using local file /usr/local/lib/[libjemalloc.so|http://libjemalloc.so/].
Argument "MSWin32" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof
line 5222.
Argument "linux" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line
5222.
Using local file /tmp/jeprof/prof.293.3194.i3194.heap.
Total: [2280777636|tel:2280777636] B
0 0.0% 0.0% [2255548014|tel:2255548014] 98.9% je_prof_backtrace
0 0.0% 0.0% [2255548014|tel:2255548014] 98.9% je_prof_tctx_create
[2255548014|tel:2255548014] 98.9% 98.9% [2255548014|tel:2255548014] 98.9%
prof_backtrace_impl
0 0.0% 98.9% [2247187497|tel:2247187497] 98.5% je_malloc_default
0 0.0% 98.9% [1694877743|tel:1694877743] 74.3% void* fallback_impl
0 0.0% 98.9% [1655889046|tel:1655889046] 72.6%
rocksdb::BlockFetcher::ReadBlockContents
0 0.0% 98.9% [1573497612|tel:1573497612] 69.0%
rocksdb::BlockBasedTable::NewDataBlockIterator
0 0.0% 98.9% [1573201192|tel:1573201192] 69.0%
rocksdb::BlockBasedTable::MaybeReadBlockAndLoadToCache@6eeb40
0 0.0% 98.9% [1573201192|tel:1573201192] 69.0%
rocksdb::BlockBasedTable::RetrieveBlock@6ef610
0 0.0% 98.9% [1510010892|tel:1510010892] 66.2%
rocksdb::BlockBasedTableIterator::InitDataBlock
0 0.0% 98.9% [1414778240|tel:1414778240] 62.0%
rocksdb::AtomicGroupReadBuffer::Clear
0 0.0% 98.9% [1097112297|tel:1097112297] 48.1% 0x00007fb693d7cc01
0 0.0% 98.9% [1097112297|tel:1097112297] 48.1%
Java_org_rocksdb_RocksIterator_seek0
0 0.0% 98.9% [1097112297|tel:1097112297] 48.1% rocksdb::DBIter::Seek
0 0.0% 98.9% [1095159981|tel:1095159981] 48.0%
rocksdb::BlockBasedTableIterator::SeekImpl
0 0.0% 98.9% [1095024770|tel:1095024770] 48.0%
rocksdb::MergingIterator::Seek
0 0.0% 98.9% [1095024770|tel:1095024770] 48.0%
rocksdb::MergingIterator::SeekImpl
0 0.0% 98.9% [651438249|tel:651438249] 28.6%
rocksdb::UncompressBlockData
0 0.0% 98.9% [651438249|tel:651438249] 28.6%
rocksdb::UncompressSerializedBlock
0 0.0% 98.9% [519018670|tel:519018670] 22.8%
rocksdb::BlockBasedTableIterator::FindBlockForward
0 0.0% 98.9% [438749002|tel:438749002] 19.2%
rocksdb::BlockBasedTableIterator::Next
0 0.0% 98.9% [438749002|tel:438749002] 19.2%
rocksdb::BlockBasedTableIterator::NextAndGetResult
0 0.0% 98.9% [438587793|tel:438587793] 19.2%
rocksdb::MergingIterator::Next
0 0.0% 98.9% [438587793|tel:438587793] 19.2%
rocksdb::MergingIterator::NextAndGetResult
0 0.0% 98.9% [436500266|tel:436500266] 19.1% 0x00007fb693d82c67
0 0.0% 98.9% [436500266|tel:436500266] 19.1% rocksdb::DBIter::Next
0 0.0% 98.9% [356557773|tel:356557773] 15.6% os::malloc@d01a60
0 0.0% 98.9% [352586303|tel:352586303] 15.5% Unsafe_AllocateMemory0
0 0.0% 98.9% [352497944|tel:352497944] 15.5% 0x00007fb692fe77e0
0 0.0% 98.9% [142878042|tel:142878042] 6.3% os::malloc@d01ca0
0 0.0% 98.9% [136630920|tel:136630920] 6.0% AllocateHeap@4327f0
0 0.0% 98.9% [102717059|tel:102717059] 4.5%
{_}{{_}}GI{{_}}{_}_clone3
0 0.0% 98.9% [102717059|tel:102717059] 4.5% start_thread
0 0.0% 98.9% [92210334|tel:92210334] 4.0%
KlassFactory::create_from_stream
...
{quote}
With fix, TM with highest RSS have ~800MB in ReadBlockContents, consistent with
configured 833MB capacity.
{quote}— Cumulative View —
Using local file /usr/local/lib/[libjemalloc.so|http://libjemalloc.so/].
Argument "MSWin32" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof
line 5222.
Argument "linux" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line
5222.
Using local file /tmp/jeprof/prof.290.8922.i8922.heap.
Total: [1416132765|tel:1416132765] B
0 0.0% 0.0% [1391278564|tel:1391278564] 98.2% je_prof_backtrace
0 0.0% 0.0% [1391278564|tel:1391278564] 98.2% je_prof_tctx_create
[1391278564|tel:1391278564] 98.2% 98.2% [1391278564|tel:1391278564] 98.2%
prof_backtrace_impl
0 0.0% 98.2% [1382556219|tel:1382556219] 97.6% je_malloc_default
0 0.0% 98.2% [856531103|tel:856531103] 60.5% void* fallback_impl
0 0.0% 98.2% [820714114|tel:820714114] 58.0%
rocksdb::BlockFetcher::ReadBlockContents
0 0.0% 98.2% [810027091|tel:810027091] 57.2%
rocksdb::BlockBasedTable::NewDataBlockIterator
0 0.0% 98.2% [809508356|tel:809508356] 57.2%
rocksdb::BlockBasedTable::RetrieveBlock@6ef610
0 0.0% 98.2% [809438634|tel:809438634] 57.2%
rocksdb::BlockBasedTable::MaybeReadBlockAndLoadToCache@6eeb40
0 0.0% 98.2% [767651473|tel:767651473] 54.2%
rocksdb::BlockBasedTableIterator::InitDataBlock
0 0.0% 98.2% [734287999|tel:734287999] 51.9% 0x00007fce45a7b781
0 0.0% 98.2% [734287999|tel:734287999] 51.9%
Java_org_rocksdb_RocksIterator_seek0
0 0.0% 98.2% [734287999|tel:734287999] 51.9%
rocksdb::BlockBasedTableIterator::SeekImpl
0 0.0% 98.2% [734287999|tel:734287999] 51.9% rocksdb::DBIter::Seek
0 0.0% 98.2% [734287999|tel:734287999] 51.9%
rocksdb::MergingIterator::Seek
0 0.0% 98.2% [734287999|tel:734287999] 51.9%
rocksdb::MergingIterator::SeekImpl
0 0.0% 98.2% [733060136|tel:733060136] 51.8%
rocksdb::AtomicGroupReadBuffer::Clear
0 0.0% 98.2% [353073142|tel:353073142] 24.9% os::malloc@d01a60
0 0.0% 98.2% [349535104|tel:349535104] 24.7% Unsafe_AllocateMemory0
0 0.0% 98.2% [349464287|tel:349464287] 24.7% 0x00007fce455e6160
0 0.0% 98.2% [136836241|tel:136836241] 9.7% os::malloc@d01ca0
0 0.0% 98.2% [135269945|tel:135269945] 9.6%
rocksdb::UncompressBlockData
0 0.0% 98.2% [135269945|tel:135269945] 9.6%
rocksdb::UncompressSerializedBlock
0 0.0% 98.2% [130278866|tel:130278866] 9.2% AllocateHeap@4327f0
0 0.0% 98.2% [124409905|tel:124409905] 8.8%
rocksdb::BlockBasedTableIterator::FindBlockForward
0 0.0% 98.2% [91613963|tel:91613963] 6.5%
KlassFactory::create_from_stream
...
{quote}
See here for reproduction steps to trigger OOMKill by starting, stopping Flink
job multiple times:
[https://github.com/leekeiabstraction/flink/tree/sst-merge-getdescriptor-leak-repro/sst-leak-repro]
(note, the configuration in these reproduction have higher managed memory
fraction to more quickly trigger OOMKill).
was:
h3. Summary
In
[Compactor.java#L54|https://github.com/apache/flink/blob/f3ffe4e1e1bc8e833e86509c9f22d45290beb6a6/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java#L54],
ColumnFamilyHandle.getDescriptor() allocates a new native ColumnFamilyOptions
on every JNI call (similar class of bug as FLINK-21986)
This causes two leaks:
1. Descriptor native objects
2. Whole LRUCache leak during even when all tasks are stopped on task manager.
If compactor has ran, the native memory used by LRUCache is never freed even
after all jobs have stopped on the TM. This is because these descriptors
transitively increments reference count on shared pointer and prevent LRUCache
from being freed. This leak is significant and has contributed to OOMKills in
our observations.
Here's the full chain with file paths and line numbers on how LRUCache leaks:
{quote}Compactor.compact()
flink-state-backends/.../sstmerge/Compactor.java:58
│ cfName.getDescriptor().getOptions()
▼
ColumnFamilyHandle.getDescriptor() → JNI call
frocksdb/java/src/main/java/org/rocksdb/ColumnFamilyHandle.java:91
│
▼
Java_org_rocksdb_ColumnFamilyHandle_getDescriptor()
frocksdb/java/rocksjni/columnfamilyhandle.cc:52
│ cfh->GetDescriptor(&desc)
│ → ColumnFamilyDescriptorJni::construct(env, &desc)
▼
ColumnFamilyDescriptorJni::construct()
frocksdb/java/rocksjni/portal.h:6602
│ ColumnFamilyOptionsJni::construct(env, &(cfd->options))
▼
ColumnFamilyOptionsJni::construct()
frocksdb/java/rocksjni/portal.h:2884
│ auto* cfo = new ColumnFamilyOptions(*cfoptions); ← C++ COPY CONSTRUCTOR
▼
ColumnFamilyOptions (the copy)
frocksdb/include/rocksdb/options.h:303
│ std::shared_ptr<TableFactory> table_factory; ← shared_ptr copied,
refcount++
▼
BlockBasedTableFactory (same instance, not a copy)
frocksdb/table/block_based/block_based_table_factory.h:93
│ BlockBasedTableOptions table_options_; ← value member of
the factory
▼
BlockBasedTableOptions
frocksdb/include/rocksdb/table.h:266
│ std::shared_ptr<Cache> block_cache; ← the LRUCache
▼
LRUCache (the single shared instance)
flink-state-backends/.../rocksdb/RocksDBMemoryControllerUtils.java:127
new LRUCache(cacheCapacity, -1, false, highPriorityPoolRatio);
{quote}
h3. Verification of OOMKill and fix
The fix for this issue is to close the options object as follows:
{{{}---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java{}}}{{{}+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java{}}}
{{@@ -19,6 +19,7 @@}}
{{ package org.apache.flink.state.rocksdb.sstmerge;}}
{{ }}
{{ import org.rocksdb.ColumnFamilyHandle;}}
{{+import org.rocksdb.ColumnFamilyOptions;}}
{{ import org.rocksdb.CompactionJobInfo;}}
{{ import org.rocksdb.CompactionOptions;}}
{{ import org.rocksdb.RocksDB;}}
{{@@ -51,7 +52,15 @@ class Compactor {}}
{{ }}}
{{ }}
{{ void compact(ColumnFamilyHandle cfName, int level, List<String> files)
throws RocksDBException {}}
{{- int outputLevel = Math.min(level + 1,
cfName.getDescriptor().getOptions().numLevels() - 1);}}
{{+ // ColumnFamilyHandle.getDescriptor() allocates a new native
ColumnFamilyOptions on every}}
{{+ // call (it copies the column family's options across JNI) and does
not close it. Leaking}}
{{+ // it also bumps the reference count on the shared block cache's
shared_ptr, preventing the}}
{{+ // cache from ever being freed (same class of leak as FLINK-21986).
Close it once we have}}
{{+ // read numLevels().}}
{{+ final int outputLevel;}}
{{+ try (ColumnFamilyOptions cfOptions =
cfName.getDescriptor().getOptions()) {}}
{{+ outputLevel = Math.min(level + 1, cfOptions.numLevels() - 1);}}
{{+ }}}
{{ LOG.debug(}}
{{ "Manually compacting {} files from level {} to {}: {}",}}
{{ files.size(),}}
Without closing of ColumnFamilyOptions, TM with high RSS have ~1.54GB in
ReadBlockContents, exceeding configured 833MB capacity.
{quote}— Cumulative View —
Using local file /usr/local/lib/[libjemalloc.so|http://libjemalloc.so/].
Argument "MSWin32" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof
line 5222.
Argument "linux" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line
5222.
Using local file /tmp/jeprof/prof.293.3194.i3194.heap.
Total: [2280777636|tel:2280777636] B
0 0.0% 0.0% [2255548014|tel:2255548014] 98.9% je_prof_backtrace
0 0.0% 0.0% [2255548014|tel:2255548014] 98.9% je_prof_tctx_create
[2255548014|tel:2255548014] 98.9% 98.9% [2255548014|tel:2255548014] 98.9%
prof_backtrace_impl
0 0.0% 98.9% [2247187497|tel:2247187497] 98.5% je_malloc_default
0 0.0% 98.9% [1694877743|tel:1694877743] 74.3% void* fallback_impl
0 0.0% 98.9% [1655889046|tel:1655889046] 72.6%
rocksdb::BlockFetcher::ReadBlockContents
0 0.0% 98.9% [1573497612|tel:1573497612] 69.0%
rocksdb::BlockBasedTable::NewDataBlockIterator
0 0.0% 98.9% [1573201192|tel:1573201192] 69.0%
rocksdb::BlockBasedTable::MaybeReadBlockAndLoadToCache@6eeb40
0 0.0% 98.9% [1573201192|tel:1573201192] 69.0%
rocksdb::BlockBasedTable::RetrieveBlock@6ef610
0 0.0% 98.9% [1510010892|tel:1510010892] 66.2%
rocksdb::BlockBasedTableIterator::InitDataBlock
0 0.0% 98.9% [1414778240|tel:1414778240] 62.0%
rocksdb::AtomicGroupReadBuffer::Clear
0 0.0% 98.9% [1097112297|tel:1097112297] 48.1% 0x00007fb693d7cc01
0 0.0% 98.9% [1097112297|tel:1097112297] 48.1%
Java_org_rocksdb_RocksIterator_seek0
0 0.0% 98.9% [1097112297|tel:1097112297] 48.1% rocksdb::DBIter::Seek
0 0.0% 98.9% [1095159981|tel:1095159981] 48.0%
rocksdb::BlockBasedTableIterator::SeekImpl
0 0.0% 98.9% [1095024770|tel:1095024770] 48.0%
rocksdb::MergingIterator::Seek
0 0.0% 98.9% [1095024770|tel:1095024770] 48.0%
rocksdb::MergingIterator::SeekImpl
0 0.0% 98.9% [651438249|tel:651438249] 28.6%
rocksdb::UncompressBlockData
0 0.0% 98.9% [651438249|tel:651438249] 28.6%
rocksdb::UncompressSerializedBlock
0 0.0% 98.9% [519018670|tel:519018670] 22.8%
rocksdb::BlockBasedTableIterator::FindBlockForward
0 0.0% 98.9% [438749002|tel:438749002] 19.2%
rocksdb::BlockBasedTableIterator::Next
0 0.0% 98.9% [438749002|tel:438749002] 19.2%
rocksdb::BlockBasedTableIterator::NextAndGetResult
0 0.0% 98.9% [438587793|tel:438587793] 19.2%
rocksdb::MergingIterator::Next
0 0.0% 98.9% [438587793|tel:438587793] 19.2%
rocksdb::MergingIterator::NextAndGetResult
0 0.0% 98.9% [436500266|tel:436500266] 19.1% 0x00007fb693d82c67
0 0.0% 98.9% [436500266|tel:436500266] 19.1% rocksdb::DBIter::Next
0 0.0% 98.9% [356557773|tel:356557773] 15.6% os::malloc@d01a60
0 0.0% 98.9% [352586303|tel:352586303] 15.5% Unsafe_AllocateMemory0
0 0.0% 98.9% [352497944|tel:352497944] 15.5% 0x00007fb692fe77e0
0 0.0% 98.9% [142878042|tel:142878042] 6.3% os::malloc@d01ca0
0 0.0% 98.9% [136630920|tel:136630920] 6.0% AllocateHeap@4327f0
0 0.0% 98.9% [102717059|tel:102717059] 4.5%
{_}{{_}}GI{{_}}{_}_clone3
0 0.0% 98.9% [102717059|tel:102717059] 4.5% start_thread
0 0.0% 98.9% [92210334|tel:92210334] 4.0%
KlassFactory::create_from_stream
...
{quote}
With fix, TM with highest RSS have ~800MB in ReadBlockContents, consistent with
configured 833MB capacity.
{quote}— Cumulative View —
Using local file /usr/local/lib/[libjemalloc.so|http://libjemalloc.so/].
Argument "MSWin32" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof
line 5222.
Argument "linux" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line
5222.
Using local file /tmp/jeprof/prof.290.8922.i8922.heap.
Total: [1416132765|tel:1416132765] B
0 0.0% 0.0% [1391278564|tel:1391278564] 98.2% je_prof_backtrace
0 0.0% 0.0% [1391278564|tel:1391278564] 98.2% je_prof_tctx_create
[1391278564|tel:1391278564] 98.2% 98.2% [1391278564|tel:1391278564] 98.2%
prof_backtrace_impl
0 0.0% 98.2% [1382556219|tel:1382556219] 97.6% je_malloc_default
0 0.0% 98.2% [856531103|tel:856531103] 60.5% void* fallback_impl
0 0.0% 98.2% [820714114|tel:820714114] 58.0%
rocksdb::BlockFetcher::ReadBlockContents
0 0.0% 98.2% [810027091|tel:810027091] 57.2%
rocksdb::BlockBasedTable::NewDataBlockIterator
0 0.0% 98.2% [809508356|tel:809508356] 57.2%
rocksdb::BlockBasedTable::RetrieveBlock@6ef610
0 0.0% 98.2% [809438634|tel:809438634] 57.2%
rocksdb::BlockBasedTable::MaybeReadBlockAndLoadToCache@6eeb40
0 0.0% 98.2% [767651473|tel:767651473] 54.2%
rocksdb::BlockBasedTableIterator::InitDataBlock
0 0.0% 98.2% [734287999|tel:734287999] 51.9% 0x00007fce45a7b781
0 0.0% 98.2% [734287999|tel:734287999] 51.9%
Java_org_rocksdb_RocksIterator_seek0
0 0.0% 98.2% [734287999|tel:734287999] 51.9%
rocksdb::BlockBasedTableIterator::SeekImpl
0 0.0% 98.2% [734287999|tel:734287999] 51.9% rocksdb::DBIter::Seek
0 0.0% 98.2% [734287999|tel:734287999] 51.9%
rocksdb::MergingIterator::Seek
0 0.0% 98.2% [734287999|tel:734287999] 51.9%
rocksdb::MergingIterator::SeekImpl
0 0.0% 98.2% [733060136|tel:733060136] 51.8%
rocksdb::AtomicGroupReadBuffer::Clear
0 0.0% 98.2% [353073142|tel:353073142] 24.9% os::malloc@d01a60
0 0.0% 98.2% [349535104|tel:349535104] 24.7% Unsafe_AllocateMemory0
0 0.0% 98.2% [349464287|tel:349464287] 24.7% 0x00007fce455e6160
0 0.0% 98.2% [136836241|tel:136836241] 9.7% os::malloc@d01ca0
0 0.0% 98.2% [135269945|tel:135269945] 9.6%
rocksdb::UncompressBlockData
0 0.0% 98.2% [135269945|tel:135269945] 9.6%
rocksdb::UncompressSerializedBlock
0 0.0% 98.2% [130278866|tel:130278866] 9.2% AllocateHeap@4327f0
0 0.0% 98.2% [124409905|tel:124409905] 8.8%
rocksdb::BlockBasedTableIterator::FindBlockForward
0 0.0% 98.2% [91613963|tel:91613963] 6.5%
KlassFactory::create_from_stream
...
{quote}
See here for reproduction steps to trigger OOMKill by starting, stopping Flink
job multiple times:
[https://github.com/leekeiabstraction/flink/tree/sst-merge-getdescriptor-leak-repro/sst-leak-repro]
(note, the configuration in these reproduction have higher managed memory
fraction to more quickly trigger OOMKill).
> RocksDB ColumnFamilyOptions and LRUCache leak in Compactor
> ----------------------------------------------------------
>
> Key: FLINK-39753
> URL: https://issues.apache.org/jira/browse/FLINK-39753
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.20.4, 2.1.2, 2.0.2, 2.2.1
> Reporter: Keith Lee
> Priority: Critical
> Labels: pull-request-available
>
> h3. Summary
> In
> [Compactor.java#L54|https://github.com/apache/flink/blob/f3ffe4e1e1bc8e833e86509c9f22d45290beb6a6/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java#L54],
> ColumnFamilyHandle.getDescriptor() allocates a new native
> ColumnFamilyOptions on every JNI call (similar class of bug as FLINK-21986)
> This causes two leaks:
> 1. Descriptor native objects
> 2. Whole LRUCache leak during even when all tasks are stopped on task
> manager. If compactor has ran, the native memory used by LRUCache is never
> freed even after all jobs have stopped on the TM. This is because these
> descriptors transitively increments reference count on shared pointer and
> prevent LRUCache from being freed. This leak is significant and has
> contributed to OOMKills in our observations.
> Here's the full chain with file paths and line numbers on how LRUCache leaks:
> {quote}Compactor.compact()
> flink-state-backends/.../sstmerge/Compactor.java:58
> │ cfName.getDescriptor().getOptions()
> ▼
> ColumnFamilyHandle.getDescriptor() → JNI call
> frocksdb/java/src/main/java/org/rocksdb/ColumnFamilyHandle.java:91
> │
> ▼
> Java_org_rocksdb_ColumnFamilyHandle_getDescriptor()
> frocksdb/java/rocksjni/columnfamilyhandle.cc:52
> │ cfh->GetDescriptor(&desc)
> │ → ColumnFamilyDescriptorJni::construct(env, &desc)
> ▼
> ColumnFamilyDescriptorJni::construct()
> frocksdb/java/rocksjni/portal.h:6602
> │ ColumnFamilyOptionsJni::construct(env, &(cfd->options))
> ▼
> ColumnFamilyOptionsJni::construct()
> frocksdb/java/rocksjni/portal.h:2884
> │ auto* cfo = new ColumnFamilyOptions(*cfoptions); ← C++ COPY
> CONSTRUCTOR
> ▼
> ColumnFamilyOptions (the copy)
> frocksdb/include/rocksdb/options.h:303
> │ std::shared_ptr<TableFactory> table_factory; ← shared_ptr
> copied, refcount++
> ▼
> BlockBasedTableFactory (same instance, not a copy)
> frocksdb/table/block_based/block_based_table_factory.h:93
> │ BlockBasedTableOptions table_options_; ← value member of
> the factory
> ▼
> BlockBasedTableOptions
> frocksdb/include/rocksdb/table.h:266
> │ std::shared_ptr<Cache> block_cache; ← the LRUCache
> ▼
> LRUCache (the single shared instance)
> flink-state-backends/.../rocksdb/RocksDBMemoryControllerUtils.java:127
> new LRUCache(cacheCapacity, -1, false, highPriorityPoolRatio);
> {quote}
> h3. Verification of OOMKill and fix
> The fix for this issue is to close the options object as follows:
>
> {{{}---
> a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java{}}}{{{}+++
>
> b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java{}}}
> {{@@ -19,6 +19,7 @@}}
> {{ package org.apache.flink.state.rocksdb.sstmerge;}}
> {{ }}
> {{ import org.rocksdb.ColumnFamilyHandle;}}
> {{+import org.rocksdb.ColumnFamilyOptions;}}
> {{ import org.rocksdb.CompactionJobInfo;}}
> {{ import org.rocksdb.CompactionOptions;}}
> {{ import org.rocksdb.RocksDB;}}
> {{@@ -51,7 +52,15 @@ class Compactor {}}
> {{ }}}
> {{ }}
> {{ void compact(ColumnFamilyHandle cfName, int level, List<String> files)
> throws RocksDBException {}}
> {{- int outputLevel = Math.min(level + 1,
> cfName.getDescriptor().getOptions().numLevels() - 1);}}
> {{+ // ColumnFamilyHandle.getDescriptor() allocates a new native
> ColumnFamilyOptions on every}}
> {{+ // call (it copies the column family's options across JNI) and
> does not close it. Leaking}}
> {{+ // it also bumps the reference count on the shared block cache's
> shared_ptr, preventing the}}
> {{+ // cache from ever being freed (same class of leak as
> FLINK-21986). Close it once we have}}
> {{+ // read numLevels().}}
> {{+ final int outputLevel;}}
> {{+ try (ColumnFamilyOptions cfOptions =
> cfName.getDescriptor().getOptions()) {}}
> {{+ outputLevel = Math.min(level + 1, cfOptions.numLevels() - 1);}}
> {{+ }}}
> {{ LOG.debug(}}
> {{ "Manually compacting {} files from level {} to {}: {}",}}
> {{ files.size(),}}
> Without closing of ColumnFamilyOptions, TM running in session mode reached
> ~1.54GB in ReadBlockContents after all job was stopped and resubmitted,
> exceeding configured 833MB capacity.
> {quote}— Cumulative View —
> Using local file /usr/local/lib/[libjemalloc.so|http://libjemalloc.so/].
> Argument "MSWin32" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof
> line 5222.
> Argument "linux" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof
> line 5222.
> Using local file /tmp/jeprof/prof.293.3194.i3194.heap.
> Total: [2280777636|tel:2280777636] B
> 0 0.0% 0.0% [2255548014|tel:2255548014] 98.9% je_prof_backtrace
> 0 0.0% 0.0% [2255548014|tel:2255548014] 98.9% je_prof_tctx_create
> [2255548014|tel:2255548014] 98.9% 98.9% [2255548014|tel:2255548014] 98.9%
> prof_backtrace_impl
> 0 0.0% 98.9% [2247187497|tel:2247187497] 98.5% je_malloc_default
> 0 0.0% 98.9% [1694877743|tel:1694877743] 74.3% void* fallback_impl
> 0 0.0% 98.9% [1655889046|tel:1655889046] 72.6%
> rocksdb::BlockFetcher::ReadBlockContents
> 0 0.0% 98.9% [1573497612|tel:1573497612] 69.0%
> rocksdb::BlockBasedTable::NewDataBlockIterator
> 0 0.0% 98.9% [1573201192|tel:1573201192] 69.0%
> rocksdb::BlockBasedTable::MaybeReadBlockAndLoadToCache@6eeb40
> 0 0.0% 98.9% [1573201192|tel:1573201192] 69.0%
> rocksdb::BlockBasedTable::RetrieveBlock@6ef610
> 0 0.0% 98.9% [1510010892|tel:1510010892] 66.2%
> rocksdb::BlockBasedTableIterator::InitDataBlock
> 0 0.0% 98.9% [1414778240|tel:1414778240] 62.0%
> rocksdb::AtomicGroupReadBuffer::Clear
> 0 0.0% 98.9% [1097112297|tel:1097112297] 48.1% 0x00007fb693d7cc01
> 0 0.0% 98.9% [1097112297|tel:1097112297] 48.1%
> Java_org_rocksdb_RocksIterator_seek0
> 0 0.0% 98.9% [1097112297|tel:1097112297] 48.1%
> rocksdb::DBIter::Seek
> 0 0.0% 98.9% [1095159981|tel:1095159981] 48.0%
> rocksdb::BlockBasedTableIterator::SeekImpl
> 0 0.0% 98.9% [1095024770|tel:1095024770] 48.0%
> rocksdb::MergingIterator::Seek
> 0 0.0% 98.9% [1095024770|tel:1095024770] 48.0%
> rocksdb::MergingIterator::SeekImpl
> 0 0.0% 98.9% [651438249|tel:651438249] 28.6%
> rocksdb::UncompressBlockData
> 0 0.0% 98.9% [651438249|tel:651438249] 28.6%
> rocksdb::UncompressSerializedBlock
> 0 0.0% 98.9% [519018670|tel:519018670] 22.8%
> rocksdb::BlockBasedTableIterator::FindBlockForward
> 0 0.0% 98.9% [438749002|tel:438749002] 19.2%
> rocksdb::BlockBasedTableIterator::Next
> 0 0.0% 98.9% [438749002|tel:438749002] 19.2%
> rocksdb::BlockBasedTableIterator::NextAndGetResult
> 0 0.0% 98.9% [438587793|tel:438587793] 19.2%
> rocksdb::MergingIterator::Next
> 0 0.0% 98.9% [438587793|tel:438587793] 19.2%
> rocksdb::MergingIterator::NextAndGetResult
> 0 0.0% 98.9% [436500266|tel:436500266] 19.1% 0x00007fb693d82c67
> 0 0.0% 98.9% [436500266|tel:436500266] 19.1% rocksdb::DBIter::Next
> 0 0.0% 98.9% [356557773|tel:356557773] 15.6% os::malloc@d01a60
> 0 0.0% 98.9% [352586303|tel:352586303] 15.5% Unsafe_AllocateMemory0
> 0 0.0% 98.9% [352497944|tel:352497944] 15.5% 0x00007fb692fe77e0
> 0 0.0% 98.9% [142878042|tel:142878042] 6.3% os::malloc@d01ca0
> 0 0.0% 98.9% [136630920|tel:136630920] 6.0% AllocateHeap@4327f0
> 0 0.0% 98.9% [102717059|tel:102717059] 4.5%
> {_}{{_}}GI{{_}}{_}_clone3
> 0 0.0% 98.9% [102717059|tel:102717059] 4.5% start_thread
> 0 0.0% 98.9% [92210334|tel:92210334] 4.0%
> KlassFactory::create_from_stream
> ...
> {quote}
> With fix, TM with highest RSS have ~800MB in ReadBlockContents, consistent
> with configured 833MB capacity.
> {quote}— Cumulative View —
> Using local file /usr/local/lib/[libjemalloc.so|http://libjemalloc.so/].
> Argument "MSWin32" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof
> line 5222.
> Argument "linux" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof
> line 5222.
> Using local file /tmp/jeprof/prof.290.8922.i8922.heap.
> Total: [1416132765|tel:1416132765] B
> 0 0.0% 0.0% [1391278564|tel:1391278564] 98.2% je_prof_backtrace
> 0 0.0% 0.0% [1391278564|tel:1391278564] 98.2% je_prof_tctx_create
> [1391278564|tel:1391278564] 98.2% 98.2% [1391278564|tel:1391278564] 98.2%
> prof_backtrace_impl
> 0 0.0% 98.2% [1382556219|tel:1382556219] 97.6% je_malloc_default
> 0 0.0% 98.2% [856531103|tel:856531103] 60.5% void* fallback_impl
> 0 0.0% 98.2% [820714114|tel:820714114] 58.0%
> rocksdb::BlockFetcher::ReadBlockContents
> 0 0.0% 98.2% [810027091|tel:810027091] 57.2%
> rocksdb::BlockBasedTable::NewDataBlockIterator
> 0 0.0% 98.2% [809508356|tel:809508356] 57.2%
> rocksdb::BlockBasedTable::RetrieveBlock@6ef610
> 0 0.0% 98.2% [809438634|tel:809438634] 57.2%
> rocksdb::BlockBasedTable::MaybeReadBlockAndLoadToCache@6eeb40
> 0 0.0% 98.2% [767651473|tel:767651473] 54.2%
> rocksdb::BlockBasedTableIterator::InitDataBlock
> 0 0.0% 98.2% [734287999|tel:734287999] 51.9% 0x00007fce45a7b781
> 0 0.0% 98.2% [734287999|tel:734287999] 51.9%
> Java_org_rocksdb_RocksIterator_seek0
> 0 0.0% 98.2% [734287999|tel:734287999] 51.9%
> rocksdb::BlockBasedTableIterator::SeekImpl
> 0 0.0% 98.2% [734287999|tel:734287999] 51.9% rocksdb::DBIter::Seek
> 0 0.0% 98.2% [734287999|tel:734287999] 51.9%
> rocksdb::MergingIterator::Seek
> 0 0.0% 98.2% [734287999|tel:734287999] 51.9%
> rocksdb::MergingIterator::SeekImpl
> 0 0.0% 98.2% [733060136|tel:733060136] 51.8%
> rocksdb::AtomicGroupReadBuffer::Clear
> 0 0.0% 98.2% [353073142|tel:353073142] 24.9% os::malloc@d01a60
> 0 0.0% 98.2% [349535104|tel:349535104] 24.7% Unsafe_AllocateMemory0
> 0 0.0% 98.2% [349464287|tel:349464287] 24.7% 0x00007fce455e6160
> 0 0.0% 98.2% [136836241|tel:136836241] 9.7% os::malloc@d01ca0
> 0 0.0% 98.2% [135269945|tel:135269945] 9.6%
> rocksdb::UncompressBlockData
> 0 0.0% 98.2% [135269945|tel:135269945] 9.6%
> rocksdb::UncompressSerializedBlock
> 0 0.0% 98.2% [130278866|tel:130278866] 9.2% AllocateHeap@4327f0
> 0 0.0% 98.2% [124409905|tel:124409905] 8.8%
> rocksdb::BlockBasedTableIterator::FindBlockForward
> 0 0.0% 98.2% [91613963|tel:91613963] 6.5%
> KlassFactory::create_from_stream
> ...
> {quote}
> See here for reproduction steps to trigger OOMKill by starting, stopping
> Flink job multiple times:
> [https://github.com/leekeiabstraction/flink/tree/sst-merge-getdescriptor-leak-repro/sst-leak-repro]
> (note, the configuration in these reproduction have higher managed memory
> fraction to more quickly trigger OOMKill).
--
This message was sent by Atlassian Jira
(v8.20.10#820010)