zhuzhurk commented on a change in pull request #17905:
URL: https://github.com/apache/flink/pull/17905#discussion_r759094376

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
##########
@@ -391,6 +391,7 @@ private BufferBuilder requestNewBufferBuilderFromPool(int targetSubpartition)
     private void finishUnicastBufferBuilder(int targetSubpartition) {
         final BufferBuilder bufferBuilder = unicastBufferBuilders[targetSubpartition];
         if (bufferBuilder != null) {
+            numBytesProduced.inc(bufferBuilder.finish());

Review comment:
       We should invoke `finish()` once and record the result. The recorded value can then be used multiple times.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
##########
@@ -406,6 +407,7 @@ private void finishUnicastBufferBuilders() {
     private void finishBroadcastBufferBuilder() {
         if (broadcastBufferBuilder != null) {
+            numBytesProduced.inc(broadcastBufferBuilder.finish());

Review comment:
       We should invoke `finish()` once and record the result. The recorded value can then be used multiple times.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
##########
@@ -111,6 +111,14 @@
     protected Counter numBuffersOut = new SimpleCounter();

+    /**
+     * The difference with {@link #numBytesOut} : numBytesProduced represents the number of bytes
+     * actually produced, and numBytesOut represents the number of bytes sent downstream tasks. In

Review comment:
       sent -> sent to

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
##########
@@ -71,4 +74,29 @@ public void testTaskIOMetricGroup() throws InterruptedException {
         assertThat(
                 taskIO.getBackPressuredTimePerSecond().getCount(), greaterThanOrEqualTo(sleepTime));
     }
+
+    @Test
+    public void testNumBytesProducedOfPartitionsMetrics() {
+        TaskMetricGroup task = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+        TaskIOMetricGroup taskIO = task.getIOMetricGroup();
+
+        Counter c1 = new SimpleCounter();
+        c1.inc(32L);
+        Counter c2 = new SimpleCounter();
+        c2.inc(64L);
+
+        IntermediateResultPartitionID resultPartitionID1 = new IntermediateResultPartitionID();
+        IntermediateResultPartitionID resultPartitionID2 = new IntermediateResultPartitionID();
+
+        taskIO.registerNumBytesProducedCounterForPartition(resultPartitionID1, c1);
+        taskIO.registerNumBytesProducedCounterForPartition(resultPartitionID2, c2);
+
+        // check

Review comment:
       I think this comment is not needed.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
##########
@@ -822,6 +822,55 @@ public void testBufferSizeNotChanged() throws IOException {
         assertEquals(bufferSize, subpartition1.pollBuffer().buffer().getSize());
     }

+    @Test
+    public void testNumBytesProducedCounterForPipelinedUnicast() throws IOException {

Review comment:
       The change does not differentiate by ResultPartitionType, so I think there is no need to add separate tests for different ResultPartitionTypes.
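
For illustration, the two `finish()` comments above could be addressed roughly as sketched below. This is a self-contained sketch, not the actual Flink code: `SketchCounter` and `SketchBufferBuilder` are hypothetical stand-ins for `Counter` and `BufferBuilder`, `finish()` is assumed to return the number of bytes written, and it is assumed that `numBytesOut` is updated from the same value in the same method.

```java
// Hypothetical stand-in for Flink's Counter; only the two methods used here.
final class SketchCounter {
    private long count;

    void inc(long n) {
        count += n;
    }

    long getCount() {
        return count;
    }
}

// Hypothetical stand-in for BufferBuilder. finish() is ASSUMED to return the
// number of bytes written; the call count is tracked only to show the point.
final class SketchBufferBuilder {
    private final int bytesWritten;
    private int finishCalls;

    SketchBufferBuilder(int bytesWritten) {
        this.bytesWritten = bytesWritten;
    }

    int finish() {
        finishCalls++;
        return bytesWritten;
    }

    int getFinishCalls() {
        return finishCalls;
    }
}

final class FinishOnceSketch {
    final SketchCounter numBytesProduced = new SketchCounter();
    // Assumption: a second counter needs the same value in this method.
    final SketchCounter numBytesOut = new SketchCounter();

    void finishBufferBuilder(SketchBufferBuilder bufferBuilder) {
        if (bufferBuilder != null) {
            // Invoke finish() exactly once and reuse the recorded result.
            final int bytesWritten = bufferBuilder.finish();
            numBytesProduced.inc(bytesWritten);
            numBytesOut.inc(bytesWritten);
        }
    }

    public static void main(String[] args) {
        FinishOnceSketch partition = new FinishOnceSketch();
        SketchBufferBuilder builder = new SketchBufferBuilder(128);
        partition.finishBufferBuilder(builder);
        System.out.println("finish() calls: " + builder.getFinishCalls());
        System.out.println("numBytesProduced: " + partition.numBytesProduced.getCount());
        System.out.println("numBytesOut: " + partition.numBytesOut.getCount());
    }
}
```

Storing the result in a local variable keeps the two counters in agreement and avoids relying on `finish()` being safe to call twice.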