zhijiangW commented on a change in pull request #9483: [FLINK-13767][task] Refactor StreamInputProcessor#processInput based on InputStatus URL: https://github.com/apache/flink/pull/9483#discussion_r323659128
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java ########## @@ -121,18 +115,46 @@ public StreamTwoInputProcessor( taskManagerConfig, taskName); checkState(checkpointedInputGates.length == 2); - this.input1 = new StreamTaskNetworkInput(checkpointedInputGates[0], inputSerializer1, ioManager, 0); - this.input2 = new StreamTaskNetworkInput(checkpointedInputGates[1], inputSerializer2, ioManager, 1); - this.statusWatermarkValve1 = new StatusWatermarkValve( - unionedInputGate1.getNumberOfInputChannels(), - new ForwardingValveOutputHandler(streamOperator, lock, streamStatusMaintainer, input1WatermarkGauge, 0)); - this.statusWatermarkValve2 = new StatusWatermarkValve( - unionedInputGate2.getNumberOfInputChannels(), - new ForwardingValveOutputHandler(streamOperator, lock, streamStatusMaintainer, input2WatermarkGauge, 1)); + this.output1 = new StreamTaskNetworkOutput<>( + streamOperator, + (StreamRecord<IN1> record) -> { Review comment: Some missing explanations: my previous version took the function way for all the elements processing including record, watermark, latencyMarker. But @pnowojski did not suggest that way because of inconvenient debugging issue, so I adjusted to only retain function way for frequent record processing because of possible JIT optimization concerns. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services