[ https://issues.apache.org/jira/browse/FLINK-29928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Roman Khachatryan updated FLINK-29928:
--------------------------------------
    Description: 
h1. Background and motivation

RocksDB is one of the main consumers of off-heap memory, which it uses for 
BlockCache, MemTables, Indices and Bloom Filters.
Since 1.10 (FLINK-7289), it is possible to:
- share these objects among RocksDB instances of the same slot
- bound the total memory usage by all RocksDB instances of a TM

The memory is divided equally between the slots (unless fine-grained resource control is used).
This is sub-optimal if some slots contain more memory-intensive tasks than others.

The proposal is to widen the scope of memory sharing to the whole TM, so that memory can be shared across all of its RocksDB instances.
That would allow reducing the overall memory consumption at the cost of weaker resource isolation.

h1. Proposed changes

h2. Configuration
- introduce "taskmanager.memory.managed.shared-fraction" (0..1, default 0)
-- cluster-level (yaml only)
-- the non-shared memory will be used as it is now (exclusively per-slot)
- introduce "state.backend.memory.share-scope"
-- job-level (yaml and StateBackend)
-- possible values: NONE, SLOT, TASK_MANAGER
-- default: not set
-- overrides "state.backend.rocksdb.memory.fixed-per-slot" if both are set (but 
don't deprecate the latter, because it specifies the size)
- rely on the existing "state.backend.rocksdb.memory.managed" to decide whether 
the shared memory is managed or unmanaged
- when calculating TM-wise shared memory, ignore 
"taskmanager.memory.managed.consumer-weights" because RocksDB is the only 
consumer so far
- similarly, exclude StateBackend from the weight calculation, so that other 
consumers (e.g. PYTHON) can better utilize the exclusive slot memory
- use the cluster-level or default configuration when creating TM-wise shared 
RocksDB objects, e.g. "state.backend.rocksdb.memory.managed", 
"state.backend.rocksdb.memory.write-buffer-ratio"

h2. Example
{code}
taskmanager.memory.managed.size: 1gb
taskmanager.memory.managed.shared-fraction: .75 # all slots share 750 MB of shared managed memory
taskmanager.numberOfTaskSlots: 10               # each task slot gets 25 MB of exclusive managed memory
cluster.fine-grained-resource-management.enabled: false

job 1:
state.backend.memory.share-scope: TASK_MANAGER
state.backend.rocksdb.memory.managed: true

job 2:
state.backend.memory.share-scope: TASK_MANAGER
state.backend.rocksdb.memory.managed: true

job 3:
state.backend.memory.share-scope: SLOT
state.backend.rocksdb.memory.managed: true

job 4:
state.backend.memory.share-scope: TASK_MANAGER
state.backend.rocksdb.memory.managed: false

job 5:
state.backend.memory.share-scope: TASK_MANAGER
state.backend.rocksdb.memory.managed: false
{code}
Jobs 1 and 2 will use the same 750 MB of managed memory and will compete with 
each other.
Job 3 will only use exclusive slot memory (25 MB per slot).
Jobs 4 and 5 will use the same 750 MB of unmanaged memory and will compete with 
each other.

Python code (or other consumers) will be able to use up to 25 MB per slot in 
jobs 1, 2, 4 and 5.
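
For reference, a quick check of the arithmetic behind the example, assuming "1gb" is parsed as 1024 MiB (the figures above are rounded):
{code:java}
// Back-of-the-envelope check of the managed memory split in the example above.
public class ManagedMemorySplit {
    public static void main(String[] args) {
        long totalMiB     = 1024; // taskmanager.memory.managed.size: 1gb
        double sharedFrac = 0.75; // taskmanager.memory.managed.shared-fraction
        int slots         = 10;   // taskmanager.numberOfTaskSlots

        long sharedMiB    = (long) (totalMiB * sharedFrac);          // 768 MiB shared by all slots
        double perSlotMiB = (totalMiB - sharedMiB) / (double) slots; // ~25.6 MiB exclusive per slot

        System.out.println(sharedMiB + " MiB shared, " + perSlotMiB + " MiB exclusive per slot");
    }
}
{code}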

h2. Creating and sharing RocksDB objects
Introduce a shared memory manager to the TaskManager.
Then, similarly to the current slot-wise sharing:
- the memory manager manages an OpaqueMemoryResource
- the Cache object is created by the state backend code on the first call, so flink-runtime doesn't have to depend on the state backend (see the sketch below)
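
A minimal sketch of this mechanism, mirroring the contract of the existing slot-level MemoryManager#getSharedMemoryResourceForManagedMemory; the TaskManagerSharedResources class and its method names are hypothetical, not an existing Flink API:
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;

// Hypothetical TM-level registry of shared memory resources (illustrative names only).
public class TaskManagerSharedResources {

    private final Map<String, Object> resources = new ConcurrentHashMap<>();

    // The initializer is supplied by the state backend and is invoked only on the first
    // call for a given type key, so flink-runtime needs no compile-time RocksDB dependency.
    @SuppressWarnings("unchecked")
    public <T> T getOrCreate(String type, long sizeBytes, LongFunction<T> initializer) {
        return (T) resources.computeIfAbsent(type, t -> initializer.apply(sizeBytes));
    }

    // Lifecycle: called only on TaskManager termination; job or task completion must NOT
    // close the shared objects (see the Lifecycle section below).
    public void closeOnShutdown() {
        for (Object resource : resources.values()) {
            if (resource instanceof AutoCloseable) {
                try {
                    ((AutoCloseable) resource).close(); // releases e.g. the shared RocksDB Cache
                } catch (Exception e) {
                    // best-effort cleanup on TM shutdown; a real implementation would log here
                }
            }
        }
        resources.clear();
    }
}
{code}
The state backend would then call getOrCreate with a type key and an initializer that builds the shared Cache/WriteBufferManager, analogous to the current slot-wise code path.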

h2. Class loading and resolution
RocksDB state backend is already a part of the distribution.
However, if a job also includes it then classloader.resolve-order should be set 
to parent-first to prevent conflicts.

h2. Lifecycle
The cache object should be destroyed on TM termination; job or task completion 
should NOT close it.

h1. Testing
One way to verify that the same RocksDB block cache is used is via RocksDB native metrics (e.g. state.backend.rocksdb.metrics.block-cache-usage).
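
A possible integration-test setup: enable the existing native metric state.backend.rocksdb.metrics.block-cache-usage and check that the reported usage is identical (and moves together) across subtasks; "state.backend.memory.share-scope" below is the option proposed in this ticket and does not exist yet:
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SharedCacheMetricCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("state.backend", "rocksdb");
        conf.setString("state.backend.rocksdb.metrics.block-cache-usage", "true");
        conf.setString("state.backend.memory.share-scope", "TASK_MANAGER"); // proposed option

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // A minimal keyed (stateful) pipeline so that RocksDB instances are actually created.
        env.fromElements(1, 2, 3, 4).keyBy(i -> i % 2).reduce(Integer::sum).print();
        env.execute("shared-cache-metric-check");
    }
}
{code}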

h1. Limitations
- classloader.resolve-order=child-first is not supported
- fine-grained-resource-management is not supported
- only RocksDB will be able to use TM-wise shared memory; other consumers may 
be adjusted later

h1. Rejected alternatives
- set the total "fixed-per-slot" to a larger value, essentially overcommitting 
unmanaged memory - doesn't work well in containerized environments (OutOfMemoryErrors)
- set numberOfTaskSlots=1 and allow sharing the same slot between any tasks - 
requires more invasive changes in the scheduler and the TM

cc: [~yunta], [~ym], [~liyu]


> Allow sharing (RocksDB) memory between slots
> --------------------------------------------
>
>                 Key: FLINK-29928
>                 URL: https://issues.apache.org/jira/browse/FLINK-29928
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / Configuration, Runtime / State Backends, 
> Runtime / Task
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>             Fix For: 1.17.0
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
