[
https://issues.apache.org/jira/browse/FLINK-39753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Keith Lee updated FLINK-39753:
------------------------------
Description:
h3. Summary
In
[Compactor.java#L54|https://github.com/apache/flink/blob/f3ffe4e1e1bc8e833e86509c9f22d45290beb6a6/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java#L54],
ColumnFamilyHandle.getDescriptor() allocates a new native ColumnFamilyOptions
on every JNI call, and the result is never closed (the same class of bug as
FLINK-21986).
This causes two leaks:
1. The descriptor's native objects.
2. The whole LRUCache, even after all tasks on the task manager have stopped.
Once the compactor has run, the native memory used by the LRUCache is never
freed, even after all jobs on the TM have stopped. This is because each leaked
descriptor transitively increments the reference count on the shared pointer
to the cache, which prevents the LRUCache from being freed.
Here's the full chain with file paths and line numbers on how LRUCache leaks:
{quote}Compactor.compact()
flink-state-backends/.../sstmerge/Compactor.java:58
│ cfName.getDescriptor().getOptions()
▼
ColumnFamilyHandle.getDescriptor() → JNI call
frocksdb/java/src/main/java/org/rocksdb/ColumnFamilyHandle.java:91
│
▼
Java_org_rocksdb_ColumnFamilyHandle_getDescriptor()
frocksdb/java/rocksjni/columnfamilyhandle.cc:52
│ cfh->GetDescriptor(&desc)
│ → ColumnFamilyDescriptorJni::construct(env, &desc)
▼
ColumnFamilyDescriptorJni::construct()
frocksdb/java/rocksjni/portal.h:6602
│ ColumnFamilyOptionsJni::construct(env, &(cfd->options))
▼
ColumnFamilyOptionsJni::construct()
frocksdb/java/rocksjni/portal.h:2884
│ auto* cfo = new ColumnFamilyOptions(*cfoptions); ← C++ COPY CONSTRUCTOR
▼
ColumnFamilyOptions (the copy)
frocksdb/include/rocksdb/options.h:303
│ std::shared_ptr<TableFactory> table_factory; ← shared_ptr copied, refcount++
▼
BlockBasedTableFactory (same instance, not a copy)
frocksdb/table/block_based/block_based_table_factory.h:93
│ BlockBasedTableOptions table_options_; ← value member of the factory
▼
BlockBasedTableOptions
frocksdb/include/rocksdb/table.h:266
│ std::shared_ptr<Cache> block_cache; ← the LRUCache
▼
LRUCache (the single shared instance)
flink-state-backends/.../rocksdb/RocksDBMemoryControllerUtils.java:127
new LRUCache(cacheCapacity, -1, false, highPriorityPoolRatio);
{quote}
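The reference-counting effect of the chain above can be imitated in plain Java. This is only a toy model, not RocksDB code: ToyCache, ToyOptions and SharedCacheLeakDemo are hypothetical names, the two shared_ptr hops (options to table factory, table factory to block cache) are collapsed into a single counter, and numLevels() returns an arbitrary value. It shows that a cache shared through counted references is freed only when every copy has been closed.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class SharedCacheLeakDemo {
    /** Returns whether the toy cache was freed once the owning options were closed. */
    static boolean cacheFreedAfterShutdown(boolean closeCopies) {
        ToyCache cache = new ToyCache();
        ToyOptions dbOptions = new ToyOptions(cache); // the long-lived owner
        for (int i = 0; i < 3; i++) {
            // Stands in for the copy handed back by getDescriptor().getOptions().
            ToyOptions copy = dbOptions.copy();
            copy.numLevels();
            if (closeCopies) {
                copy.close();
            }
        }
        dbOptions.close(); // stands in for the job stopping
        return cache.isFreed();
    }

    public static void main(String[] args) {
        System.out.println("copies leaked -> cache freed: " + cacheFreedAfterShutdown(false));
        System.out.println("copies closed -> cache freed: " + cacheFreedAfterShutdown(true));
    }
}

/** Hypothetical stand-in for a native cache held through a shared_ptr. */
final class ToyCache {
    private final AtomicInteger refCount = new AtomicInteger();
    private boolean freed;

    void retain() {
        refCount.incrementAndGet();
    }

    void release() {
        if (refCount.decrementAndGet() == 0) {
            freed = true; // models the destructor running at refcount zero
        }
    }

    boolean isFreed() {
        return freed;
    }
}

/** Hypothetical stand-in for an options object whose copies share the cache. */
final class ToyOptions implements AutoCloseable {
    private final ToyCache cache;
    private boolean closed;

    ToyOptions(ToyCache cache) {
        this.cache = cache;
        cache.retain(); // constructing or copying the options bumps the count
    }

    ToyOptions copy() {
        return new ToyOptions(cache);
    }

    int numLevels() {
        return 7; // arbitrary value for the sketch
    }

    @Override
    public void close() {
        if (!closed) {
            closed = true;
            cache.release();
        }
    }
}
```

With the copies leaked, the count never reaches zero and the first line reports false; with the copies closed, the second line reports true.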
h3. Verification of OOMKill and fix
The fix for this issue is to close the options object as follows:
{{--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java}}
{{+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java}}
{{@@ -19,6 +19,7 @@}}
{{ package org.apache.flink.state.rocksdb.sstmerge;}}
{{ }}
{{ import org.rocksdb.ColumnFamilyHandle;}}
{{+import org.rocksdb.ColumnFamilyOptions;}}
{{ import org.rocksdb.CompactionJobInfo;}}
{{ import org.rocksdb.CompactionOptions;}}
{{ import org.rocksdb.RocksDB;}}
{{@@ -51,7 +52,15 @@ class Compactor {}}
{{     }}}
{{ }}
{{     void compact(ColumnFamilyHandle cfName, int level, List<String> files) throws RocksDBException {}}
{{-        int outputLevel = Math.min(level + 1, cfName.getDescriptor().getOptions().numLevels() - 1);}}
{{+        // ColumnFamilyHandle.getDescriptor() allocates a new native ColumnFamilyOptions on every}}
{{+        // call (it copies the column family's options across JNI) and does not close it. Leaking}}
{{+        // it also bumps the reference count on the shared block cache's shared_ptr, preventing the}}
{{+        // cache from ever being freed (same class of leak as FLINK-21986). Close it once we have}}
{{+        // read numLevels().}}
{{+        final int outputLevel;}}
{{+        try (ColumnFamilyOptions cfOptions = cfName.getDescriptor().getOptions()) {}}
{{+            outputLevel = Math.min(level + 1, cfOptions.numLevels() - 1);}}
{{+        }}}
{{         LOG.debug(}}
{{                 "Manually compacting {} files from level {} to {}: {}",}}
{{                 files.size(),}}
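The pattern in the patch can be tried without RocksDB on the classpath. The sketch below is illustrative only: FakeOptions and OPEN_HANDLES are hypothetical stand-ins for a natively backed options object and its live-handle count, and only the Math.min expression is taken from the diff. It shows that try-with-resources closes the handle before the computed level is used, and that the output level is capped at the last level.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class OutputLevelDemo {
    /** Number of fake handles opened and not yet closed. */
    static final AtomicInteger OPEN_HANDLES = new AtomicInteger();

    /** Hypothetical stand-in for an options object that owns native memory. */
    static final class FakeOptions implements AutoCloseable {
        private final int numLevels;

        FakeOptions(int numLevels) {
            this.numLevels = numLevels;
            OPEN_HANDLES.incrementAndGet();
        }

        int numLevels() {
            return numLevels;
        }

        @Override
        public void close() {
            OPEN_HANDLES.decrementAndGet();
        }
    }

    /** Same arithmetic as the patch: the next level down, capped at the last level. */
    static int outputLevel(int level, int numLevels) {
        final int outputLevel;
        try (FakeOptions options = new FakeOptions(numLevels)) {
            outputLevel = Math.min(level + 1, options.numLevels() - 1);
        }
        return outputLevel;
    }

    public static void main(String[] args) {
        System.out.println(outputLevel(0, 7));  // prints 1
        System.out.println(outputLevel(6, 7));  // prints 6 (already at the last level)
        System.out.println(OPEN_HANDLES.get()); // prints 0, every handle was closed
    }
}
```

Declaring outputLevel as final outside the try block keeps the value usable afterwards while the resource's scope stays as small as possible.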
Two sets of Flink SQL jobs were run on local clusters. After 6+ hours, 8 task
managers without the fix had a working set RSS above 3.4 GB. With the fix, the
highest RSS was 2.8 GB.
Without closing the options object, a TM with high RSS had ~1.54GB in
ReadBlockContents, exceeding the configured 833MB capacity.
{quote}— Cumulative View —
Using local file /usr/local/lib/libjemalloc.so.
Argument "MSWin32" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line 5222.
Argument "linux" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line 5222.
Using local file /tmp/jeprof/prof.293.3194.i3194.heap.
Total: 2280777636 B
0 0.0% 0.0% 2255548014 98.9% je_prof_backtrace
0 0.0% 0.0% 2255548014 98.9% je_prof_tctx_create
2255548014 98.9% 98.9% 2255548014 98.9% prof_backtrace_impl
0 0.0% 98.9% 2247187497 98.5% je_malloc_default
0 0.0% 98.9% 1694877743 74.3% void* fallback_impl
0 0.0% 98.9% 1655889046 72.6% rocksdb::BlockFetcher::ReadBlockContents
0 0.0% 98.9% 1573497612 69.0% rocksdb::BlockBasedTable::NewDataBlockIterator
0 0.0% 98.9% 1573201192 69.0% rocksdb::BlockBasedTable::MaybeReadBlockAndLoadToCache@6eeb40
0 0.0% 98.9% 1573201192 69.0% rocksdb::BlockBasedTable::RetrieveBlock@6ef610
0 0.0% 98.9% 1510010892 66.2% rocksdb::BlockBasedTableIterator::InitDataBlock
0 0.0% 98.9% 1414778240 62.0% rocksdb::AtomicGroupReadBuffer::Clear
0 0.0% 98.9% 1097112297 48.1% 0x00007fb693d7cc01
0 0.0% 98.9% 1097112297 48.1% Java_org_rocksdb_RocksIterator_seek0
0 0.0% 98.9% 1097112297 48.1% rocksdb::DBIter::Seek
0 0.0% 98.9% 1095159981 48.0% rocksdb::BlockBasedTableIterator::SeekImpl
0 0.0% 98.9% 1095024770 48.0% rocksdb::MergingIterator::Seek
0 0.0% 98.9% 1095024770 48.0% rocksdb::MergingIterator::SeekImpl
0 0.0% 98.9% 651438249 28.6% rocksdb::UncompressBlockData
0 0.0% 98.9% 651438249 28.6% rocksdb::UncompressSerializedBlock
0 0.0% 98.9% 519018670 22.8% rocksdb::BlockBasedTableIterator::FindBlockForward
0 0.0% 98.9% 438749002 19.2% rocksdb::BlockBasedTableIterator::Next
0 0.0% 98.9% 438749002 19.2% rocksdb::BlockBasedTableIterator::NextAndGetResult
0 0.0% 98.9% 438587793 19.2% rocksdb::MergingIterator::Next
0 0.0% 98.9% 438587793 19.2% rocksdb::MergingIterator::NextAndGetResult
0 0.0% 98.9% 436500266 19.1% 0x00007fb693d82c67
0 0.0% 98.9% 436500266 19.1% rocksdb::DBIter::Next
0 0.0% 98.9% 356557773 15.6% os::malloc@d01a60
0 0.0% 98.9% 352586303 15.5% Unsafe_AllocateMemory0
0 0.0% 98.9% 352497944 15.5% 0x00007fb692fe77e0
0 0.0% 98.9% 142878042 6.3% os::malloc@d01ca0
0 0.0% 98.9% 136630920 6.0% AllocateHeap@4327f0
0 0.0% 98.9% 102717059 4.5% __GI___clone3
0 0.0% 98.9% 102717059 4.5% start_thread
0 0.0% 98.9% 92210334 4.0% KlassFactory::create_from_stream
...
{quote}
With the fix, the TM with the highest RSS had ~800MB in ReadBlockContents,
consistent with the configured 833MB capacity.
{quote}— Cumulative View —
Using local file /usr/local/lib/libjemalloc.so.
Argument "MSWin32" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line 5222.
Argument "linux" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line 5222.
Using local file /tmp/jeprof/prof.290.8922.i8922.heap.
Total: 1416132765 B
0 0.0% 0.0% 1391278564 98.2% je_prof_backtrace
0 0.0% 0.0% 1391278564 98.2% je_prof_tctx_create
1391278564 98.2% 98.2% 1391278564 98.2% prof_backtrace_impl
0 0.0% 98.2% 1382556219 97.6% je_malloc_default
0 0.0% 98.2% 856531103 60.5% void* fallback_impl
0 0.0% 98.2% 820714114 58.0% rocksdb::BlockFetcher::ReadBlockContents
0 0.0% 98.2% 810027091 57.2% rocksdb::BlockBasedTable::NewDataBlockIterator
0 0.0% 98.2% 809508356 57.2% rocksdb::BlockBasedTable::RetrieveBlock@6ef610
0 0.0% 98.2% 809438634 57.2% rocksdb::BlockBasedTable::MaybeReadBlockAndLoadToCache@6eeb40
0 0.0% 98.2% 767651473 54.2% rocksdb::BlockBasedTableIterator::InitDataBlock
0 0.0% 98.2% 734287999 51.9% 0x00007fce45a7b781
0 0.0% 98.2% 734287999 51.9% Java_org_rocksdb_RocksIterator_seek0
0 0.0% 98.2% 734287999 51.9% rocksdb::BlockBasedTableIterator::SeekImpl
0 0.0% 98.2% 734287999 51.9% rocksdb::DBIter::Seek
0 0.0% 98.2% 734287999 51.9% rocksdb::MergingIterator::Seek
0 0.0% 98.2% 734287999 51.9% rocksdb::MergingIterator::SeekImpl
0 0.0% 98.2% 733060136 51.8% rocksdb::AtomicGroupReadBuffer::Clear
0 0.0% 98.2% 353073142 24.9% os::malloc@d01a60
0 0.0% 98.2% 349535104 24.7% Unsafe_AllocateMemory0
0 0.0% 98.2% 349464287 24.7% 0x00007fce455e6160
0 0.0% 98.2% 136836241 9.7% os::malloc@d01ca0
0 0.0% 98.2% 135269945 9.6% rocksdb::UncompressBlockData
0 0.0% 98.2% 135269945 9.6% rocksdb::UncompressSerializedBlock
0 0.0% 98.2% 130278866 9.2% AllocateHeap@4327f0
0 0.0% 98.2% 124409905 8.8% rocksdb::BlockBasedTableIterator::FindBlockForward
0 0.0% 98.2% 91613963 6.5% KlassFactory::create_from_stream
...
{quote}
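The comparison against the configured capacity can be checked with a few lines of arithmetic. The byte counts below are the cumulative ReadBlockContents figures from the two profiles above. Treating the configured 833MB as mebibytes is an assumption (the report does not say which unit is meant), and CacheBudgetCheck is a hypothetical name for this sketch.

```java
final class CacheBudgetCheck {
    static final double MIB = 1024.0 * 1024.0;

    /** Converts a byte count from the jeprof output to mebibytes. */
    static double toMiB(long bytes) {
        return bytes / MIB;
    }

    /** Ratio of observed bytes to a budget given in MiB; above 1.0 means over budget. */
    static double ratioToBudget(long bytes, double budgetMiB) {
        return toMiB(bytes) / budgetMiB;
    }

    public static void main(String[] args) {
        long withoutFix = 1_655_889_046L; // ReadBlockContents, profile without the fix
        long withFix = 820_714_114L;      // ReadBlockContents, profile with the fix
        double budgetMiB = 833.0;         // assumption: configured capacity read as MiB

        System.out.printf("without fix: %.0f MiB, %.2fx of budget%n",
                toMiB(withoutFix), ratioToBudget(withoutFix, budgetMiB));
        System.out.printf("with fix:    %.0f MiB, %.2fx of budget%n",
                toMiB(withFix), ratioToBudget(withFix, budgetMiB));
    }
}
```

Under that assumption the unfixed TM holds about 1579 MiB (roughly 1.9 times the budget), while the fixed TM holds about 783 MiB, which is inside it.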
Reproduction steps that trigger an OOMKill by repeatedly starting and stopping
a Flink job are available here:
[https://github.com/leekeiabstraction/flink/tree/sst-merge-getdescriptor-leak-repro/sst-leak-repro]
(note: the configuration in this reproduction uses a higher managed memory
fraction to trigger the OOMKill more quickly).
> RocksDB ColumnFamilyOptions and LRUCache leak in Compactor
> ----------------------------------------------------------
>
> Key: FLINK-39753
> URL: https://issues.apache.org/jira/browse/FLINK-39753
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.20.4, 2.1.2, 2.0.2, 2.2.1
> Reporter: Keith Lee
> Priority: Critical