[ https://issues.apache.org/jira/browse/FLINK-18808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177056#comment-17177056 ]
Piotr Nowojski commented on FLINK-18808:
----------------------------------------

Hmmm, you are right. I understand the problem now. Thanks for pointing this out, and sorry that I didn't get it immediately.

{quote}
Do I need to add an isOutputToTask(OutputTag<X> outputTag) method to determine whether it will be sent to another task?
{quote}

I have a feeling that this approach would be too indirect and could add some unnecessary overhead (maybe not). Have you seen the {{org.apache.flink.streaming.runtime.tasks.OperatorChain#createOutputCollector}} method? It looks like we could do some magic there. It has 3 steps:
# collect non-chained outputs ({{RecordWriterOutput}})
# collect chained outputs
# handle output selectors

Let's first assume that there are no selectors. Currently, for this case we use either {{CopyingBroadcastingOutputCollector}} or {{BroadcastingOutputCollector}} on a flat {{asArray}} structure built from the combined chained and non-chained outputs. It looks like we could calculate the {{numRecordsOut}} metric at this level: {{CopyingBroadcastingOutputCollector}} and {{BroadcastingOutputCollector}} would just bump the task-level {{numRecordsOut}} metric on each {{#collect}} call.

The use case with selectors is a bit more tricky, but it should be solvable in the end. {{OperatorChain}} constructs the {{DirectedOutput}} and knows which outputs are network outputs and which are not, so it could pass this knowledge down to {{DirectedOutput}}, and {{DirectedOutput}} could decide when to bump the task-level {{numRecordsOut}} counter, right? That way we would bypass the {{CountingOutput}} altogether for the task-level {{numRecordsOut}}. Optionally, we could consider dropping the {{CountingOutput}} for the operator-level {{numRecordsOut}} as well, and moving this logic closer to the task-level {{numRecordsOut}}.
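A minimal sketch of counting at the collector level, as proposed in the comment above. It is not Flink code: {{Output}}, {{RecordingOutput}}, {{TaskCounter}}, {{BroadcastingCollector}} and {{DirectedCollector}} are hypothetical stand-ins, the selector returns output indices instead of output names, and the task-level counter is assumed to go up once per record (not once per output), and only when the record reaches at least one network output. Deciding {{hasNetworkOutput}} once, when the collector is built, keeps the per-record cost in this sketch to a single boolean check, which is one way to address the overhead concern.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

public class TaskRecordsOutSketch {

    /** Hypothetical stand-in for an operator output; NOT Flink's Output interface. */
    interface Output<T> {
        void collect(T record);
        /** True for a network (RecordWriterOutput-like) output, false for a chained one. */
        boolean isNetworkOutput();
    }

    /** Hypothetical output that just records what it receives. */
    static final class RecordingOutput<T> implements Output<T> {
        private final boolean network;
        final List<T> received = new ArrayList<>();
        RecordingOutput(boolean network) { this.network = network; }
        @Override public void collect(T record) { received.add(record); }
        @Override public boolean isNetworkOutput() { return network; }
    }

    /** Hypothetical task-level counter: a plain long instead of a Flink Counter. */
    static final class TaskCounter {
        long numRecordsOut;
    }

    /** No selectors: every record goes to every output. */
    static final class BroadcastingCollector<T> {
        private final List<Output<T>> outputs;
        private final TaskCounter taskCounter;
        private final boolean hasNetworkOutput; // decided once, when the chain is built

        BroadcastingCollector(List<Output<T>> outputs, TaskCounter taskCounter) {
            this.outputs = outputs;
            this.taskCounter = taskCounter;
            this.hasNetworkOutput = outputs.stream().anyMatch(Output::isNetworkOutput);
        }

        void collect(T record) {
            if (hasNetworkOutput) {
                taskCounter.numRecordsOut++; // once per record, not once per output
            }
            for (Output<T> out : outputs) {
                out.collect(record);
            }
        }
    }

    /** With selectors: count a record only if a selected output is a network output. */
    static final class DirectedCollector<T> {
        private final List<Output<T>> outputs;
        private final Function<T, Set<Integer>> selector; // hypothetical: returns output indices
        private final TaskCounter taskCounter;

        DirectedCollector(List<Output<T>> outputs, Function<T, Set<Integer>> selector, TaskCounter taskCounter) {
            this.outputs = outputs;
            this.selector = selector;
            this.taskCounter = taskCounter;
        }

        void collect(T record) {
            boolean sentOverNetwork = false;
            for (int index : selector.apply(record)) {
                Output<T> out = outputs.get(index);
                out.collect(record);
                sentOverNetwork |= out.isNetworkOutput();
            }
            if (sentOverNetwork) {
                taskCounter.numRecordsOut++;
            }
        }
    }

    /** Small example: returns the resulting task-level numRecordsOut. */
    static long demo() {
        TaskCounter counter = new TaskCounter();
        List<Output<String>> outputs =
                List.of(new RecordingOutput<String>(false), new RecordingOutput<String>(true));
        BroadcastingCollector<String> broadcasting = new BroadcastingCollector<>(outputs, counter);
        broadcasting.collect("a"); // reaches the network output: counted
        broadcasting.collect("b"); // counted
        // Hypothetical selector: even-length records stay in the chain (index 0), others go to the network (index 1).
        DirectedCollector<String> directed = new DirectedCollector<>(
                outputs, r -> r.length() % 2 == 0 ? Set.of(0) : Set.of(1), counter);
        directed.collect("cc"); // chained output only: not counted
        directed.collect("d");  // network output: counted
        return counter.numRecordsOut;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 3
    }
}
{code}

In this sketch the {{CountingOutput}}-style wrapper is not involved at all for the task-level count; the collectors alone decide when a record has left the task.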
> Task-level numRecordsOut metric may be underestimated
> -----------------------------------------------------
>
> Key: FLINK-18808
> URL: https://issues.apache.org/jira/browse/FLINK-18808
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Metrics, Runtime / Task
> Affects Versions: 1.11.1
> Reporter: ming li
> Assignee: ming li
> Priority: Major
> Labels: pull-request-available, usability
> Attachments: image-2020-08-04-11-28-13-800.png, image-2020-08-04-11-32-20-678.png, image-2020-08-13-18-36-13-282.png
>
>
> At present, we only register the task-level numRecordsOut metric by reusing the operator output record counter at the end of the OperatorChain.
> {code:java}
> if (config.isChainEnd()) {
>     operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
> }
> {code}
> If we only send data out through the last operator of the OperatorChain, these statistics are correct. But consider the following scenario:
> !image-2020-08-04-11-28-13-800.png|width=507,height=174!
> In this JobGraph, we send data not only from the last operator but also from a middle operator of the OperatorChain (the map operator just returns the original value directly). Below is one of our test topologies; we can see that the statistics only show half of the total data received downstream.
> !image-2020-08-04-11-32-20-678.png|width=648,height=251!
> I think the data sent out by intermediate operators should also be counted in the task's numRecordsOut. But currently we do not reuse the output record counters of intermediate operators, which leads to the task-level numRecordsOut metric being underestimated (although this has no effect on the actual operation of the job, it may affect our monitoring).
> A simple idea of mine is to modify the condition for reusing the operator output record counter:
> {code:java}
> if (!config.getNonChainedOutputs(getUserCodeClassloader()).isEmpty()) {
>     operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
> }
> {code}
> In addition, I have another question: if a record is broadcast to all downstream tasks, should the numRecordsOut counter increase by one or by the number of downstream tasks? It seems that currently we add one per record when calculating the numRecordsOut metric.
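To make the two reuse conditions from the issue description concrete, here is a small model of the reported scenario. It is a sketch under stated assumptions, not Flink code: {{ChainedOperator}}, {{taskNumRecordsOut}}, {{currentRule}}, {{proposedRule}} and {{countBroadcast}} are hypothetical names, each operator's output counter is reduced to one number, and the task-level metric is assumed to be the sum of every operator counter reused for the task. With a map in the middle of the chain that also sends its 100 records to another task, the chain-end condition reports 100 while downstream tasks receive 200, which is consistent with the "half" seen in the test topology.

{code:java}
import java.util.List;
import java.util.function.Predicate;

public class ReuseConditionSketch {

    /** Hypothetical, simplified view of one operator in an OperatorChain (not a Flink class). */
    static final class ChainedOperator {
        final String name;
        final boolean chainEnd;
        final boolean hasNonChainedOutputs;
        final long numRecordsOut; // operator-level output counter value

        ChainedOperator(String name, boolean chainEnd, boolean hasNonChainedOutputs, long numRecordsOut) {
            this.name = name;
            this.chainEnd = chainEnd;
            this.hasNonChainedOutputs = hasNonChainedOutputs;
            this.numRecordsOut = numRecordsOut;
        }
    }

    /** Assumption: the task-level metric is the sum of the operator counters selected for reuse. */
    static long taskNumRecordsOut(List<ChainedOperator> chain, Predicate<ChainedOperator> reuseCondition) {
        return chain.stream().filter(reuseCondition).mapToLong(op -> op.numRecordsOut).sum();
    }

    /** Current condition: only the chain-end operator's counter is reused. */
    static long currentRule(List<ChainedOperator> chain) {
        return taskNumRecordsOut(chain, op -> op.chainEnd);
    }

    /** Proposed condition: every operator with non-chained outputs has its counter reused. */
    static long proposedRule(List<ChainedOperator> chain) {
        return taskNumRecordsOut(chain, op -> op.hasNonChainedOutputs);
    }

    /** The two possible answers to the broadcast question, for a single broadcast record. */
    static long countBroadcast(int downstreamTasks, boolean perDownstreamTask) {
        return perDownstreamTask ? downstreamTasks : 1;
    }

    /** source -> map -> last operator; map also sends every record to another task. */
    static List<ChainedOperator> exampleChain() {
        return List.of(
                new ChainedOperator("source", false, false, 100),
                new ChainedOperator("map", false, true, 100),
                new ChainedOperator("last", true, true, 100));
    }

    public static void main(String[] args) {
        List<ChainedOperator> chain = exampleChain();
        System.out.println(currentRule(chain));       // prints 100
        System.out.println(proposedRule(chain));      // prints 200
        System.out.println(countBroadcast(4, false)); // prints 1
        System.out.println(countBroadcast(4, true));  // prints 4
    }
}
{code}

Under this model the proposed condition also picks up the map's records, while a chain-end sink without network outputs would contribute nothing; the broadcast case only changes if the counting semantics move from "per record" to "per downstream task".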