[ https://issues.apache.org/jira/browse/FLINK-18473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785621#comment-17785621 ]
Yi Zhang commented on FLINK-18473:
----------------------------------

[~yunta] [~fanrui] The current logic for picking among multiple directories for a RocksDB instance is a bit misleading. The names lazyInitializeForJob and nextDirectory seem to suggest that the intention is to initialize only once per task manager with a randomly picked initial index, and then use index+1 for each subsequent instance, but that is not the case. If it worked as intended, users would at least have the option of setting the number of slots per task manager equal to the number of disks, so that the load is evenly distributed. In that regard, can we fix the current implementation? It is not friendly to setups with a small number of disks.

Besides, if state.backend.rocksdb.localdir is not set, on YARN it falls back to yarn.nodemanager.local-dirs, and this logic is completely ignored, as it relies on YARN's container working directory.

I saw this blog post by [~fanrui] a while back [https://www.alibabacloud.com/blog/optimization-of-apache-flink-for-large-state-scenarios_597062]. Can that change be contributed to the open source project? Or can we make a small fix to the current implementation so that, within each task manager, slots are assigned directories round-robin, starting from a random initial nextDirectory?

> Optimize RocksDB disk load balancing strategy
> ---------------------------------------------
>
>                 Key: FLINK-18473
>                 URL: https://issues.apache.org/jira/browse/FLINK-18473
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / State Backends
>    Affects Versions: 1.12.0
>            Reporter: Rui Fan
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-minor
>
> In general, big data servers have many disks. For large-state jobs, if
> multiple slots are running on a TM, each slot creates its own RocksDB
> instance. We want multiple RocksDB instances to use different disks to
> achieve load balancing.
> h3. The problem with the current load balancing strategy:
> When RocksDB is initialized, a random value nextDirectory is generated
> according to the number of RocksDB dirs: [code
> link|https://github.com/apache/flink/blob/2d371eb5ac9a3e485d3665cb9a740c65e2ba2ac6/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java#L441]
> {code:java}
> nextDirectory = new Random().nextInt(initializedDbBasePaths.length);
> {code}
> Different slots create different RocksDBStateBackend objects, so each slot
> generates its own *nextDirectory*. Because a random algorithm is used here,
> the random values generated by different slots may be the same. For example:
> the current RocksDB dir is configured with 10 disks, and the *nextDirectory*
> values generated by slot0 and slot1 are both 5, so slot0 and slot1 will use
> the same disk. That disk will be under a lot of pressure, while the other
> disks will not be under pressure.
> h3. Optimization ideas:
> *{{nextDirectory}}* should be shared across slots. Its initial value should
> not always be 0; it is still generated randomly. But define
> *nextDirectory* as +_{{static AtomicInteger()}}_+ and execute
> +_{{nextDirectory.incrementAndGet()}}_+ every time a RocksDBKeyedStateBackend
> is requested.
> {{nextDirectory}} modulo {{initializedDbBasePaths.length}} then decides
> which disk to use.
> Is there any problem with the above ideas?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
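
For illustration only, the round-robin idea discussed in this thread (a counter shared by all slots of one task manager, seeded randomly and incremented on every request) might look roughly like the sketch below. The class name `RoundRobinDirs` and the method `nextIndex` are hypothetical and are not part of Flink; only `nextDirectory` and the `AtomicInteger` approach come from the issue text.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper, not Flink code: one counter shared by every slot in this JVM. */
final class RoundRobinDirs {

    // Static, so all state backends created in the same task manager process share it.
    // The random seed keeps different task managers on one host from all starting at disk 0.
    private static final AtomicInteger NEXT_DIRECTORY =
            new AtomicInteger(ThreadLocalRandom.current().nextInt(1 << 16));

    private RoundRobinDirs() {}

    /** Returns the index of the base path that the next RocksDB instance would use. */
    static int nextIndex(int numDirs) {
        if (numDirs <= 0) {
            throw new IllegalArgumentException("numDirs must be positive");
        }
        // floorMod keeps the result non-negative even after the counter overflows.
        return Math.floorMod(NEXT_DIRECTORY.getAndIncrement(), numDirs);
    }
}
```

Under these assumptions, a task manager with as many slots as configured directories would place each RocksDB instance on a different disk, since consecutive calls walk through the indices in order. The counter only coordinates slots within one JVM; it does nothing to balance across separate task manager processes on the same machine.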