[ https://issues.apache.org/jira/browse/FLINK-18808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177624#comment-17177624 ]
ming li edited comment on FLINK-18808 at 8/14/20, 9:14 AM:
-----------------------------------------------------------

My proposal is to group the {{Output}}s by {{OutputTag}} in {{BroadcastingOutputCollector}}. At present, we put all {{Output}}s in a single array and cannot tell whether an entry is a {{RecordWriterOutput}} or a {{ChainingOutput}}.
{code:java}
protected final Map<OutputTag<T>, List<Output<StreamRecord<T>>>> chainedOutput;
protected final Map<OutputTag<T>, List<Output<StreamRecord<T>>>> nonChainedOutput;
{code}
The {{collect}} methods would be implemented like this:
{code:java}
@Override
public void collect(StreamRecord<T> record) {
    // Count the record once if it leaves the task through at least one non-chained output.
    if (!nonChainedOutput.getOrDefault(placeHolder, Collections.emptyList()).isEmpty()) {
        numRecordOut.inc();
        for (Output<StreamRecord<T>> output : nonChainedOutput.get(placeHolder)) {
            output.collect(record);
        }
    }
    // Chained outputs stay inside the task and are not counted.
    for (Output<StreamRecord<T>> output : chainedOutput.getOrDefault(placeHolder, Collections.emptyList())) {
        output.collect(record);
    }
}

@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
    if (!nonChainedOutput.getOrDefault(outputTag, Collections.emptyList()).isEmpty()) {
        numRecordOut.inc();
        for (Output<StreamRecord<T>> output : nonChainedOutput.get(outputTag)) {
            output.collect(outputTag, record);
        }
    }
    for (Output<StreamRecord<T>> output : chainedOutput.getOrDefault(outputTag, Collections.emptyList())) {
        output.collect(outputTag, record);
    }
}
{code}
Grouping has the following benefits:
 1. We can determine at the {{BroadcastingOutputCollector}} level whether a record is sent through a non-chained output.
 2. No interface definitions need to be added or changed. Only the implementation of {{BroadcastingOutputCollector}} changes, and since it is currently used only in {{OperatorChain}}, the impact should be small.

{quote}Maybe the best would be to switch to {{numRecordsSent}} (that would also include broadcasted fan out) and ignore all of this complexity?
{quote}
Yes, if the goal is to count the number of records actually sent, this becomes simple: we can count directly in {{RecordWriterOutput}} or {{RecordWriter}}. But it may cause some confusion. For example, the operator-level {{numRecordsOut}} would keep the semantics of "{{numRecordsProduced}}", while the task-level metric would use the semantics of "{{numRecordsSent}}". Maybe in the future we can add a {{numRecordsSent}} metric for both {{operator}} and {{task}}. :)


> Task-level numRecordsOut metric may be underestimated
> -----------------------------------------------------
>
>                 Key: FLINK-18808
>                 URL: https://issues.apache.org/jira/browse/FLINK-18808
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Metrics, Runtime / Task
>    Affects Versions: 1.11.1
>            Reporter: ming li
>            Assignee: ming li
>            Priority: Major
>              Labels: pull-request-available, usability
>         Attachments: image-2020-08-04-11-28-13-800.png, image-2020-08-04-11-32-20-678.png, image-2020-08-13-18-36-13-282.png
>
>
> At present, we register the task-level numRecordsOut metric only by reusing the operator output record counter at the end of the OperatorChain.
> {code:java}
> if (config.isChainEnd()) {
>     operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
> }
> {code}
> If data leaves the task only through the last operator of the OperatorChain, this count is correct. But consider the following scenario:
> !image-2020-08-04-11-28-13-800.png|width=507,height=174!
> In this JobGraph, data is sent not only by the last operator but also by an operator in the middle of the OperatorChain (the map operator just returns the original value). Below is one of our test topologies; the metric shows only half of the total data received by the downstream tasks.
> !image-2020-08-04-11-32-20-678.png|width=648,height=251!
> I think the data sent out by the intermediate operator should also be counted in the task's numRecordsOut. But currently the output record counters of intermediate operators are not reused, so the task-level numRecordsOut metric is underestimated (this has no effect on the actual operation of the job, but it may affect our monitoring).
> A simple idea of mine is to change the condition for reusing the operator's output record counter:
> {code:java}
> if (!config.getNonChainedOutputs(getUserCodeClassloader()).isEmpty()) {
>     operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
> }
> {code}
> In addition, I have another question: if a record is broadcast to all downstream tasks, should the numRecordsOut counter increase by one or by the number of downstream tasks? It seems that currently it increases by one.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
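
For readers following the thread, the two counting semantics discussed here ("produced" versus "sent") can be compared in a minimal, self-contained sketch. Everything in it is a hypothetical stand-in and not Flink's API: {{SimpleOutput}}, {{RecordingOutput}} and {{GroupedCollector}} are invented for illustration, plain strings replace {{OutputTag}} and {{StreamRecord}}, and plain {{long}} fields replace the metric counters. It assumes that {{numRecordsOut}} increases once per record that reaches at least one non-chained output, and that {{numRecordsSent}} increases once per non-chained output written to.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupedOutputSketch {

    /** Hypothetical stand-in for an operator output; not Flink's Output interface. */
    interface SimpleOutput {
        void collect(String record);
    }

    /** Test double that remembers every record it receives. */
    static final class RecordingOutput implements SimpleOutput {
        final List<String> received = new ArrayList<>();

        @Override
        public void collect(String record) {
            received.add(record);
        }
    }

    /** Groups outputs by tag and by whether they leave the task (non-chained) or not (chained). */
    static final class GroupedCollector {
        final Map<String, List<SimpleOutput>> chained = new HashMap<>();
        final Map<String, List<SimpleOutput>> nonChained = new HashMap<>();
        long numRecordsOut;  // +1 per record that reaches at least one non-chained output
        long numRecordsSent; // +1 per non-chained output the record is written to

        void addChained(String tag, SimpleOutput output) {
            chained.computeIfAbsent(tag, k -> new ArrayList<>()).add(output);
        }

        void addNonChained(String tag, SimpleOutput output) {
            nonChained.computeIfAbsent(tag, k -> new ArrayList<>()).add(output);
        }

        void collect(String tag, String record) {
            List<SimpleOutput> leaving = nonChained.getOrDefault(tag, Collections.emptyList());
            if (!leaving.isEmpty()) {
                numRecordsOut++;
            }
            for (SimpleOutput output : leaving) {
                output.collect(record);
                numRecordsSent++;
            }
            // Chained outputs stay inside the task and affect neither counter.
            for (SimpleOutput output : chained.getOrDefault(tag, Collections.emptyList())) {
                output.collect(record);
            }
        }
    }

    public static void main(String[] args) {
        GroupedCollector collector = new GroupedCollector();
        collector.addNonChained("main", new RecordingOutput());
        collector.addNonChained("main", new RecordingOutput());
        collector.addChained("main", new RecordingOutput());
        collector.addChained("side", new RecordingOutput());

        collector.collect("main", "a"); // leaves the task through two outputs
        collector.collect("side", "b"); // stays inside the operator chain

        System.out.println("numRecordsOut=" + collector.numRecordsOut);   // prints numRecordsOut=1
        System.out.println("numRecordsSent=" + collector.numRecordsSent); // prints numRecordsSent=2
    }
}
```

In this sketch a record fanned out to two non-chained outputs raises {{numRecordsOut}} by one and {{numRecordsSent}} by two, which is the difference the broadcast question in the issue description is asking about.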