zhijiangW commented on a change in pull request #9483: [FLINK-13767][task] 
Refactor StreamInputProcessor#processInput based on InputStatus
URL: https://github.com/apache/flink/pull/9483#discussion_r323659128
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
 ##########
 @@ -121,18 +115,46 @@ public StreamTwoInputProcessor(
                        taskManagerConfig,
                        taskName);
                checkState(checkpointedInputGates.length == 2);
-               this.input1 = new 
StreamTaskNetworkInput(checkpointedInputGates[0], inputSerializer1, ioManager, 
0);
-               this.input2 = new 
StreamTaskNetworkInput(checkpointedInputGates[1], inputSerializer2, ioManager, 
1);
 
-               this.statusWatermarkValve1 = new StatusWatermarkValve(
-                       unionedInputGate1.getNumberOfInputChannels(),
-                       new ForwardingValveOutputHandler(streamOperator, lock, 
streamStatusMaintainer, input1WatermarkGauge, 0));
-               this.statusWatermarkValve2 = new StatusWatermarkValve(
-                       unionedInputGate2.getNumberOfInputChannels(),
-                       new ForwardingValveOutputHandler(streamOperator, lock, 
streamStatusMaintainer, input2WatermarkGauge, 1));
+               this.output1 = new StreamTaskNetworkOutput<>(
+                       streamOperator,
+                       (StreamRecord<IN1> record) -> {
 
 Review comment:
   Some missing explanations: my previous version took the function way for all 
the elements processing including record, watermark, latencyMarker. 
   
   But @pnowojski did not suggest that way because of inconvenient debugging 
issue, so I adjusted to only retain function way for frequent record processing 
because of possible JIT optimization concerns.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to