[ 
https://issues.apache.org/jira/browse/FLINK-18808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177056#comment-17177056
 ] 

Piotr Nowojski commented on FLINK-18808:
----------------------------------------

Hmmm, you are right. I understand the problem now. Thanks for pointing this out 
and sorry that I didn't get it immediately.
{quote}
Do I need to add a isOutputToTask(OutputTag<X> outputTag) method to determine 
whether it will be sent to another task? 
{quote}
I have a feeling that this approach would be too indirect, and it might add 
some unnecessary overhead (though maybe not).

Have you seen the 
{{org.apache.flink.streaming.runtime.tasks.OperatorChain#createOutputCollector}}
 method? It looks like we could do some magic there. It has 3 steps:
# collect non chained outputs ({{RecordWriterOutput}})
# collect chained outputs
# handle output selectors

Let's first assume that there are no selectors. Currently, for this case we 
use either {{CopyingBroadcastingOutputCollector}} or 
{{BroadcastingOutputCollector}} over a flat {{asArray}} structure that combines 
the chained and non chained outputs.

It looks like we could calculate the {{numRecordsOut}} metric on this level. 
{{CopyingBroadcastingOutputCollector}} and {{BroadcastingOutputCollector}} 
would just bump the task level {{numRecordsOut}} metric on each {{#collect}} 
call.
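
To illustrate the idea, here is a minimal, self-contained sketch. It does not 
use the real Flink classes: {{SimpleCounter}}, {{Output}} and 
{{CountingBroadcastingCollector}} are hypothetical stand-ins, and I am assuming 
that the counter is bumped once per {{#collect}} call regardless of how many 
outputs the record is forwarded to.

```java
import java.util.ArrayList;
import java.util.List;

public class BroadcastCountingSketch {

    /** Hypothetical stand-in for a metrics counter (not Flink's Counter). */
    static final class SimpleCounter {
        private long count;
        void inc() { count++; }
        long getCount() { return count; }
    }

    /** Hypothetical stand-in for an operator output. */
    interface Output<T> {
        void collect(T record);
    }

    /**
     * Forwards every record to all outputs and bumps the task-level
     * counter once per collect call (assumption: one increment per
     * record, not one per output).
     */
    static final class CountingBroadcastingCollector<T> implements Output<T> {
        private final List<Output<T>> outputs;
        private final SimpleCounter taskNumRecordsOut;

        CountingBroadcastingCollector(List<Output<T>> outputs, SimpleCounter taskNumRecordsOut) {
            this.outputs = outputs;
            this.taskNumRecordsOut = taskNumRecordsOut;
        }

        @Override
        public void collect(T record) {
            taskNumRecordsOut.inc();
            for (Output<T> output : outputs) {
                output.collect(record);
            }
        }
    }

    /** Sends numRecords records to numOutputs outputs; returns {task-level count, total deliveries}. */
    static long[] run(int numRecords, int numOutputs) {
        SimpleCounter taskNumRecordsOut = new SimpleCounter();
        SimpleCounter delivered = new SimpleCounter();

        List<Output<String>> outputs = new ArrayList<>();
        for (int i = 0; i < numOutputs; i++) {
            outputs.add(record -> delivered.inc());
        }

        Output<String> collector = new CountingBroadcastingCollector<>(outputs, taskNumRecordsOut);
        for (int i = 0; i < numRecords; i++) {
            collector.collect("record-" + i);
        }
        return new long[] {taskNumRecordsOut.getCount(), delivered.getCount()};
    }

    public static void main(String[] args) {
        long[] result = run(3, 2);
        System.out.println("task numRecordsOut=" + result[0] + ", deliveries=" + result[1]);
    }
}
```

In this sketch the task-level count follows the number of {{#collect}} calls, 
not the number of outputs. Whether a broadcast record should count once or 
once per output is a separate decision.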

For the use case with the selectors it's a bit more tricky. But in the end it 
should be solvable, as {{OperatorChain}} is constructing {{DirectedOutput}} and 
{{OperatorChain}} knows which outputs are network and which are not, so it 
could pass this knowledge down to {{DirectedOutput}} and {{DirectedOutput}} 
could decide when to bump task level {{numRecordsOut}} counter, right?
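
A rough sketch of how that could look, again with hypothetical stand-in types 
rather than the real {{DirectedOutput}}: each output carries an {{isNetwork}} 
flag supplied by whoever builds the chain, and the task-level counter is 
bumped only when a record is routed to at least one network output. Counting 
once per record, rather than once per network output, is my assumption here, 
as is modelling the selector as a simple predicate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

public class DirectedCountingSketch {

    /** Hypothetical stand-in for a metrics counter (not Flink's Counter). */
    static final class SimpleCounter {
        private long count;
        void inc() { count++; }
        long getCount() { return count; }
    }

    /** Hypothetical stand-in for an operator output. */
    interface Output<T> {
        void collect(T record);
    }

    /** An output plus the knowledge of whether it leaves the task (network) or stays chained. */
    static final class TaggedOutput<T> {
        final String name;
        final Output<T> output;
        final boolean isNetwork;

        TaggedOutput(String name, Output<T> output, boolean isNetwork) {
            this.name = name;
            this.output = output;
            this.isNetwork = isNetwork;
        }
    }

    /** Routes records by selector and counts those that reach at least one network output. */
    static final class CountingDirectedOutput<T> implements Output<T> {
        private final List<TaggedOutput<T>> outputs;
        private final BiPredicate<T, String> selector; // (record, output name) -> selected?
        private final SimpleCounter taskNumRecordsOut;

        CountingDirectedOutput(
                List<TaggedOutput<T>> outputs,
                BiPredicate<T, String> selector,
                SimpleCounter taskNumRecordsOut) {
            this.outputs = outputs;
            this.selector = selector;
            this.taskNumRecordsOut = taskNumRecordsOut;
        }

        @Override
        public void collect(T record) {
            boolean sentToNetwork = false;
            for (TaggedOutput<T> tagged : outputs) {
                if (selector.test(record, tagged.name)) {
                    tagged.output.collect(record);
                    sentToNetwork |= tagged.isNetwork;
                }
            }
            if (sentToNetwork) {
                taskNumRecordsOut.inc();
            }
        }
    }

    /** Routes 1..numRecords: even values to a network output, odd values to a chained one. */
    static long run(int numRecords) {
        SimpleCounter taskNumRecordsOut = new SimpleCounter();
        List<TaggedOutput<Integer>> outputs = new ArrayList<>();
        outputs.add(new TaggedOutput<>("even", record -> { }, true));   // network output
        outputs.add(new TaggedOutput<>("odd", record -> { }, false));   // chained output

        BiPredicate<Integer, String> selector =
                (record, name) -> (record % 2 == 0) == name.equals("even");

        Output<Integer> directed = new CountingDirectedOutput<>(outputs, selector, taskNumRecordsOut);
        for (int i = 1; i <= numRecords; i++) {
            directed.collect(i);
        }
        return taskNumRecordsOut.getCount();
    }

    public static void main(String[] args) {
        System.out.println("task numRecordsOut=" + run(4));
    }
}
```

With four records (1 to 4), only the two even ones reach the network output, 
so the task-level counter ends at 2 while the chained output is not counted.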

So we would bypass the {{CountingOutput}} altogether for the task level 
{{numRecordsOut}}. Optionally, we could also consider dropping the 
{{CountingOutput}} for the operator level {{numRecordsOut}} and moving this 
logic closer to the task level {{numRecordsOut}}.

> Task-level numRecordsOut metric may be underestimated
> -----------------------------------------------------
>
>                 Key: FLINK-18808
>                 URL: https://issues.apache.org/jira/browse/FLINK-18808
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Metrics, Runtime / Task
>    Affects Versions: 1.11.1
>            Reporter: ming li
>            Assignee: ming li
>            Priority: Major
>              Labels: pull-request-available, usability
>         Attachments: image-2020-08-04-11-28-13-800.png, 
> image-2020-08-04-11-32-20-678.png, image-2020-08-13-18-36-13-282.png
>
>
> At present, we only register the task-level numRecordsOut metric by reusing 
> the operator's output record counter at the end of the OperatorChain.
> {code:java}
> if (config.isChainEnd()) {
>    operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
> }
> {code}
> If we only send data out through the last operator of the OperatorChain, 
> there is no problem with this statistic. But consider the following scenario:
> !image-2020-08-04-11-28-13-800.png|width=507,height=174!
> In this JobGraph, we send data not only from the last operator, but also 
> from the middle operator of the OperatorChain (the map operator just returns 
> the original value directly). Below is one of our test topologies; we can see 
> that the metric actually shows only half of the total data received by the 
> downstream.
> !image-2020-08-04-11-32-20-678.png|width=648,height=251!
> I think the data sent out by the intermediate operator should also be counted 
> in the numRecordsOut of the Task. But currently we are not reusing the 
> operators' output record counters in the intermediate operators, which causes 
> our task-level numRecordsOut metric to be underestimated (although this has 
> no effect on the actual operation of the job, it may affect our monitoring).
> A simple idea of mine is to modify the condition for reusing the operator's 
> output record counter:
> {code:java}
> if (!config.getNonChainedOutputs(getUserCodeClassloader()).isEmpty()) {
>    operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
> }{code}
> In addition, I have another question: If a record is broadcast to all 
> downstream receivers, should the numRecordsOut counter increase by one or by 
> the number of downstream receivers? It seems that currently we add one when 
> calculating the numRecordsOut metric.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
