GitHub user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5125#discussion_r154999100

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---
    @@ -80,7 +91,10 @@ public void init() throws Exception {
     				this.headOperator);

     		// make sure that stream tasks report their I/O statistics
    -		inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
    +		inputProcessor.setupMetrics(getEnvironment().getMetricGroup().getIOMetricGroup(), input1WatermarkGauge, input2WatermarkGauge);
    +
    +		headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, input1WatermarkGauge);
    --- End diff --

    Oh wait, no, this is different. For TwoInputStreamTasks we register the input 1/2 watermark metrics here, but the common min watermark metric is registered in the operator chain.
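
To illustrate the split described in the comment, here is a minimal, self-contained sketch. It does not use Flink's classes: `InputWatermarkGauge`, `MinWatermarkGauge`, the two maps standing in for metric groups, and the metric names are all hypothetical stand-ins chosen for illustration. The assumption is only that each input has its own gauge, registered in one place, and that a combined gauge reporting the minimum of the two is registered somewhere else.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

public class WatermarkMetricsSketch {

    /** Hypothetical stand-in for a gauge tracking the latest watermark of one input. */
    static final class InputWatermarkGauge implements LongSupplier {
        private volatile long current = Long.MIN_VALUE;

        void update(long watermark) {
            current = watermark;
        }

        @Override
        public long getAsLong() {
            return current;
        }
    }

    /** Hypothetical combined gauge: reports the minimum watermark across both inputs. */
    static final class MinWatermarkGauge implements LongSupplier {
        private final LongSupplier first;
        private final LongSupplier second;

        MinWatermarkGauge(LongSupplier first, LongSupplier second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public long getAsLong() {
            return Math.min(first.getAsLong(), second.getAsLong());
        }
    }

    public static void main(String[] args) {
        InputWatermarkGauge input1 = new InputWatermarkGauge();
        InputWatermarkGauge input2 = new InputWatermarkGauge();

        // Stand-in for the task-level registration: one metric per input.
        Map<String, LongSupplier> taskMetrics = new LinkedHashMap<>();
        taskMetrics.put("input1Watermark", input1);
        taskMetrics.put("input2Watermark", input2);

        // Stand-in for the operator-chain registration: the common minimum.
        Map<String, LongSupplier> chainMetrics = new LinkedHashMap<>();
        chainMetrics.put("minInputWatermark", new MinWatermarkGauge(input1, input2));

        // Advance the two inputs independently.
        input1.update(100L);
        input2.update(40L);

        taskMetrics.forEach((name, gauge) -> System.out.println(name + " = " + gauge.getAsLong()));
        chainMetrics.forEach((name, gauge) -> System.out.println(name + " = " + gauge.getAsLong()));
    }
}
```

Because the combined gauge reads from the same per-input gauge objects, the two registration sites stay consistent without any extra synchronization between them.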
---