[ https://issues.apache.org/jira/browse/FLINK-21436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17294349#comment-17294349 ]
fanrui commented on FLINK-21436:
--------------------------------

h1. Some test results from increasing the `state.backend.fs.memory-threshold` configuration:

With source parallelism at 2000 and 2000 Kafka partitions, I tried increasing `state.backend.fs.memory-threshold` to 20K so that the offset state is sent to the JM through a ByteStreamStateHandle, reducing the number of HDFS files. Unfortunately, after many attempts, the restore still fails. The JM has 30G of memory, 22G of it heap. GC pressure on the JM was very high during restore, with CPU usage peaking above 3000% and averaging above 500%. The eventual failure is usually:

```
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@host:36509/user/taskmanager_0#1922704998]] after [60000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
```

Exception screenshot: !akka timeout Exception.png!
Flame graph: [^JM 启动火焰图.svg]

I hope the community can run a similar test when there is time. The test scenario is simple; it only needs to meet the following conditions:
- source parallelism greater than 2000
- more than 2000 Kafka partitions
- `state.backend.fs.memory-threshold` increased to 20K
- job processing logic as simple as possible

I am happy to provide more details about the test environment and conditions. If there is a problem with my testing process, please correct me.
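For reference, a minimal sketch of the mechanism this test exercises. The class, the simplified branch, and the threshold constant below are mine for illustration, not Flink's actual internal code path: state blobs no larger than the threshold travel inside the checkpoint metadata as a ByteStreamStateHandle instead of becoming one HDFS file each, which also means every subtask's bytes land in JM memory.

```java
import org.apache.flink.configuration.Configuration;

// Sketch only: the branch below is a simplification of the threshold
// decision, not Flink's real code path.
public class MemoryThresholdSketch {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Raise the threshold so per-subtask Kafka offset state stays in-band.
        conf.setString("state.backend.fs.memory-threshold", "20480"); // 20 KiB

        long thresholdBytes = 20 * 1024;
        byte[] serializedOffsets = new byte[2048]; // pretend offset state of one subtask

        if (serializedOffsets.length <= thresholdBytes) {
            // In-band: ByteStreamStateHandle, no extra HDFS file, but the bytes
            // sit in JM memory for every subtask -- exactly the pressure this
            // test exposes at parallelism 2000.
            System.out.println("ByteStreamStateHandle");
        } else {
            // Out-of-band: one HDFS file per subtask (FileStateHandle).
            System.out.println("FileStateHandle");
        }
    }
}
```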
> Speed up the restore of UnionListState
> ----------------------------------------
>
>                 Key: FLINK-21436
>                 URL: https://issues.apache.org/jira/browse/FLINK-21436
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.13.0
>            Reporter: fanrui
>            Priority: Major
>         Attachments: JM 启动火焰图.svg, akka timeout Exception.png
>
> h1. 1. Problem introduction and cause analysis
> Problem description: the restore of UnionListState at high parallelism takes more than 2 minutes.
> h2. The reason:
> 2000 subtasks write 2000 files during a checkpoint, and each subtask needs to read all 2000 files during restore.
> 2000 * 2000 = 4 million, so 4 million small-file reads hit HDFS during restore. HDFS becomes the bottleneck and makes the restore particularly time-consuming.
> h1. 2. Optimization idea
> Under normal circumstances, UnionListState is relatively small. A typical usage scenario is Kafka offset information.
> When restoring, the JM can directly read all 2000 small files, merge the UnionListState into one byte array, and send it to all TMs, so that the TMs do not need to access HDFS at all.
> h1. 3. Benefits after optimization
> Before optimization: at parallelism 2000, the Kafka offset restore takes 90~130 s.
> After optimization: at parallelism 2000, the Kafka offset restore takes less than 1 s.
> h1. 4. Risk points
> A too-large UnionListState puts too much pressure on the JM.
> Solution 1:
> Add a configuration option that decides whether to enable this feature. The default is false, meaning the old behavior is used; when the user sets it to true, the JM merges the state.
> Solution 2:
> No configuration option is needed, which is equivalent to enabling the merge by default.
> The JM checks the size of the state before merging. If it is below a threshold, the state is considered small and is sent to all TMs through a ByteStreamStateHandle.
> If the threshold is exceeded, the state is considered large. In that case, the JM writes one HDFS file and sends a FileStateHandle to all TMs, which then read that file.
>
> Note: most of the scenarios where Flink uses UnionListState are Kafka offsets (small state). In theory, most jobs are risk-free.
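A minimal sketch of Solution 2 from the description above, with hypothetical names (`StateReader`, `mergeOnJobManager`, and the 1 MiB cut-off are assumptions for illustration, not a real Flink API): the JM reads the N union-state files once and merges them, so restore costs N reads instead of N * N, and the size threshold decides whether the merged bytes go to the TMs in-band or through one shared HDFS file.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Sketch only: hypothetical names, not a real Flink API.
public class UnionStateMergeSketch {

    /** Stand-in for reading one subtask's union-state file from HDFS. */
    interface StateReader {
        byte[] read(int subtaskIndex) throws IOException;
    }

    static final long THRESHOLD_BYTES = 1 << 20; // assumed 1 MiB cut-off

    static byte[] mergeOnJobManager(StateReader reader, int parallelism) throws IOException {
        // One pass over the N files on the JM instead of one pass per TM:
        // N reads in total rather than N * N.
        ByteArrayOutputStream merged = new ByteArrayOutputStream();
        for (int i = 0; i < parallelism; i++) {
            merged.write(reader.read(i));
        }
        return merged.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Pretend each of the 2000 subtasks wrote ~200 bytes of offset state.
        byte[] merged = mergeOnJobManager(i -> new byte[200], 2000);
        if (merged.length <= THRESHOLD_BYTES) {
            System.out.println("small: send bytes to all TMs directly (ByteStreamStateHandle-style)");
        } else {
            System.out.println("large: write one HDFS file, send its handle (FileStateHandle-style)");
        }
    }
}
```

Under the description's numbers, this turns 4 million HDFS reads at parallelism 2000 into 2000 reads on the JM, plus at most one shared file read per TM in the large-state branch.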