[ 
https://issues.apache.org/jira/browse/FLINK-18473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785621#comment-17785621
 ] 

Yi Zhang commented on FLINK-18473:
----------------------------------

[~yunta] [~fanrui] The current logic for picking from multiple directories for a 
RocksDB instance is a bit misleading. The names lazyInitializeForJob and 
nextDirectory seem to suggest the intention is to initialize only once per task 
manager, with a random pick of the initial index and then index+1 for each 
subsequent instance, but that is not the case. If it worked as intended, users 
would at least have the option of configuring as many slots per TaskManager as 
there are disks, so the load is evenly distributed. In that regard, can we fix 
the current implementation? It is not friendly to setups with a small number of 
disks. Besides, if state.backend.rocksdb.localdir is not set, on YARN it falls 
back to yarn.nodemanager.local-dirs, and this logic is completely ignored 
because it relies on YARN's container working directory.
 
I saw this blog by [~fanrui] a while back: 
[https://www.alibabacloud.com/blog/optimization-of-apache-flink-for-large-state-scenarios_597062].
Could that change be contributed back to open source? Or we could make a small 
fix to the current implementation so that, within each TaskManager, slots are 
assigned directories round-robin starting from a random initial nextDirectory.
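
To make the suggestion concrete, here is a minimal sketch of that round-robin 
idea (class and method names are illustrative only, not the actual Flink 
internals): a counter shared by every backend in the TaskManager, seeded 
randomly and taken modulo the number of configured directories.

{code:java}
import java.io.File;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinDbDirPicker {

    // Shared by every state backend created in this task manager process,
    // seeded with a random offset so different TMs do not all start at disk 0.
    private static final AtomicInteger NEXT_DIRECTORY =
            new AtomicInteger(ThreadLocalRandom.current().nextInt(1024));

    /** Returns the next base directory in round-robin order. */
    public static File pick(File[] initializedDbBasePaths) {
        // floorMod keeps the index non-negative even after integer overflow.
        int index = Math.floorMod(NEXT_DIRECTORY.getAndIncrement(),
                initializedDbBasePaths.length);
        return initializedDbBasePaths[index];
    }
}
{code}

With this, the first backend in a TM still lands on a random disk, but each 
subsequent backend in the same TM moves to the next one, so a TM with N slots 
and N configured directories uses each disk exactly once.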

> Optimize RocksDB disk load balancing strategy
> ---------------------------------------------
>
>                 Key: FLINK-18473
>                 URL: https://issues.apache.org/jira/browse/FLINK-18473
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / State Backends
>    Affects Versions: 1.12.0
>            Reporter: Rui Fan
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-minor
>
> In general, big data servers have many disks. For large-state jobs, if 
> multiple slots are running on a TM, then each slot will create a RocksDB 
> instance. We hope that multiple RocksDB instances use different disks to 
> achieve load balancing.
> h3. The problem of current load balancing strategy:
> When a RocksDB state backend is initialized, a random value nextDirectory is 
> generated based on the number of configured RocksDB directories: [code 
> link|https://github.com/apache/flink/blob/2d371eb5ac9a3e485d3665cb9a740c65e2ba2ac6/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java#L441]
> {code:java}
> nextDirectory = new Random().nextInt(initializedDbBasePaths.length);
> {code}
> Different slots create different RocksDBStateBackend objects, so each slot 
> generates its own *nextDirectory*. Because the value is chosen at random, 
> different slots may end up with the same one. For example: if the RocksDB dir 
> is configured with 10 disks and the *nextDirectory* generated by slot0 and 
> slot1 is 5 in both cases, then slot0 and slot1 will use the same disk. That 
> disk comes under heavy pressure while the other disks stay idle.
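> To put numbers on it: with 10 directories and 10 slots each choosing 
> independently at random, the probability that all slots land on distinct 
> disks is 10!/10^10 ≈ 0.04%, so at least one collision is almost certain.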
> h3. Optimization ideas:
> *{{nextDirectory}}* should be shared across slots. Its initial value should 
> not be fixed at 0; it should still be generated randomly. Define 
> *nextDirectory* as a +_{{static AtomicInteger}}_+ and execute 
> +_{{nextDirectory.incrementAndGet()}}_+ each time a RocksDBKeyedStateBackend 
> is created. 
> {{nextDirectory}} modulo {{initializedDbBasePaths.length}} then decides which 
> disk to use.
> Is there any problem with the above ideas?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
