zhijiangW commented on a change in pull request #7713: [FLINK-10995][network] 
Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r262784641
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
 ##########
 @@ -44,4 +52,67 @@ public BroadcastRecordWriter(
        public void emit(T record) throws IOException, InterruptedException {
                broadcastEmit(record);
        }
+
+       @Override
+       public void broadcastEmit(T record) throws IOException, 
InterruptedException {
 
 Review comment:
   Thanks for these kindly detail suggestions. You pointed out a very critical 
potential problem which I have not thought through before.
   
   Regarding with the `LatencyMarker`, I have not thought of a perfect solution 
yet.
   1. If finishing the `LatencyMarker` immediately, it could only reflect the 
latency before this marker, and lose a bit latency of current `BufferConsumer` 
containing `LatencyMarker`.
   
   2. If broadcasting the `LatencyMarker`, there are two concerns: First we 
might add the `LatencyMarkerID` in the marker like `CheckpointID`, and ignore 
the duplicate id in `StreamInputProcessor` which needs maintain a set of 
`LatencyMarkerID` for filter.  We could add this logic in processor to avoid 
gigantic avalanche of the marker. If to do so, the logic seems very easy for 
`BroadcastRecordWriter`, and the methods for `emit` and `randomEmit` are all 
processed in broadcast mode. But if we extend other elements to be `randomEmit` 
future, we might need add corresponding special handing logic like 
`LatencyMarker` then. The second concern is that broadcasting `LatencyMarker` 
still loses the latency precision compared with now. Currently we choose the 
random channel to send this marker, so it could reflect the average latency 
from statistics. But the broadcast mode could always reflect the latency of the 
fastest channel, because once the downstream receives the first marker, it 
would ignore the same following markers from other slow channels.
   
   Considering the above two options, I would prefer the first option because 
the latency lose seems less compared with broadcasting way, and we do not need 
to adjust the logic on receiver side (how many IDs to be maintained in memory 
might be another concern in this way). 
   
   Regarding with the first option, I agree with your suggestion of abstracting 
`AbstractRecordWriter` to maintain single `BufferBuilder` in 
`BroadcastRecordWriter`.  And  I could understand your concerns of the 
`randomTriggered` field. Your suggestion of finishing `LatencyMarker` 
immediately in `randomEmit` could avoid this field. I would like to have a try 
based on them if you also approve the first option. :) 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to