[ https://issues.apache.org/jira/browse/FLINK-29928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-29928:
-----------------------------------
    Labels: pull-request-available  (was: )

> Allow sharing (RocksDB) memory between slots
> --------------------------------------------
>
>                 Key: FLINK-29928
>                 URL: https://issues.apache.org/jira/browse/FLINK-29928
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / Configuration, Runtime / State Backends, 
> Runtime / Task
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0
>
>
> h1. Background and motivation
> RocksDB is one of the main consumers of off-heap memory, which it uses for 
> BlockCache, MemTables, Indices and Bloom Filters.
> Since 1.10 (FLINK-7289), it is possible to:
> - share these objects among RocksDB instances of the same slot
> - bound the total memory usage by all RocksDB instances of a TM
> The memory is divided equally between the slots (unless fine-grained 
> resource control is used).
> This is sub-optimal if some slots contain more memory-intensive tasks than 
> others.
> The proposal is to widen the scope of memory sharing to the whole TM so that 
> the memory can be shared across all of its RocksDB instances.
> That would allow reducing the overall memory consumption in exchange for 
> weaker resource isolation.
> h1. Proposed changes
> h2. Configuration
> - introduce "taskmanager.memory.managed.shared-fraction" (0..1, default 0)
> -- cluster-level (yaml only)
> -- the non-shared memory will be used as it is now (exclusively per-slot)
> - introduce "state.backend.memory.share-scope"
> -- job-level (yaml and StateBackend)
> -- possible values: NONE, SLOT, TASK_MANAGER
> -- default: not set
> -- overrides "state.backend.rocksdb.memory.fixed-per-slot" if both are set 
> (but don't deprecate it, because it specifies the size)
> - rely on the existing "state.backend.rocksdb.memory.managed" to decide 
> whether the shared memory is managed or unmanaged
> - when calculating TM-wise shared memory, ignore 
> "taskmanager.memory.managed.consumer-weights" because RocksDB is the only 
> consumer so far
> - similarly, exclude StateBackend from weight calculations, so other 
> consumers (e.g. PYTHON) can better utilize the exclusive slot memory
> - use the cluster-level or default configuration when creating TM-wise shared 
> RocksDB objects, e.g. "state.backend.rocksdb.memory.managed", 
> "state.backend.rocksdb.memory.write-buffer-ratio"
> h2. Example
> {code}
> taskmanager.memory.managed.size: 1gb
> taskmanager.memory.managed.shared-fraction: .75 # all slots share 750Mb of shared managed memory
> taskmanager.numberOfTaskSlots: 10               # each task slot gets 25Mb of exclusive managed memory
> cluster.fine-grained-resource-management.enabled: false
>
> job 1:
> state.backend.memory.share-scope: TASK_MANAGER
> state.backend.rocksdb.memory.managed: true
>
> job 2:
> state.backend.memory.share-scope: TASK_MANAGER
> state.backend.rocksdb.memory.managed: true
>
> job 3:
> state.backend.memory.share-scope: SLOT
> state.backend.rocksdb.memory.managed: true
>
> job 4:
> state.backend.memory.share-scope: TASK_MANAGER
> state.backend.rocksdb.memory.managed: false
>
> job 5:
> state.backend.memory.share-scope: TASK_MANAGER
> state.backend.rocksdb.memory.managed: false
> {code}
> Jobs 1 and 2 will use the same 750Mb of managed memory and will compete with 
> each other.
> Job 3 will only use exclusive slot memory (25Mb per slot).
> Jobs 4 and 5 will use the same 750Mb of unmanaged memory and will compete 
> with each other.
> Python code (or other consumers) will be able to use up to 25Mb per slot in 
> jobs 1, 2, 4, and 5.
> h2. Creating and sharing RocksDB objects
> Introduce a sharedMemoryManager to the TaskManager.
> Then, similarly to the current slot-wise sharing:
> - the memory manager manages an OpaqueMemoryResource
> - the Cache object is created from the backend code on the first call
> This way, flink-runtime doesn't have to depend on the state backend.
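> A minimal sketch of that pattern (class and method names are hypothetical; 
> the actual wiring in flink-runtime may differ):
> {code}
> // Hypothetical sketch of TM-wise lazy sharing via an opaque handle.
> // flink-runtime only holds the handle; the state backend supplies the
> // initializer, so the runtime never references RocksDB classes directly.
> import java.util.function.LongFunction;
>
> final class TaskManagerSharedMemoryManager {
>     private Object sharedResource; // e.g. a RocksDB Cache, opaque to the runtime
>     private final long shareSizeBytes;
>
>     TaskManagerSharedMemoryManager(long shareSizeBytes) {
>         this.shareSizeBytes = shareSizeBytes;
>     }
>
>     // The first caller (backend code) creates the resource; later callers reuse it.
>     synchronized <T> T getOrCreateSharedResource(LongFunction<T> initializer) {
>         if (sharedResource == null) {
>             sharedResource = initializer.apply(shareSizeBytes);
>         }
>         @SuppressWarnings("unchecked")
>         T resource = (T) sharedResource;
>         return resource;
>     }
> }
> {code}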
> h2. Class loading and resolution
> The RocksDB state backend is already part of the distribution.
> However, if a job also bundles it, classloader.resolve-order should be 
> set to parent-first to prevent class conflicts.
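> For example (classloader.resolve-order is the existing option; its default 
> is child-first):
> {code}
> classloader.resolve-order: parent-first
> {code}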
> h2. Lifecycle
> The cache object should be destroyed on TM termination; job or task completion 
> should NOT close it.
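> For illustration, a minimal sketch of this rule (hypothetical names, assuming 
> the cache handle is AutoCloseable):
> {code}
> // Hypothetical sketch: per-task releases are no-ops; the underlying cache
> // is closed only from the TaskManager shutdown path.
> final class TmScopedCacheHandle implements AutoCloseable {
>     private final AutoCloseable cache; // e.g. org.rocksdb.Cache
>
>     TmScopedCacheHandle(AutoCloseable cache) {
>         this.cache = cache;
>     }
>
>     void releaseForTask() {
>         // intentionally empty: job/task completion must NOT destroy the cache
>     }
>
>     @Override
>     public void close() throws Exception {
>         cache.close(); // invoked only on TM termination
>     }
> }
> {code}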
> h1. Testing
> One way to test that the same RocksDB cache is used is via RocksDB metrics.
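> For example, assuming the existing RocksDB native metrics options are enabled 
> for the jobs under test:
> {code}
> state.backend.rocksdb.metrics.block-cache-capacity: true
> state.backend.rocksdb.metrics.block-cache-usage: true
> {code}
> Tasks sharing one TM-wise cache should then report identical block cache 
> capacity (and a common usage) value.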
> h1. Limitations
> - classloader.resolve-order=child-first is not supported
> - fine-grained-resource-management is not supported
> - only RocksDB will be able to use TM-wise shared memory; other consumers may 
> be adjusted later
> h1. Rejected alternatives
> - set the total "fixed-per-slot" to a larger value, essentially overcommitting 
> unmanaged memory - this doesn't work well in containerized environments 
> (OutOfMemoryErrors)
> - set numberOfTaskSlots=1 and allow sharing the same slot between any tasks - 
> this requires more invasive changes in the scheduler and TM
> cc: [~yunta], [~ym], [~liyu]


