[ 
https://issues.apache.org/jira/browse/FLINK-29928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roman Khachatryan updated FLINK-29928:
--------------------------------------
    Description: 
h1. Background and motivation

RocksDB is one of the main consumers of off-heap memory, which it uses for 
BlockCache, MemTables, Indices and Bloom Filters.
Since 1.10 (FLINK-7289), it is possible to:
 - share these objects among RocksDB instances of the same slot
 - bound the total memory usage by all RocksDB instances of a TM

The memory is divided between the slots equally (unless using fine-grained 
resource control).
This is sub-optimal if some slots contain more memory intensive tasks than the 
others.

The proposal is to widen the scope of sharing memory to TM, so that it can be 
shared across all of its RocksDB instances.
That would reduce the overall memory consumption in exchange for resource 
isolation.
h1. Proposed changes
h2. Configuration
 - introduce "state.backend.rocksdb.memory.fixed-per-tm" (memory size, no 
default)
 -- cluster-level (yaml only)
 -- used by a job only if the job uses neither 'state.backend.rocksdb.memory.fixed-per-slot' 
nor managed memory (i.e. 'state.backend.rocksdb.memory.managed' is false)
 - use cluster-level or default configuration when creating TM-wise shared 
RocksDB objects, e.g.  "state.backend.rocksdb.memory.managed", 
"state.backend.rocksdb.memory.write-buffer-ratio"

h2. Example
{code:java}
# cluster-level configuration
taskmanager.memory.managed.size: 1gb
state.backend.rocksdb.memory.fixed-per-tm: 1gb
taskmanager.numberOfTaskSlots: 10
cluster.fine-grained-resource-management.enabled: false

job 1:
state.backend.rocksdb.memory.managed: false # uses shared TM memory

job 2:
state.backend.rocksdb.memory.managed: false # uses shared TM memory

job 3:
state.backend.rocksdb.memory.managed: true # uses exclusive managed memory

job 4:
state.backend.rocksdb.memory.managed: true # gets overridden below
state.backend.rocksdb.memory.fixed-per-slot: 50M # uses exclusive unmanaged memory

{code}
Jobs 1 and 2 will use the same 1 GB of shared unmanaged memory and will compete 
with each other.
Their Python code (or other consumers) will be able to use up to ~100 MB of 
managed memory per slot (1 GB of managed memory split across 10 slots).

Jobs 3 and 4 are not affected, as they use managed memory (3) or fixed-per-slot 
memory (4).
Their Python code (or other consumers) will also be able to use up to ~100 MB per 
slot, but in job 3 it will compete with RocksDB for the managed memory.
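
For jobs configured programmatically rather than via YAML, the per-job choices above 
map onto the existing RocksDB backend options. A minimal sketch, assuming the existing 
RocksDBOptions keys; the proposed "state.backend.rocksdb.memory.fixed-per-tm" option is 
cluster-level only, so it would stay in flink-conf.yaml and does not appear here:
{code:java}
// Hedged sketch: programmatic equivalents of the per-job settings in the example.
// Uses the existing options in org.apache.flink.contrib.streaming.state.RocksDBOptions.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobMemoryConfigExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Job 1/2 style: opt out of managed memory, so RocksDB would fall back to the
        // TM-wise shared unmanaged memory configured by "fixed-per-tm" on the cluster.
        Configuration sharedTmMemory = new Configuration();
        sharedTmMemory.set(RocksDBOptions.USE_MANAGED_MEMORY, false);

        // Job 4 style: exclusive unmanaged memory per slot, overriding the managed setting.
        Configuration exclusivePerSlot = new Configuration();
        exclusivePerSlot.set(RocksDBOptions.FIX_PER_SLOT_MEMORY_SIZE, MemorySize.parse("50m"));

        EmbeddedRocksDBStateBackend sharedBackend = new EmbeddedRocksDBStateBackend()
                .configure(sharedTmMemory, JobMemoryConfigExample.class.getClassLoader());
        EmbeddedRocksDBStateBackend exclusiveBackend = new EmbeddedRocksDBStateBackend()
                .configure(exclusivePerSlot, JobMemoryConfigExample.class.getClassLoader());

        env.setStateBackend(sharedBackend); // pick one per job, e.g. exclusiveBackend for job 4

        // ... define the job's sources, stateful operators and sinks, then env.execute(...)
    }
}
{code}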

h2. Creating and sharing RocksDB objects

Introduce sharedResources to the TaskManager.
Then, similarly to the current slot-wise sharing via MemoryManager:
 - put/get an OpaqueMemoryResource
 - the Cache object is created from the backend code on the first call
 - it is released when the last backend that uses it is destroyed
This way, flink-runtime doesn't have to depend on the state backend (see the 
sketch below).
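
To make the idea concrete, here is a minimal sketch of such a TM-level registry. The 
names (SharedResources, LeasedResource, getOrCreate, closeAll) are purely illustrative, 
not existing Flink classes; the pattern mirrors the slot-wise 
MemoryManager#getSharedMemoryResourceForManagedMemory / OpaqueMemoryResource flow, with 
the initializer supplied by the state backend:
{code:java}
// Illustrative sketch only: names are hypothetical, not actual Flink classes.
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;

public final class SharedResources {

    /** A leased handle to a shared resource; close() releases one reference. */
    public interface LeasedResource<T> extends AutoCloseable {
        T getResourceHandle();
    }

    private static final class Entry {
        final AutoCloseable handle; // e.g. the wrapper around the shared RocksDB Cache
        int refCount;
        Entry(AutoCloseable handle) { this.handle = handle; }
    }

    private final Map<String, Entry> resources = new HashMap<>();

    /**
     * Returns the shared resource registered under the given type, creating it on the
     * first call. The initializer comes from the state backend, so flink-runtime never
     * references RocksDB classes directly.
     */
    @SuppressWarnings("unchecked")
    public synchronized <T extends AutoCloseable> LeasedResource<T> getOrCreate(
            String type, long size, LongFunction<T> initializer) {
        Entry entry = resources.computeIfAbsent(type, t -> new Entry(initializer.apply(size)));
        entry.refCount++;
        return new LeasedResource<T>() {
            private boolean released;

            @Override
            public T getResourceHandle() {
                return (T) entry.handle;
            }

            @Override
            public void close() throws Exception {
                if (!released) {
                    released = true;
                    release(type);
                }
            }
        };
    }

    /** Called when a backend that acquired the resource is disposed. */
    private synchronized void release(String type) throws Exception {
        Entry entry = resources.get(type);
        if (entry != null && --entry.refCount == 0) {
            resources.remove(type);
            entry.handle.close(); // the last user is gone: dispose the shared cache
        }
    }

    /** Called on TaskExecutor termination to dispose anything still registered. */
    public synchronized void closeAll() throws Exception {
        for (Entry entry : resources.values()) {
            entry.handle.close();
        }
        resources.clear();
    }
}
{code}
A backend would call getOrCreate when it is created (passing an initializer that builds 
the shared Cache / write-buffer manager) and close its lease when it is disposed; 
closeAll would be the hook for TM termination (see Lifecycle below).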

h2. Class loading and resolution

The RocksDB state backend is already part of the distribution.
However, if a job also bundles it, then "classloader.resolve-order" should be set 
to "parent-first" to prevent class conflicts.
h2. Lifecycle

The cache object should be destroyed on TM termination; job or task completion 
should NOT close it.
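
Expressed against the hypothetical SharedResources sketch above (again, purely 
illustrative names):
{code:java}
// Illustrative only: how the lifecycle rules could map onto the SharedResources sketch.
final class SharedResourceLifecycle {

    /** Task/job completion: the disposed backend drops only its own lease. */
    static void onBackendDisposed(SharedResources.LeasedResource<?> lease) throws Exception {
        lease.close(); // backends of other jobs keep the shared cache alive
    }

    /** TaskExecutor termination: dispose whatever is still registered. */
    static void onTaskExecutorTermination(SharedResources sharedResources) throws Exception {
        sharedResources.closeAll();
    }
}
{code}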
h1. Testing
 * One way to test that the same RocksDB cache is used is via RocksDB metrics 
(see the sketch after this list).
 * ITCase parameterization
 * Manual and unit tests
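
A possible shape for the metrics-based check, assuming the existing RocksDB native 
metric options (keys as in RocksDBNativeMetricOptions); whether both jobs report the 
same block-cache capacity is exactly what the test would assert. Setting these options 
per job via configure() is an assumption; they can equally go into flink-conf.yaml:
{code:java}
// Hedged sketch: enable RocksDB block-cache metrics so that two jobs configured with
// "state.backend.rocksdb.memory.managed: false" can be compared. If both use the same
// TM-wise shared cache, their block-cache capacity gauges should report the same
// (full) shared size rather than a per-slot fraction.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SharedCacheMetricsProbe {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Existing native metric switches (see RocksDBNativeMetricOptions).
        conf.setString("state.backend.rocksdb.metrics.block-cache-capacity", "true");
        conf.setString("state.backend.rocksdb.metrics.block-cache-usage", "true");
        conf.setString("state.backend.rocksdb.memory.managed", "false");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(new EmbeddedRocksDBStateBackend()
                .configure(conf, SharedCacheMetricsProbe.class.getClassLoader()));

        // ... run a small stateful pipeline as two separate jobs and compare the
        // block-cache gauges exposed by the TM metric reporters.
    }
}
{code}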

h1. Limitations
 - classloader.resolve-order=child-first is not supported
 - fine-grained-resource-management is not supported
 - only RocksDB will be able to use TM-wise shared memory; other consumers may 
be adjusted later

h1. Rejected alternatives
 - set total "fixed-per-slot" to a larger value, essentially overcommitting 
unmanaged memory - doesn't work well in containerized environments (OOMErrors)
 - set numberOfTaskSlots=1 and allow sharing the same slot between any tasks - 
requires more invasive changes in scheduler and TM
 - make part of the managed memory shared; it is believed that managed memory 
must preserve the isolation property, among other concerns

cc: [~yunta], [~ym], [~liyu]

  was:
h1. Background and motivation

RocksDB is one of the main consumers of off-heap memory, which it uses for 
BlockCache, MemTables, Indices and Bloom Filters.
Since 1.10 (FLINK-7289), it is possible to:
 - share these objects among RocksDB instances of the same slot
 - bound the total memory usage by all RocksDB instances of a TM

The memory is divided between the slots equally (unless using fine-grained 
resource control).
This is sub-optimal if some slots contain more memory intensive tasks than the 
others.

The proposal is to widen the scope of sharing memory to TM, so that it can be 
shared across all of its RocksDB instances.
That would reduce the overall memory consumption in exchange for resource 
isolation.
h1. Proposed changes
h2. Configuration
 - introduce "taskmanager.memory.managed.shared-fraction" (0..1, default 0)
 -- cluster-level (yaml only)
 -- the non-shared memory will be used as it is now (exclusively per-slot)
 - -introduce "state.backend.memory.share-scope"-
 -- -job-level (yaml and StateBackend)-
 -- -possible values: NONE, SLOT, TASK_MANAGER-
 -- -default: not set-
 -- override "state.backend.rocksdb.memory.fixed-per-slot" if shared-fraction > 
0 (but don't deprecate it, because it specifies the size)
 - rely on the existing "state.backend.rocksdb.memory.managed" to decide 
whether the shared memory is managed or unmanaged
 - when calculating TM-wise shared  memory, ignore 
"taskmanager.memory.managed.consumer-weights" because RocksDB is the only 
consumer so far
 - similarly, exclude StateBackend from weights calculations, so other 
consumers (e.g. PYTHON) can better utilize exclusive slot memory
 - use cluster-level or default configuration when creating TM-wise shared 
RocksDB objects, e.g.  "state.backend.rocksdb.memory.managed", 
"state.backend.rocksdb.memory.write-buffer-ratio"

h2. Example
{code:java}
taskmanager.memory.managed.size: 1gb
taskmanager.memory.managed.shared-fraction: .75 # all slots share 750Mb of 
shared managed memory
taskmanager.numberOfTaskSlots: 10               # each task slot gets 25Mb of 
exclusive managed memory
cluster.fine-grained-resource-management.enabled: false

job 1:
state.backend.rocksdb.memory.managed: true

job 2:
state.backend.rocksdb.memory.managed: true

job 3:
state.backend.rocksdb.memory.fixed-per-slot: 50M # ignored
state.backend.rocksdb.memory.managed: true

job 4:
state.backend.memory.share-scope: TASK_MANAGER
state.backend.rocksdb.memory.managed: false

job 5:
state.backend.memory.share-scope: TASK_MANAGER
state.backend.rocksdb.memory.managed: false
{code}
Jobs 1, 2, 3 will use the same 750Mb of managed memory and will compete with 
each other.
Jobs 4 and 5 will use the same 750Mb of unmanaged memory and will compete with 
each other. 750Mb here is calculated using the same settings 
(managed.shared-fraction) to avoid adding additional parameters 
(unmanaged.shared-fraction).

Python code (or other consumers) will be able to use up to 25mb per slot.
h2. Creating and sharing RocksDB objects

Introduce sharedMemoryManager to TaskManager.
Then, similarly to the current slot-wise sharing:
 - Memory manager manages OpaqueMemoryResource
 - Creation of Cache object is done from the backend code on the first call
So flink-runtime doesn't have to depend on state backend.

h2. Class loading and resolution

RocksDB state backend is already a part of the distribution.
However, if a job also includes it then classloader.resolve-order should be set 
to parent-first to prevent conflicts.
h2. Lifecycle

The cache object should be destroyed on TM termination; job or task completion 
should NOT close it.
h1. Testing
 * One way to test that the same RocksDB cache is used is via RocksDB metrics.
 * ITCases parameterization
 * manual and unit tests

h1. Limitations
 - classloader.resolve-order=child-first is not supported
 - fine-grained-resource-management is not supported
 - only RocksDB will be able to use TM-wise shared memory; other consumers may 
be adjusted later

h1. Rejected alternatives
 - set total "fixed-per-slot" to a larger value, essentially overcommitting 
unmanaged memory - doesn't work well in containerized environments (OOMErrors)
 - set numberOfTaskSlots=1 and allow sharing the same slot between any tasks - 
requires more invasive changes in scheduler and TM

cc: [~yunta], [~ym], [~liyu]


> Allow sharing (RocksDB) memory between slots
> --------------------------------------------
>
>                 Key: FLINK-29928
>                 URL: https://issues.apache.org/jira/browse/FLINK-29928
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / Configuration, Runtime / State Backends, 
> Runtime / Task
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
