[ https://issues.apache.org/jira/browse/FLINK-21884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17454589#comment-17454589 ]
ZhuoYu Chen edited comment on FLINK-21884 at 12/7/21, 11:16 AM:
----------------------------------------------------------------

[~trohrmann] As an HDFS cluster ages, some nodes inevitably suffer "performance degradation", mainly showing up as slow disk reads/writes and slow network transmission; we collectively call these slow nodes. Once a cluster grows to a certain size, for example thousands of nodes, slow nodes are usually not easy to detect: most of the time they hide among the many healthy nodes and are only noticed when a client repeatedly accesses one of them and finds that reads/writes are slow.

*Slow network monitoring principle*

To detect a DataNode (DN) whose network transmission has slowed down, every DN records the data transmission time to each downstream DN in the cluster, identifies outliers, and reports them to the NameNode (NN) as slow nodes. Under normal circumstances the transmission rates between nodes are roughly the same, so if the transmission time from A to B is abnormal, A reports B to the NN as a slow node. The NN decides whether a node is really slow only after aggregating all slow-node reports: if node X is faulty, many nodes will report X as slow, and the pile-up of similar reports on the NN side indicates that X is a slow node.

To compute the average elapsed time of data transmission from a DN to its downstream peers, the DN maintains a HashMap<String, Queue<SampleStat>> whose key is the IP of the downstream DN and whose value is a queue of SampleStat objects recording the number of packets sent and the total elapsed time. By default the DN takes a snapshot every five minutes, producing a SampleStat that records the network behaviour of transmissions to that downstream DN during those five minutes, and stores it in the corresponding queue. The queue has a fixed length of 36; when it is full, the oldest SampleStat is evicted so the new one can be appended at the tail. As a result only the last 3 hours (36 x 5 = 180 min) of network transmission are monitored, which keeps the monitoring data time-sensitive.
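To make the bookkeeping above concrete, here is a minimal sketch (in Java) of the per-peer rolling window as described: one fixed-size queue of five-minute SampleStat snapshots per downstream DN, with the oldest snapshot evicted once 36 are held. The class and method names (PeerTransferTracker, recordPacket, rollSnapshots, slowPeers) are illustrative, not the actual HDFS implementation, and the simple threshold check at the end stands in for the real outlier detection against the other peers' averages.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative sketch of the DN-side bookkeeping described above; not the actual HDFS code. */
public class PeerTransferTracker {

    /** Packet count and total elapsed time for one five-minute window. */
    static class SampleStat {
        long numPackets;
        long totalElapsedMillis;
    }

    private static final int MAX_SNAPSHOTS = 36; // 36 x 5 min = 3 hours

    // Key: IP of the downstream DN; value: rolling window of snapshots.
    // The description calls this a Queue; a Deque is used here so the
    // newest snapshot (at the tail) can be updated in place.
    private final Map<String, Deque<SampleStat>> statsByPeer = new ConcurrentHashMap<>();

    /** Accumulate one packet transfer into the newest snapshot for this peer. */
    public void recordPacket(String downstreamIp, long elapsedMillis) {
        Deque<SampleStat> window = statsByPeer.computeIfAbsent(downstreamIp, ip -> {
            Deque<SampleStat> q = new ArrayDeque<>();
            q.addLast(new SampleStat());
            return q;
        });
        synchronized (window) {
            SampleStat current = window.peekLast();
            current.numPackets++;
            current.totalElapsedMillis += elapsedMillis;
        }
    }

    /** Called every five minutes: start a new snapshot, evicting the oldest if full. */
    public void rollSnapshots() {
        for (Deque<SampleStat> window : statsByPeer.values()) {
            synchronized (window) {
                if (window.size() >= MAX_SNAPSHOTS) {
                    window.pollFirst(); // drop the oldest five-minute window
                }
                window.addLast(new SampleStat());
            }
        }
    }

    /** Peers whose 3-hour average exceeds the threshold; candidates to report to the NN. */
    public List<String> slowPeers(double thresholdMillis) {
        List<String> slow = new ArrayList<>();
        statsByPeer.forEach((ip, window) -> {
            long packets = 0;
            long elapsed = 0;
            synchronized (window) {
                for (SampleStat s : window) {
                    packets += s.numPackets;
                    elapsed += s.totalElapsedMillis;
                }
            }
            if (packets > 0 && (double) elapsed / packets > thresholdMillis) {
                slow.add(ip);
            }
        });
        return slow;
    }
}
{code}

The fixed-size queue bounds both memory use and the observation window, which is exactly what keeps the slow-node reports time-sensitive; in the real scheme the DN would compare each peer's window average against the other peers' averages rather than a fixed threshold before reporting to the NN.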
> Reduce TaskManager failure detection time
> -----------------------------------------
>
>                 Key: FLINK-21884
>                 URL: https://issues.apache.org/jira/browse/FLINK-21884
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.14.0, 1.13.2
>            Reporter: Robert Metzger
>            Priority: Critical
>              Labels: reactive
>             Fix For: 1.15.0
>
>         Attachments: image-2021-03-19-20-10-40-324.png
>
>
> In Flink 1.13 (and older versions), TaskManager failures stall the processing
> for a significant amount of time, even though the system gets indications for
> the failure almost immediately through network connection losses.
> This is due to a high (default) heartbeat timeout of 50 seconds [1] to
> accommodate GC pauses, transient network disruptions or generally slow
> environments (otherwise, we would unregister a healthy TaskManager).
> Such a high timeout can lead to disruptions in the processing (no processing
> for certain periods, high latencies, buildup of consumer lag etc.). In
> Reactive Mode (FLINK-10407), the issue surfaces on scale-down events, where
> the loss of a TaskManager is immediately visible in the logs, but the job is
> stuck in "FAILING" for quite a while until the TaskManager is really
> deregistered. (Note that this issue is not that critical in an autoscaling
> setup, because Flink can control the scale-down events and trigger them
> proactively.)
> On the attached metrics dashboard, one can see that the job has significant
> throughput drops / consumer lags during scale down (and also CPU usage spikes
> on processing the queued events, leading to incorrect scale-up events again).
> !image-2021-03-19-20-10-40-324.png|thumbnail!
> One idea to solve this problem is to:
> - Score TaskManagers based on certain signals (# exceptions reported,
> exception types (connection losses, akka failures), failure frequencies,
> ...) and blacklist them accordingly.
> - Introduce a best-effort TaskManager unregistration mechanism: When a
> TaskManager receives a sigterm, it sends a final message to the JobManager
> saying "goodbye", and the JobManager can immediately remove the TM from its
> bookkeeping.
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#heartbeat-timeout

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
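As a rough illustration of the best-effort unregistration idea quoted above, the sketch below registers a JVM shutdown hook that sends a final "goodbye" to the JobManager when the TaskManager process receives a SIGTERM. The JobManagerGateway interface and its disconnectTaskManager method are hypothetical stand-ins for illustration only, not Flink's actual RPC API.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Illustrative sketch of the "goodbye on SIGTERM" idea; not Flink's real implementation. */
public class GracefulTaskManagerShutdown {

    /** Hypothetical RPC handle to the JobManager (not Flink's real interface). */
    interface JobManagerGateway {
        CompletableFuture<Void> disconnectTaskManager(String taskManagerId, String cause);
    }

    private final JobManagerGateway jobManager;
    private final String taskManagerId;

    public GracefulTaskManagerShutdown(JobManagerGateway jobManager, String taskManagerId) {
        this.jobManager = jobManager;
        this.taskManagerId = taskManagerId;
        // SIGTERM runs the JVM shutdown hooks, so this is the last chance to
        // send a best-effort "goodbye" before the process exits.
        Runtime.getRuntime().addShutdownHook(new Thread(this::sayGoodbye, "tm-goodbye"));
    }

    private void sayGoodbye() {
        try {
            // Best effort: wait briefly, but never block shutdown for long.
            jobManager
                    .disconnectTaskManager(taskManagerId, "TaskManager received SIGTERM")
                    .get(2, TimeUnit.SECONDS);
        } catch (Exception e) {
            // If the goodbye does not get through, the JobManager falls back
            // to the regular heartbeat timeout to detect the lost TaskManager.
        }
    }
}
{code}

Because delivery is best-effort, the JobManager still needs the heartbeat timeout as a fallback; the goodbye message only shortens detection in the common graceful scale-down case.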