[ 
https://issues.apache.org/jira/browse/FLINK-18808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177624#comment-17177624
 ] 

ming li edited comment on FLINK-18808 at 8/14/20, 9:14 AM:
-----------------------------------------------------------

My proposal is to group the {{Output}}s by {{OutputTag}} in 
{{BroadcastingOutputCollector}}. At present, we put all {{Output}}s in a single 
array and cannot tell whether a given one is a {{RecordWriterOutput}} or a 
{{ChainingOutput}}. 
{code:java}
protected final Map<OutputTag<T>, List<Output<StreamRecord<T>>>> chainedOutput;
protected final Map<OutputTag<T>, List<Output<StreamRecord<T>>>> nonChainedOutput;
{code}
The {{collect}} method will be implemented like this:
{code:java}
// placeHolder is the key under which outputs without a side-output tag are grouped.
@Override
public void collect(StreamRecord<T> record) {
   // Count the record once if at least one non-chained output will send it.
   if (!nonChainedOutput.getOrDefault(placeHolder, Collections.emptyList()).isEmpty()) {
      numRecordOut.inc();
      for (Output<StreamRecord<T>> output : nonChainedOutput.get(placeHolder)) {
         output.collect(record);
      }
   }

   // Chained outputs stay inside the task and are not counted.
   for (Output<StreamRecord<T>> output : chainedOutput.getOrDefault(placeHolder, Collections.emptyList())) {
      output.collect(record);
   }
}

@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
   // Same logic for side outputs, keyed by the given tag.
   if (!nonChainedOutput.getOrDefault(outputTag, Collections.emptyList()).isEmpty()) {
      numRecordOut.inc();
      for (Output<StreamRecord<T>> output : nonChainedOutput.get(outputTag)) {
         output.collect(outputTag, record);
      }
   }

   for (Output<StreamRecord<T>> output : chainedOutput.getOrDefault(outputTag, Collections.emptyList())) {
      output.collect(outputTag, record);
   }
}
{code}
Grouping has the following benefits:
 1. We can determine at the {{BroadcastingOutputCollector}} level whether a 
record is actually sent out of the task.
 2. No interface definition needs to be added or modified. Only the 
implementation of {{BroadcastingOutputCollector}} changes, and since it is 
currently used only in {{OperatorChain}}, the impact is small.
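
To make the counting rule concrete, here is a minimal, self-contained sketch of the grouping idea. It does not use Flink classes: {{SketchOutput}}, {{GroupedCollectorSketch}}, the String tags and the plain {{long}} counter are all hypothetical stand-ins for {{Output}}, {{BroadcastingOutputCollector}}, {{OutputTag}} and the metric counter.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for Flink's Output; tags and records are plain Strings. */
interface SketchOutput {
    void collect(String tag, String record);
}

/** Hypothetical model of a collector that groups its outputs by tag. */
class GroupedCollectorSketch implements SketchOutput {
    /** Plays the role of placeHolder: the key for outputs without a side-output tag. */
    static final String MAIN_TAG = "main";

    private final Map<String, List<SketchOutput>> chainedOutput = new HashMap<>();
    private final Map<String, List<SketchOutput>> nonChainedOutput = new HashMap<>();
    private long numRecordsOut; // stand-in for the task-level counter

    void addChained(String tag, SketchOutput out) {
        chainedOutput.computeIfAbsent(tag, k -> new ArrayList<>()).add(out);
    }

    void addNonChained(String tag, SketchOutput out) {
        nonChainedOutput.computeIfAbsent(tag, k -> new ArrayList<>()).add(out);
    }

    @Override
    public void collect(String tag, String record) {
        List<SketchOutput> leaving =
                nonChainedOutput.getOrDefault(tag, Collections.emptyList());
        if (!leaving.isEmpty()) {
            // Counted once per record, however many non-chained outputs receive it.
            numRecordsOut++;
            for (SketchOutput out : leaving) {
                out.collect(tag, record);
            }
        }
        // Chained outputs stay inside the task, so they do not touch the counter.
        for (SketchOutput out : chainedOutput.getOrDefault(tag, Collections.emptyList())) {
            out.collect(tag, record);
        }
    }

    long getNumRecordsOut() {
        return numRecordsOut;
    }

    public static void main(String[] args) {
        GroupedCollectorSketch collector = new GroupedCollectorSketch();
        List<String> sent = new ArrayList<>();
        collector.addNonChained(MAIN_TAG, (tag, record) -> sent.add(record));
        collector.addChained(MAIN_TAG, (tag, record) -> { });
        collector.addChained("side", (tag, record) -> { });

        collector.collect(MAIN_TAG, "a"); // leaves the task: counted
        collector.collect("side", "b");   // only chained consumers: not counted
        System.out.println(collector.getNumRecordsOut() + " counted, sent=" + sent);
    }
}
```

In this sketch a record emitted on a tag that has only chained consumers never increments the counter, which is the distinction the single array cannot make today.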
{quote}Maybe the best would be to switch to {{numRecordsSent}} (that would also 
include broadcasted fan out) and ignore all of this complexity?
{quote}
Yes, if the goal is to count the number of records actually sent, this becomes 
simple. We can count the records directly in {{RecordWriterOutput}} or 
{{RecordWriter}}. But it may cause some confusion. For example, the 
operator-level {{numRecordsOut}} would still have the semantics of 
"{{numRecordsProduced}}", while the task-level metric would have the semantics 
of "{{numRecordsSent}}". Maybe in the future we can add a {{numRecordsSent}} 
metric for both {{operator}} and {{task}}. :)
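
The difference between the two semantics can be illustrated with a small sketch. The class and method names below are hypothetical and only mirror the terms used in this discussion; they are not Flink APIs. The assumption is that every emitted record is broadcast to every output, so "sent" includes the fan-out while "produced" does not.

```java
/** Hypothetical illustration of the two counting semantics; not part of Flink. */
class OutputCountSemanticsSketch {

    /** "Produced" semantics: one count per emitted record, regardless of fan-out. */
    static long numRecordsProduced(long emittedRecords, int outputs) {
        return emittedRecords;
    }

    /** "Sent" semantics: one count per emitted record per output it is written to. */
    static long numRecordsSent(long emittedRecords, int outputs) {
        return emittedRecords * outputs;
    }

    public static void main(String[] args) {
        long emitted = 100;
        int outputs = 2; // assumption: each record goes to both outputs
        System.out.println("produced = " + numRecordsProduced(emitted, outputs)); // 100
        System.out.println("sent     = " + numRecordsSent(emitted, outputs));     // 200
    }
}
```

With mixed semantics, an operator could report 100 while its task reports 200 for the same records, which is the confusion described above.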



> Task-level numRecordsOut metric may be underestimated
> -----------------------------------------------------
>
>                 Key: FLINK-18808
>                 URL: https://issues.apache.org/jira/browse/FLINK-18808
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Metrics, Runtime / Task
>    Affects Versions: 1.11.1
>            Reporter: ming li
>            Assignee: ming li
>            Priority: Major
>              Labels: pull-request-available, usability
>         Attachments: image-2020-08-04-11-28-13-800.png, 
> image-2020-08-04-11-32-20-678.png, image-2020-08-13-18-36-13-282.png
>
>
> At present, we register the task-level numRecordsOut metric only by reusing 
> the output record counter of the operator at the end of the OperatorChain.
> {code:java}
> if (config.isChainEnd()) {
>    operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
> }
> {code}
> If data leaves the task only through the last operator of the OperatorChain, 
> this statistic is correct. But consider the following scenario:
> !image-2020-08-04-11-28-13-800.png|width=507,height=174!
> In this JobGraph, data is sent not only by the last operator but also by an 
> operator in the middle of the OperatorChain (the map operator just returns 
> the original value directly). Below is one of our test topologies; we can see 
> that the reported count is only half of the total data received 
> downstream.
> !image-2020-08-04-11-32-20-678.png|width=648,height=251!
> I think the data sent out by the intermediate operator should also be counted 
> in the task's numRecordsOut. But currently we do not reuse the output record 
> counters of intermediate operators, so the task-level numRecordsOut metric is 
> underestimated (this has no effect on the actual operation of the job, but it 
> may affect our monitoring).
> A simple idea of mine is to modify the condition for reusing the operator's 
> output record counter:
> {code:java}
> if (!config.getNonChainedOutputs(getUserCodeClassloader()).isEmpty()) {
>    operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
> }{code}
> In addition, I have another question: if a record is broadcast to all 
> downstream receivers, should the numRecordsOut counter increase by one or by 
> the number of downstream receivers? It seems that currently we add one per 
> record.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
