ming li created FLINK-18808:
-------------------------------

             Summary: Task-level numRecordsOut metric may be underestimated
                 Key: FLINK-18808
                 URL: https://issues.apache.org/jira/browse/FLINK-18808
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Metrics
    Affects Versions: 1.11.1
         Environment: !image-2020-08-04-11-26-21-490.png!
            Reporter: ming li
         Attachments: image-2020-08-04-11-28-13-800.png, 
image-2020-08-04-11-32-20-678.png

At present, we only register task-level numRecordsOut metric by reusing 
operator output record counter at the end of OperatorChain.
{code:java}
//代码占位符
if (config.isChainEnd()) {
   operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
}
{code}
If we only send data out through the last operator of OperatorChain, there is 
no problem with this statistics. But consider the following scenario:

!image-2020-08-04-11-28-13-800.png|width=507,height=174!

In this JobGraph, we not only send data in the last operator, but also send 
data in the middle operator of OperatorChain (the map operator just returns the 
original value directly). Below is one of our test topology, we can see that 
the statistics actually only have half of the total data received by the 
downstream.

!image-2020-08-04-11-32-20-678.png|width=648,height=251!

I think the data sent out by the intermediate operator should also be counted 
into the numRecordsOut of the Task. But currently we are not reusing operators 
output record counters in the intermediate operators, which leads to our 
task-level numRecordsOut metric is underestimated (although this has no effect 
on the actual operation of the job, it may affect our monitoring).

A simple idea of ​​mine is to modify the condition of reusing operators output 
record counter:
{code:java}
//代码占位符
if (!config.getNonChainedOutputs(getUserCodeClassloader()).isEmpty()) {
   operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
}{code}
In addition, I have another question: If a record is broadcast to all 
downstream, should the numRecordsOut counter increase by one or the downstream 
number? It seems that currently we are adding one to calculate the 
numRecordsOut metric.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to