[
https://issues.apache.org/jira/browse/FLINK-39753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18083337#comment-18083337
]
Keith Lee commented on FLINK-39753:
-----------------------------------
PR raised against master. Let me know if I should also cherry-pick this to 1.20
and the earlier 2.x releases.
> RocksDB ColumnFamilyOptions and LRUCache leak in Compactor
> ----------------------------------------------------------
>
> Key: FLINK-39753
> URL: https://issues.apache.org/jira/browse/FLINK-39753
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.20.4, 2.1.2, 2.0.2, 2.2.1
> Reporter: Keith Lee
> Priority: Critical
> Labels: pull-request-available
>
> h3. Summary
> In
> [Compactor.java#L54|https://github.com/apache/flink/blob/f3ffe4e1e1bc8e833e86509c9f22d45290beb6a6/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java#L54],
> ColumnFamilyHandle.getDescriptor() allocates a new native
> ColumnFamilyOptions on every JNI call (the same class of bug as FLINK-21986).
> This causes two leaks:
> 1. The native descriptor objects themselves.
> 2. The entire LRUCache, even after all tasks on the task manager have
> stopped. Once the compactor has run, the native memory used by the LRUCache
> is never freed, even after all jobs on the TM have stopped. This is because
> the leaked descriptors transitively increment the reference count on the
> cache's shared pointer and prevent the LRUCache from being freed. This leak
> is significant and has contributed to OOMKills in our observations.
> Here's the full chain, with file paths and line numbers, showing how the
> LRUCache leaks:
> {quote}Compactor.compact()
> flink-state-backends/.../sstmerge/Compactor.java:58
> │ cfName.getDescriptor().getOptions()
> ▼
> ColumnFamilyHandle.getDescriptor() → JNI call
> frocksdb/java/src/main/java/org/rocksdb/ColumnFamilyHandle.java:91
> │
> ▼
> Java_org_rocksdb_ColumnFamilyHandle_getDescriptor()
> frocksdb/java/rocksjni/columnfamilyhandle.cc:52
> │ cfh->GetDescriptor(&desc)
> │ → ColumnFamilyDescriptorJni::construct(env, &desc)
> ▼
> ColumnFamilyDescriptorJni::construct()
> frocksdb/java/rocksjni/portal.h:6602
> │ ColumnFamilyOptionsJni::construct(env, &(cfd->options))
> ▼
> ColumnFamilyOptionsJni::construct()
> frocksdb/java/rocksjni/portal.h:2884
> │ auto* cfo = new ColumnFamilyOptions(*cfoptions); ← C++ COPY
> CONSTRUCTOR
> ▼
> ColumnFamilyOptions (the copy)
> frocksdb/include/rocksdb/options.h:303
> │ std::shared_ptr<TableFactory> table_factory; ← shared_ptr
> copied, refcount++
> ▼
> BlockBasedTableFactory (same instance, not a copy)
> frocksdb/table/block_based/block_based_table_factory.h:93
> │ BlockBasedTableOptions table_options_; ← value member of
> the factory
> ▼
> BlockBasedTableOptions
> frocksdb/include/rocksdb/table.h:266
> │ std::shared_ptr<Cache> block_cache; ← the LRUCache
> ▼
> LRUCache (the single shared instance)
> flink-state-backends/.../rocksdb/RocksDBMemoryControllerUtils.java:127
> new LRUCache(cacheCapacity, -1, false, highPriorityPoolRatio);
> {quote}
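The reference-counting effect of the chain above can be illustrated with a small, self-contained Java model. This is only a sketch under stated assumptions: `SharedCacheModel`, `OptionsCopyModel` and `RefCountLeakSketch` are hypothetical names that do not exist in Flink or RocksDB, and the model collapses the table_factory and block_cache hops into a single counter. It shows only why an unclosed options copy keeps the cache alive after its owner has released it.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Toy stand-in for a native cache held through a C++ shared_ptr. Not a RocksDB class. */
final class SharedCacheModel {
    // Starts at 1: the reference held by the owner (the state backend in this model).
    private final AtomicInteger refCount = new AtomicInteger(1);

    void retain() {
        refCount.incrementAndGet();
    }

    void release() {
        refCount.decrementAndGet();
    }

    int refCount() {
        return refCount.get();
    }

    /** The memory can be reclaimed only once no references remain. */
    boolean isFreed() {
        return refCount.get() == 0;
    }
}

/** Toy stand-in for the options copy created by each getDescriptor() call. */
final class OptionsCopyModel implements AutoCloseable {
    private final SharedCacheModel cache;
    private boolean closed;

    OptionsCopyModel(SharedCacheModel cache) {
        this.cache = cache;
        cache.retain(); // copying the options copies the shared pointer: refcount++
    }

    int numLevels() {
        return 7; // arbitrary value, only here so the sketch has something to read
    }

    @Override
    public void close() {
        if (!closed) {
            closed = true;
            cache.release(); // disposing the copy drops its reference
        }
    }
}

final class RefCountLeakSketch {
    /** Simulates `compactions` compact() calls, then the owner releasing the cache. */
    static SharedCacheModel simulate(int compactions, boolean closeOptions) {
        SharedCacheModel cache = new SharedCacheModel();
        for (int i = 0; i < compactions; i++) {
            OptionsCopyModel options = new OptionsCopyModel(cache);
            options.numLevels();
            if (closeOptions) {
                options.close();
            }
        }
        cache.release(); // all jobs stopped: the owner drops its own reference
        return cache;
    }

    public static void main(String[] args) {
        System.out.println("freed without close: " + simulate(3, false).isFreed());
        System.out.println("freed with close:    " + simulate(3, true).isFreed());
    }
}
```

In this model, every unclosed copy leaves one reference behind, so the count never reaches zero once a single compaction has run without closing its options.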
> h3. Verification of OOMKill and fix
> The fix for this issue is to close the options object as follows:
>
> {{--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java}}
> {{+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java}}
> {{@@ -19,6 +19,7 @@}}
> {{ package org.apache.flink.state.rocksdb.sstmerge;}}
> {{ }}
> {{ import org.rocksdb.ColumnFamilyHandle;}}
> {{+import org.rocksdb.ColumnFamilyOptions;}}
> {{ import org.rocksdb.CompactionJobInfo;}}
> {{ import org.rocksdb.CompactionOptions;}}
> {{ import org.rocksdb.RocksDB;}}
> {{@@ -51,7 +52,15 @@ class Compactor {}}
> {{ }}}
> {{ }}
> {{ void compact(ColumnFamilyHandle cfName, int level, List<String> files) throws RocksDBException {}}
> {{- int outputLevel = Math.min(level + 1, cfName.getDescriptor().getOptions().numLevels() - 1);}}
> {{+ // ColumnFamilyHandle.getDescriptor() allocates a new native ColumnFamilyOptions on every}}
> {{+ // call (it copies the column family's options across JNI) and does not close it. Leaking}}
> {{+ // it also bumps the reference count on the shared block cache's shared_ptr, preventing the}}
> {{+ // cache from ever being freed (same class of leak as FLINK-21986). Close it once we have}}
> {{+ // read numLevels().}}
> {{+ final int outputLevel;}}
> {{+ try (ColumnFamilyOptions cfOptions = cfName.getDescriptor().getOptions()) {}}
> {{+ outputLevel = Math.min(level + 1, cfOptions.numLevels() - 1);}}
> {{+ }}}
> {{ LOG.debug(}}
> {{ "Manually compacting {} files from level {} to {}: {}",}}
> {{ files.size(),}}
Two sets of Flink SQL jobs were run on local clusters. After 6+ hours, 8 task
managers without the fix had a working-set RSS > 3.4 GB. With the fix, the
highest RSS was 2.8 GB.
Without closing the options object, TMs with high RSS have ~1.54GB in
ReadBlockContents, exceeding the configured 833MB capacity.
> {quote}— Cumulative View —
> Using local file /usr/local/lib/libjemalloc.so.
> Argument "MSWin32" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line 5222.
> Argument "linux" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line 5222.
> Using local file /tmp/jeprof/prof.293.3194.i3194.heap.
> Total: 2280777636 B
> 0 0.0% 0.0% 2255548014 98.9% je_prof_backtrace
> 0 0.0% 0.0% 2255548014 98.9% je_prof_tctx_create
> 2255548014 98.9% 98.9% 2255548014 98.9% prof_backtrace_impl
> 0 0.0% 98.9% 2247187497 98.5% je_malloc_default
> 0 0.0% 98.9% 1694877743 74.3% void* fallback_impl
> 0 0.0% 98.9% 1655889046 72.6% rocksdb::BlockFetcher::ReadBlockContents
> 0 0.0% 98.9% 1573497612 69.0% rocksdb::BlockBasedTable::NewDataBlockIterator
> 0 0.0% 98.9% 1573201192 69.0% rocksdb::BlockBasedTable::MaybeReadBlockAndLoadToCache@6eeb40
> 0 0.0% 98.9% 1573201192 69.0% rocksdb::BlockBasedTable::RetrieveBlock@6ef610
> 0 0.0% 98.9% 1510010892 66.2% rocksdb::BlockBasedTableIterator::InitDataBlock
> 0 0.0% 98.9% 1414778240 62.0% rocksdb::AtomicGroupReadBuffer::Clear
> 0 0.0% 98.9% 1097112297 48.1% 0x00007fb693d7cc01
> 0 0.0% 98.9% 1097112297 48.1% Java_org_rocksdb_RocksIterator_seek0
> 0 0.0% 98.9% 1097112297 48.1% rocksdb::DBIter::Seek
> 0 0.0% 98.9% 1095159981 48.0% rocksdb::BlockBasedTableIterator::SeekImpl
> 0 0.0% 98.9% 1095024770 48.0% rocksdb::MergingIterator::Seek
> 0 0.0% 98.9% 1095024770 48.0% rocksdb::MergingIterator::SeekImpl
> 0 0.0% 98.9% 651438249 28.6% rocksdb::UncompressBlockData
> 0 0.0% 98.9% 651438249 28.6% rocksdb::UncompressSerializedBlock
> 0 0.0% 98.9% 519018670 22.8% rocksdb::BlockBasedTableIterator::FindBlockForward
> 0 0.0% 98.9% 438749002 19.2% rocksdb::BlockBasedTableIterator::Next
> 0 0.0% 98.9% 438749002 19.2% rocksdb::BlockBasedTableIterator::NextAndGetResult
> 0 0.0% 98.9% 438587793 19.2% rocksdb::MergingIterator::Next
> 0 0.0% 98.9% 438587793 19.2% rocksdb::MergingIterator::NextAndGetResult
> 0 0.0% 98.9% 436500266 19.1% 0x00007fb693d82c67
> 0 0.0% 98.9% 436500266 19.1% rocksdb::DBIter::Next
> 0 0.0% 98.9% 356557773 15.6% os::malloc@d01a60
> 0 0.0% 98.9% 352586303 15.5% Unsafe_AllocateMemory0
> 0 0.0% 98.9% 352497944 15.5% 0x00007fb692fe77e0
> 0 0.0% 98.9% 142878042 6.3% os::malloc@d01ca0
> 0 0.0% 98.9% 136630920 6.0% AllocateHeap@4327f0
> 0 0.0% 98.9% 102717059 4.5% __GI___clone3
> 0 0.0% 98.9% 102717059 4.5% start_thread
> 0 0.0% 98.9% 92210334 4.0% KlassFactory::create_from_stream
> ...
> {quote}
> With the fix, the TM with the highest RSS has ~800MB in ReadBlockContents,
> consistent with the configured 833MB capacity.
> {quote}— Cumulative View —
> Using local file /usr/local/lib/libjemalloc.so.
> Argument "MSWin32" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line 5222.
> Argument "linux" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line 5222.
> Using local file /tmp/jeprof/prof.290.8922.i8922.heap.
> Total: 1416132765 B
> 0 0.0% 0.0% 1391278564 98.2% je_prof_backtrace
> 0 0.0% 0.0% 1391278564 98.2% je_prof_tctx_create
> 1391278564 98.2% 98.2% 1391278564 98.2% prof_backtrace_impl
> 0 0.0% 98.2% 1382556219 97.6% je_malloc_default
> 0 0.0% 98.2% 856531103 60.5% void* fallback_impl
> 0 0.0% 98.2% 820714114 58.0% rocksdb::BlockFetcher::ReadBlockContents
> 0 0.0% 98.2% 810027091 57.2% rocksdb::BlockBasedTable::NewDataBlockIterator
> 0 0.0% 98.2% 809508356 57.2% rocksdb::BlockBasedTable::RetrieveBlock@6ef610
> 0 0.0% 98.2% 809438634 57.2% rocksdb::BlockBasedTable::MaybeReadBlockAndLoadToCache@6eeb40
> 0 0.0% 98.2% 767651473 54.2% rocksdb::BlockBasedTableIterator::InitDataBlock
> 0 0.0% 98.2% 734287999 51.9% 0x00007fce45a7b781
> 0 0.0% 98.2% 734287999 51.9% Java_org_rocksdb_RocksIterator_seek0
> 0 0.0% 98.2% 734287999 51.9% rocksdb::BlockBasedTableIterator::SeekImpl
> 0 0.0% 98.2% 734287999 51.9% rocksdb::DBIter::Seek
> 0 0.0% 98.2% 734287999 51.9% rocksdb::MergingIterator::Seek
> 0 0.0% 98.2% 734287999 51.9% rocksdb::MergingIterator::SeekImpl
> 0 0.0% 98.2% 733060136 51.8% rocksdb::AtomicGroupReadBuffer::Clear
> 0 0.0% 98.2% 353073142 24.9% os::malloc@d01a60
> 0 0.0% 98.2% 349535104 24.7% Unsafe_AllocateMemory0
> 0 0.0% 98.2% 349464287 24.7% 0x00007fce455e6160
> 0 0.0% 98.2% 136836241 9.7% os::malloc@d01ca0
> 0 0.0% 98.2% 135269945 9.6% rocksdb::UncompressBlockData
> 0 0.0% 98.2% 135269945 9.6% rocksdb::UncompressSerializedBlock
> 0 0.0% 98.2% 130278866 9.2% AllocateHeap@4327f0
> 0 0.0% 98.2% 124409905 8.8% rocksdb::BlockBasedTableIterator::FindBlockForward
> 0 0.0% 98.2% 91613963 6.5% KlassFactory::create_from_stream
> ...
> {quote}
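For readers comparing the two profiles, the raw byte counts on the ReadBlockContents lines can be converted and set against the configured capacity with a few lines of Java. This is a rough sketch: `ProfileArithmetic` is a hypothetical helper name, and it assumes the "833MB" capacity is expressed in MiB (1024 * 1024 bytes), which the report does not state.

```java
final class ProfileArithmetic {
    static final double BYTES_PER_MIB = 1024.0 * 1024.0;

    // Cumulative bytes attributed to rocksdb::BlockFetcher::ReadBlockContents in each profile.
    static final long WITHOUT_FIX_BYTES = 1_655_889_046L;
    static final long WITH_FIX_BYTES = 820_714_114L;

    // Assumption: the configured 833MB block cache capacity is in MiB.
    static final double CAPACITY_MIB = 833.0;

    static double toMiB(long bytes) {
        return bytes / BYTES_PER_MIB;
    }

    /** Ratio of observed usage to the configured capacity; above 1.0 means over capacity. */
    static double ratioToCapacity(long bytes) {
        return toMiB(bytes) / CAPACITY_MIB;
    }

    public static void main(String[] args) {
        System.out.printf("without fix: %.1f MiB, %.2fx capacity%n",
                toMiB(WITHOUT_FIX_BYTES), ratioToCapacity(WITHOUT_FIX_BYTES));
        System.out.printf("with fix:    %.1f MiB, %.2fx capacity%n",
                toMiB(WITH_FIX_BYTES), ratioToCapacity(WITH_FIX_BYTES));
    }
}
```

Under that assumption, the profile without the fix is well above the configured capacity, while the profile with the fix stays below it.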
> See here for reproduction steps that trigger an OOMKill by starting and
> stopping a Flink job multiple times:
> [https://github.com/leekeiabstraction/flink/tree/sst-merge-getdescriptor-leak-repro/sst-leak-repro]
> (note: the configuration in this reproduction uses a higher managed memory
> fraction to trigger the OOMKill more quickly).