[ 
https://issues.apache.org/jira/browse/FLINK-39753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18083337#comment-18083337
 ] 

Keith Lee commented on FLINK-39753:
-----------------------------------

I have raised a PR against master. Let me know if I should also cherry-pick 
it to 1.20 and the earlier 2.x branches.

> RocksDB ColumnFamilyOptions and LRUCache leak in Compactor
> ----------------------------------------------------------
>
>                 Key: FLINK-39753
>                 URL: https://issues.apache.org/jira/browse/FLINK-39753
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.20.4, 2.1.2, 2.0.2, 2.2.1
>            Reporter: Keith Lee
>            Priority: Critical
>              Labels: pull-request-available
>
> h3. Summary
> In 
> [Compactor.java#L54|https://github.com/apache/flink/blob/f3ffe4e1e1bc8e833e86509c9f22d45290beb6a6/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java#L54],
>  ColumnFamilyHandle.getDescriptor() allocates a new native 
> ColumnFamilyOptions on every JNI call, and the caller never closes it (the 
> same class of bug as FLINK-21986).
> This causes two leaks:
> 1. The native descriptor objects themselves.
> 2. The whole LRUCache. If the compactor has run, the native memory used by 
> the LRUCache is never freed, even after all jobs on the TM have stopped. 
> This is because each leaked descriptor transitively increments the reference 
> count on the cache's shared pointer and so prevents the LRUCache from being 
> freed. This leak is significant and has contributed to OOMKills in our 
> observations.
> Here is the full chain, with file paths and line numbers, showing how the 
> LRUCache leaks:
> {quote}Compactor.compact()
>   flink-state-backends/.../sstmerge/Compactor.java:58
>   │  cfName.getDescriptor().getOptions()
>  ▼
> ColumnFamilyHandle.getDescriptor()  →  JNI call
>   frocksdb/java/src/main/java/org/rocksdb/ColumnFamilyHandle.java:91
>   │
>  ▼
> Java_org_rocksdb_ColumnFamilyHandle_getDescriptor()
>   frocksdb/java/rocksjni/columnfamilyhandle.cc:52
>   │  cfh->GetDescriptor(&desc)
>   │  → ColumnFamilyDescriptorJni::construct(env, &desc)
>  ▼
> ColumnFamilyDescriptorJni::construct()
>   frocksdb/java/rocksjni/portal.h:6602
>   │  ColumnFamilyOptionsJni::construct(env, &(cfd->options))
>  ▼
> ColumnFamilyOptionsJni::construct()
>   frocksdb/java/rocksjni/portal.h:2884
>   │  auto* cfo = new ColumnFamilyOptions(*cfoptions);    ← C++ COPY CONSTRUCTOR
>  ▼
> ColumnFamilyOptions (the copy)
>   frocksdb/include/rocksdb/options.h:303
>   │  std::shared_ptr<TableFactory> table_factory;         ← shared_ptr copied, refcount++
>  ▼
> BlockBasedTableFactory (same instance, not a copy)
>   frocksdb/table/block_based/block_based_table_factory.h:93
>   │  BlockBasedTableOptions table_options_;                ← value member of the factory
>  ▼
> BlockBasedTableOptions
>   frocksdb/include/rocksdb/table.h:266
>   │  std::shared_ptr<Cache> block_cache;                  ← the LRUCache
>  ▼
> LRUCache (the single shared instance)
>   flink-state-backends/.../rocksdb/RocksDBMemoryControllerUtils.java:127
>      new LRUCache(cacheCapacity, -1, false, highPriorityPoolRatio);
> {quote}
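
The chain above can be illustrated with a small, self-contained Java model. This is only a sketch under stated assumptions: SharedCache, OptionsCopy and simulate are hypothetical names invented here, not RocksDB or Flink classes, and the counter merely imitates how a std::shared_ptr reference count behaves when each getDescriptor() call produces a copy that nobody closes.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SharedCacheLeakModel {

    /** Hypothetical stand-in for a shared_ptr-managed cache: freed only at count zero. */
    static final class SharedCache {
        private final AtomicInteger refCount = new AtomicInteger(1); // the owner's reference
        private boolean freed = false;

        void retain() {
            refCount.incrementAndGet();
        }

        void release() {
            if (refCount.decrementAndGet() == 0) {
                freed = true; // models the native destructor running
            }
        }

        int refCount() {
            return refCount.get();
        }

        boolean isFreed() {
            return freed;
        }
    }

    /** Hypothetical stand-in for the copied options: pins the cache until close(). */
    static final class OptionsCopy implements AutoCloseable {
        private final SharedCache cache;
        private boolean closed = false;

        OptionsCopy(SharedCache cache) {
            this.cache = cache;
            cache.retain(); // models the shared_ptr copy: refcount++
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                cache.release();
            }
        }
    }

    /** Makes one copy per simulated call, then drops the owner's own reference. */
    static SharedCache simulate(int calls, boolean closeCopies) {
        SharedCache cache = new SharedCache();
        for (int i = 0; i < calls; i++) {
            OptionsCopy copy = new OptionsCopy(cache);
            if (closeCopies) {
                copy.close();
            }
        }
        cache.release(); // the owner (for example, a stopping job) lets go
        return cache;
    }

    public static void main(String[] args) {
        SharedCache leaked = simulate(3, false);
        System.out.println("unclosed: refCount=" + leaked.refCount() + " freed=" + leaked.isFreed());
        SharedCache clean = simulate(3, true);
        System.out.println("closed:   refCount=" + clean.refCount() + " freed=" + clean.isFreed());
    }
}
```

In this model the unclosed run ends with three outstanding references and the cache still allocated, while the run that closes every copy ends at zero with the cache freed.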
> h3. Verification of OOMKill and fix
> The fix for this issue is to close the options object as follows:
>  
> {{--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java}}
> {{+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/state/rocksdb/sstmerge/Compactor.java}}
> {{@@ -19,6 +19,7 @@}}
> {{ package org.apache.flink.state.rocksdb.sstmerge;}}
> {{ }}
> {{ import org.rocksdb.ColumnFamilyHandle;}}
> {{+import org.rocksdb.ColumnFamilyOptions;}}
> {{ import org.rocksdb.CompactionJobInfo;}}
> {{ import org.rocksdb.CompactionOptions;}}
> {{ import org.rocksdb.RocksDB;}}
> {{@@ -51,7 +52,15 @@ class Compactor {}}
> {{     }}}
> {{ }}
> {{     void compact(ColumnFamilyHandle cfName, int level, List<String> files) throws RocksDBException {}}
> {{-        int outputLevel = Math.min(level + 1, cfName.getDescriptor().getOptions().numLevels() - 1);}}
> {{+        // ColumnFamilyHandle.getDescriptor() allocates a new native ColumnFamilyOptions on every}}
> {{+        // call (it copies the column family's options across JNI) and does not close it. Leaking}}
> {{+        // it also bumps the reference count on the shared block cache's shared_ptr, preventing the}}
> {{+        // cache from ever being freed (same class of leak as FLINK-21986). Close it once we have}}
> {{+        // read numLevels().}}
> {{+        final int outputLevel;}}
> {{+        try (ColumnFamilyOptions cfOptions = cfName.getDescriptor().getOptions()) {}}
> {{+            outputLevel = Math.min(level + 1, cfOptions.numLevels() - 1);}}
> {{+        }}}
> {{         LOG.debug(}}
> {{                 "Manually compacting {} files from level {} to {}: {}",}}
> {{                 files.size(),}}
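
The shape of the fix, reading one value from a short-lived closable object and releasing it straight away, can be tried in isolation. The sketch below is an assumption-laden stand-in: FakeOptions and its open counter are hypothetical and only mimic an AutoCloseable options object, so it runs without RocksDB on the classpath.

```java
public class CloseAfterReadSketch {

    /** Hypothetical closable options object; "open" counts instances not yet closed. */
    static final class FakeOptions implements AutoCloseable {
        static int open = 0;
        private final int numLevels;

        FakeOptions(int numLevels) {
            this.numLevels = numLevels;
            open++;
        }

        int numLevels() {
            return numLevels;
        }

        @Override
        public void close() {
            open--;
        }
    }

    /** Same clamp as in the patch: next level down, capped at the last level. */
    static int outputLevel(int level, int numLevels) {
        final int outputLevel;
        // try-with-resources closes the options even if numLevels() were to throw
        try (FakeOptions options = new FakeOptions(numLevels)) {
            outputLevel = Math.min(level + 1, options.numLevels() - 1);
        }
        return outputLevel;
    }

    public static void main(String[] args) {
        System.out.println(outputLevel(2, 7)); // level 2 of 7 compacts into level 3
        System.out.println(outputLevel(6, 7)); // the last level is clamped to 6
        System.out.println(FakeOptions.open);  // 0: nothing left open
    }
}
```

Declaring outputLevel as final before the try block keeps the value usable after the resource is closed, which is why the patch splits the declaration from the assignment.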
> Two sets of Flink SQL jobs were run on local clusters. After 6+ hours, 8 task 
> managers without the fix had a working set RSS > 3.4 GB. With the fix, the 
> highest RSS was 2.8 GB.
> Without closing the options object, a TM with high RSS has ~1.54GB in 
> ReadBlockContents, exceeding the configured 833MB capacity.
> {quote}— Cumulative View —
> Using local file /usr/local/lib/libjemalloc.so.
> Argument "MSWin32" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line 5222.
> Argument "linux" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line 5222.
> Using local file /tmp/jeprof/prof.293.3194.i3194.heap.
> Total: 2280777636 B
>        0   0.0%   0.0% 2255548014  98.9% je_prof_backtrace
>        0   0.0%   0.0% 2255548014  98.9% je_prof_tctx_create
> 2255548014  98.9%  98.9% 2255548014  98.9% prof_backtrace_impl
>        0   0.0%  98.9% 2247187497  98.5% je_malloc_default
>        0   0.0%  98.9% 1694877743  74.3% void* fallback_impl
>        0   0.0%  98.9% 1655889046  72.6% rocksdb::BlockFetcher::ReadBlockContents
>        0   0.0%  98.9% 1573497612  69.0% rocksdb::BlockBasedTable::NewDataBlockIterator
>        0   0.0%  98.9% 1573201192  69.0% rocksdb::BlockBasedTable::MaybeReadBlockAndLoadToCache@6eeb40
>        0   0.0%  98.9% 1573201192  69.0% rocksdb::BlockBasedTable::RetrieveBlock@6ef610
>        0   0.0%  98.9% 1510010892  66.2% rocksdb::BlockBasedTableIterator::InitDataBlock
>        0   0.0%  98.9% 1414778240  62.0% rocksdb::AtomicGroupReadBuffer::Clear
>        0   0.0%  98.9% 1097112297  48.1% 0x00007fb693d7cc01
>        0   0.0%  98.9% 1097112297  48.1% Java_org_rocksdb_RocksIterator_seek0
>        0   0.0%  98.9% 1097112297  48.1% rocksdb::DBIter::Seek
>        0   0.0%  98.9% 1095159981  48.0% rocksdb::BlockBasedTableIterator::SeekImpl
>        0   0.0%  98.9% 1095024770  48.0% rocksdb::MergingIterator::Seek
>        0   0.0%  98.9% 1095024770  48.0% rocksdb::MergingIterator::SeekImpl
>        0   0.0%  98.9%  651438249  28.6% rocksdb::UncompressBlockData
>        0   0.0%  98.9%  651438249  28.6% rocksdb::UncompressSerializedBlock
>        0   0.0%  98.9%  519018670  22.8% rocksdb::BlockBasedTableIterator::FindBlockForward
>        0   0.0%  98.9%  438749002  19.2% rocksdb::BlockBasedTableIterator::Next
>        0   0.0%  98.9%  438749002  19.2% rocksdb::BlockBasedTableIterator::NextAndGetResult
>        0   0.0%  98.9%  438587793  19.2% rocksdb::MergingIterator::Next
>        0   0.0%  98.9%  438587793  19.2% rocksdb::MergingIterator::NextAndGetResult
>        0   0.0%  98.9%  436500266  19.1% 0x00007fb693d82c67
>        0   0.0%  98.9%  436500266  19.1% rocksdb::DBIter::Next
>        0   0.0%  98.9%  356557773  15.6% os::malloc@d01a60
>        0   0.0%  98.9%  352586303  15.5% Unsafe_AllocateMemory0
>        0   0.0%  98.9%  352497944  15.5% 0x00007fb692fe77e0
>        0   0.0%  98.9%  142878042   6.3% os::malloc@d01ca0
>        0   0.0%  98.9%  136630920   6.0% AllocateHeap@4327f0
>        0   0.0%  98.9%  102717059   4.5% __GI___clone3
>        0   0.0%  98.9%  102717059   4.5% start_thread
>        0   0.0%  98.9%   92210334   4.0% KlassFactory::create_from_stream
> ...
> {quote}
> With the fix, the TM with the highest RSS has ~800MB in ReadBlockContents, 
> consistent with the configured 833MB capacity.
> {quote}— Cumulative View —
> Using local file /usr/local/lib/libjemalloc.so.
> Argument "MSWin32" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line 5222.
> Argument "linux" isn't numeric in numeric eq (==) at /usr/local/bin/jeprof line 5222.
> Using local file /tmp/jeprof/prof.290.8922.i8922.heap.
> Total: 1416132765 B
>        0   0.0%   0.0% 1391278564  98.2% je_prof_backtrace
>        0   0.0%   0.0% 1391278564  98.2% je_prof_tctx_create
> 1391278564  98.2%  98.2% 1391278564  98.2% prof_backtrace_impl
>        0   0.0%  98.2% 1382556219  97.6% je_malloc_default
>        0   0.0%  98.2%  856531103  60.5% void* fallback_impl
>        0   0.0%  98.2%  820714114  58.0% rocksdb::BlockFetcher::ReadBlockContents
>        0   0.0%  98.2%  810027091  57.2% rocksdb::BlockBasedTable::NewDataBlockIterator
>        0   0.0%  98.2%  809508356  57.2% rocksdb::BlockBasedTable::RetrieveBlock@6ef610
>        0   0.0%  98.2%  809438634  57.2% rocksdb::BlockBasedTable::MaybeReadBlockAndLoadToCache@6eeb40
>        0   0.0%  98.2%  767651473  54.2% rocksdb::BlockBasedTableIterator::InitDataBlock
>        0   0.0%  98.2%  734287999  51.9% 0x00007fce45a7b781
>        0   0.0%  98.2%  734287999  51.9% Java_org_rocksdb_RocksIterator_seek0
>        0   0.0%  98.2%  734287999  51.9% rocksdb::BlockBasedTableIterator::SeekImpl
>        0   0.0%  98.2%  734287999  51.9% rocksdb::DBIter::Seek
>        0   0.0%  98.2%  734287999  51.9% rocksdb::MergingIterator::Seek
>        0   0.0%  98.2%  734287999  51.9% rocksdb::MergingIterator::SeekImpl
>        0   0.0%  98.2%  733060136  51.8% rocksdb::AtomicGroupReadBuffer::Clear
>        0   0.0%  98.2%  353073142  24.9% os::malloc@d01a60
>        0   0.0%  98.2%  349535104  24.7% Unsafe_AllocateMemory0
>        0   0.0%  98.2%  349464287  24.7% 0x00007fce455e6160
>        0   0.0%  98.2%  136836241   9.7% os::malloc@d01ca0
>        0   0.0%  98.2%  135269945   9.6% rocksdb::UncompressBlockData
>        0   0.0%  98.2%  135269945   9.6% rocksdb::UncompressSerializedBlock
>        0   0.0%  98.2%  130278866   9.2% AllocateHeap@4327f0
>        0   0.0%  98.2%  124409905   8.8% rocksdb::BlockBasedTableIterator::FindBlockForward
>        0   0.0%  98.2%   91613963   6.5% KlassFactory::create_from_stream
>  ...
> {quote}
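
To relate the byte counts in the two profiles to the rounded figures quoted in the text, the ReadBlockContents rows can be converted directly. This is a minimal sketch: the class and method names are made up for illustration, and it assumes binary units (GiB, MiB), since the description does not say whether "GB" and "MB" are binary or decimal.

```java
import java.util.Locale;

public class ProfileUnits {

    static double toGiB(long bytes) {
        return bytes / (1024.0 * 1024.0 * 1024.0);
    }

    static double toMiB(long bytes) {
        return bytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        // Cumulative bytes under rocksdb::BlockFetcher::ReadBlockContents in each profile
        long withoutFix = 1_655_889_046L;
        long withFix = 820_714_114L;

        System.out.println(String.format(Locale.ROOT, "without fix: %.2f GiB", toGiB(withoutFix)));
        System.out.println(String.format(Locale.ROOT, "with fix:    %.1f MiB", toMiB(withFix)));
    }
}
```

Under that assumption the first value comes out at about 1.54 GiB, nearly double the configured 833MB, and the second at about 783 MiB, which sits below the configured capacity whichever unit convention was intended.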
> See here for reproduction steps that trigger an OOMKill by starting and 
> stopping a Flink job multiple times: 
> [https://github.com/leekeiabstraction/flink/tree/sst-merge-getdescriptor-leak-repro/sst-leak-repro]
>  (note: the configuration in this reproduction uses a higher managed memory 
> fraction to trigger the OOMKill more quickly).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
