
ming li commented on FLINK-18808:

My proposal is to group the {{Output}} instances by {{OutputTag}} in 
{{BroadcastingOutputCollector}}. At present, we put every {{Output}} into a 
single array and cannot distinguish whether an entry is a 
{{RecordWriterOutput}} or a chained output. Instead, they could be kept in two 
maps:

{code:java}
protected final Map<OutputTag<T>, List<Output<StreamRecord<T>>>> chainedOutput;
protected final Map<OutputTag<T>, List<Output<StreamRecord<T>>>> nonChainedOutput;
{code}

The {{collect}} method will be implemented like this:

{code:java}
public void collect(StreamRecord<T> record) {
   // Non-chained outputs registered under this tag send the record out of the task.
   if (!nonChainedOutput.getOrDefault(placeHolder, Collections.emptyList()).isEmpty()) {
      for (Output<StreamRecord<T>> output : nonChainedOutput.get(placeHolder)) {
         output.collect(record);
      }
   }
   // Chained outputs hand the record to the next operator inside the same task.
   for (Output<StreamRecord<T>> output : chainedOutput.getOrDefault(placeHolder, Collections.emptyList())) {
      output.collect(record);
   }
}

public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
   if (!nonChainedOutput.getOrDefault(outputTag, Collections.emptyList()).isEmpty()) {
      for (Output<StreamRecord<T>> output : nonChainedOutput.get(outputTag)) {
         output.collect(outputTag, record);
      }
   }
   for (Output<StreamRecord<T>> output : chainedOutput.getOrDefault(outputTag, Collections.emptyList())) {
      output.collect(outputTag, record);
   }
}
{code}
Grouping has the following benefits:
1. We can determine at the {{BroadcastingOutputCollector}} level whether a 
record is actually sent out of the task.
2. No interface definition needs to be added or modified. Only the 
implementation of {{BroadcastingOutputCollector}} changes, and since 
{{BroadcastingOutputCollector}} is currently used only in {{OperatorChain}}, 
the impact is small.
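
To illustrate the grouping idea outside of Flink, here is a minimal, self-contained sketch. It is not the proposed patch: {{SimpleOutput}}, {{GroupedBroadcastingCollector}}, the {{String}} tags (standing in for {{OutputTag}}) and the {{numRecordsOut}} field are all hypothetical names chosen for the example. It only shows that, once outputs are grouped, the collector itself can tell whether a record leaves the task and count it once.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for Flink's Output interface (not the real API). */
interface SimpleOutput<T> {
    void collect(String tag, T record);
}

/** Sketch of a broadcasting collector that keeps outputs grouped by tag. */
class GroupedBroadcastingCollector<T> implements SimpleOutput<T> {
    // Outputs that hand records to the next operator in the same chain.
    private final Map<String, List<SimpleOutput<T>>> chainedOutput = new HashMap<>();
    // Outputs that send records out of the task (e.g. over the network).
    private final Map<String, List<SimpleOutput<T>>> nonChainedOutput = new HashMap<>();
    // Hypothetical task-level counter, incremented once per record that leaves the task.
    private long numRecordsOut;

    void addChained(String tag, SimpleOutput<T> output) {
        chainedOutput.computeIfAbsent(tag, k -> new ArrayList<>()).add(output);
    }

    void addNonChained(String tag, SimpleOutput<T> output) {
        nonChainedOutput.computeIfAbsent(tag, k -> new ArrayList<>()).add(output);
    }

    @Override
    public void collect(String tag, T record) {
        List<SimpleOutput<T>> outgoing =
                nonChainedOutput.getOrDefault(tag, Collections.emptyList());
        if (!outgoing.isEmpty()) {
            // At least one non-chained output exists for this tag: count the record once.
            numRecordsOut++;
            for (SimpleOutput<T> output : outgoing) {
                output.collect(tag, record);
            }
        }
        for (SimpleOutput<T> output :
                chainedOutput.getOrDefault(tag, Collections.emptyList())) {
            output.collect(tag, record);
        }
    }

    long getNumRecordsOut() {
        return numRecordsOut;
    }
}
```

In this sketch a record emitted under a tag that has only chained outputs does not touch the counter, while a record emitted under a tag with one or more non-chained outputs increments it exactly once, regardless of how many such outputs there are.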
{quote}Maybe the best would be to switch to {{numRecordsSent}} (that would also 
include broadcasted fan out) and ignore all of this complexity?
{quote}
Yes, if the goal is to count the number of records actually sent, this becomes 
simple: we can count directly in {{RecordWriterOutput}} or {{RecordWriter}}. 
But it may cause some confusion. For example, the operator-level 
{{numRecordsOut}} would still have the semantics of "{{numRecordsProduced}}", 
while the task-level metric would have the semantics of "{{numRecordsSent}}". 
Maybe in the future we can add a {{numRecordsSent}} metric for both 
{{operator}} and {{task}}. :)
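
To make the difference between the two semantics concrete, here is a small sketch in plain Java. It is not Flink code: {{CountingBroadcaster}} and both counters are hypothetical, and each "channel" is just an in-memory list. One counter is incremented once per emitted record ("produced"), the other once per channel the record is written to ("sent").

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical broadcaster that tracks both counting semantics side by side. */
class CountingBroadcaster {
    // Each inner list stands in for one downstream channel.
    private final List<List<String>> channels = new ArrayList<>();
    private long numRecordsProduced; // +1 per record emitted
    private long numRecordsSent;     // +1 per record per channel

    CountingBroadcaster(int numChannels) {
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    void broadcast(String record) {
        numRecordsProduced++;
        for (List<String> channel : channels) {
            channel.add(record);
            numRecordsSent++;
        }
    }

    long getNumRecordsProduced() {
        return numRecordsProduced;
    }

    long getNumRecordsSent() {
        return numRecordsSent;
    }
}
```

With three channels and two broadcast records, the "produced" counter ends at 2 and the "sent" counter at 6, which is the kind of gap between operator-level and task-level numbers that could confuse users.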

> Task-level numRecordsOut metric may be underestimated
> -----------------------------------------------------
>                 Key: FLINK-18808
>                 URL: https://issues.apache.org/jira/browse/FLINK-18808
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Metrics, Runtime / Task
>    Affects Versions: 1.11.1
>            Reporter: ming li
>            Assignee: ming li
>            Priority: Major
>              Labels: pull-request-available, usability
>         Attachments: image-2020-08-04-11-28-13-800.png, 
> image-2020-08-04-11-32-20-678.png, image-2020-08-13-18-36-13-282.png
> At present, we register the task-level numRecordsOut metric only by reusing 
> the output record counter of the operator at the end of the OperatorChain.
> {code:java}
> if (config.isChainEnd()) {
>    operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
> }
> {code}
> If data leaves the task only through the last operator of the OperatorChain, 
> this statistic is correct. But consider the following scenario:
> !image-2020-08-04-11-28-13-800.png|width=507,height=174!
> In this JobGraph, data is sent out not only by the last operator but also by 
> an operator in the middle of the OperatorChain (the map operator just returns 
> the original value). In one of our test topologies, shown below, the metric 
> reports only half of the total data received downstream.
> !image-2020-08-04-11-32-20-678.png|width=648,height=251!
> I think the data sent out by intermediate operators should also be counted 
> in the numRecordsOut of the Task. Currently we do not reuse the output 
> record counters of intermediate operators, so the task-level numRecordsOut 
> metric is underestimated (this has no effect on the actual operation of the 
> job, but it may affect our monitoring).
> A simple idea of mine is to modify the condition for reusing an operator's 
> output record counter:
> {code:java}
> if (!config.getNonChainedOutputs(getUserCodeClassloader()).isEmpty()) {
>    operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
> }{code}
> In addition, I have another question: if a record is broadcast to all 
> downstream consumers, should the numRecordsOut counter increase by one or by 
> the number of downstream consumers? It seems that we currently increase it 
> by one.
