zhijiangW commented on a change in pull request #7713: [FLINK-10995][network] Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r262784641
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
##########
@@ -44,4 +52,67 @@ public BroadcastRecordWriter(
 	public void emit(T record) throws IOException, InterruptedException {
 		broadcastEmit(record);
 	}
+
+	@Override
+	public void broadcastEmit(T record) throws IOException, InterruptedException {

Review comment:
Thanks for these kind and detailed suggestions. You pointed out a critical potential problem that I had not thought through before.

Regarding the `LatencyMarker`, I have not found a perfect solution yet.

1. If we finish the `LatencyMarker` immediately, it only reflects the latency before this marker, and loses a bit of the latency of the current `BufferConsumer` that contains the `LatencyMarker`.

2. If we broadcast the `LatencyMarker`, there are two concerns.

First, we might add a `LatencyMarkerID` to the marker, similar to `CheckpointID`, and ignore duplicate IDs in `StreamInputProcessor`, which would then need to maintain a set of `LatencyMarkerID`s for filtering. We could add this logic in the processor to avoid a gigantic avalanche of markers. With this approach the logic in `BroadcastRecordWriter` seems very simple, because `emit` and `randomEmit` are both processed in broadcast mode. But if other elements are sent through `randomEmit` in the future, we might need to add corresponding special handling for them, as for `LatencyMarker`.

The second concern is that broadcasting the `LatencyMarker` still loses latency precision compared with the current behavior. Currently we choose a random channel to send the marker, so statistically it reflects the average latency. In broadcast mode it would always reflect the latency of the fastest channel, because once the downstream receives the first marker, it ignores the later copies of the same marker from slower channels.
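To make the second concern concrete, here is a minimal sketch of the receiver-side filtering described above. It is not Flink code: `LatencyMarkerDedupSketch`, `MarkerIdFilter`, `reportedLatency`, and the per-channel delays are all hypothetical names and values chosen for illustration, and the real `StreamInputProcessor` logic would look different.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch; none of these names exist in Flink. */
public class LatencyMarkerDedupSketch {

    /** Receiver-side filter that lets only the first copy of each marker ID through. */
    static final class MarkerIdFilter {
        private final Set<Long> seenIds = new HashSet<>();

        /** Returns true only for the first marker seen with this ID. */
        boolean accept(long markerId) {
            // Set.add returns false when the ID is already present.
            return seenIds.add(markerId);
        }
    }

    /** Latency the receiver would record when one marker is broadcast to all channels. */
    static long reportedLatency(MarkerIdFilter filter, long markerId, long[] channelDelaysMs) {
        // Copies arrive in order of channel delay, so process them sorted by delay.
        long[] arrivals = channelDelaysMs.clone();
        Arrays.sort(arrivals);
        long reported = -1;
        for (long delay : arrivals) {
            if (filter.accept(markerId)) {
                reported = delay; // only the first (fastest) copy is recorded
            }
        }
        return reported;
    }

    public static void main(String[] args) {
        long[] delays = {50L, 5L, 20L}; // assumed per-channel delays in ms
        long reported = reportedLatency(new MarkerIdFilter(), 1L, delays);
        double average = Arrays.stream(delays).average().orElse(0);
        System.out.println("broadcast + dedup reports: " + reported + " ms");
        System.out.println("average over channels:     " + average + " ms");
    }
}
```

With these assumed delays the filter records only the copy from the fastest channel, while a randomly chosen channel would approach the average over many markers. The sketch also never evicts IDs from the set, so its memory use grows with every marker.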
Considering the two options above, I prefer the first one, because the latency loss seems smaller than with broadcasting, and we do not need to adjust the logic on the receiver side (how many IDs to keep in memory might be another concern with broadcasting).

Regarding the first option, I agree with your suggestion of abstracting an `AbstractRecordWriter` so that `BroadcastRecordWriter` maintains a single `BufferBuilder`. I also understand your concerns about the `randomTriggered` field. Your suggestion of finishing the `LatencyMarker` immediately in `randomEmit` would avoid this field. I would like to give it a try on this basis if you also approve the first option. :)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services