zhuzhurk commented on a change in pull request #17905:
URL: https://github.com/apache/flink/pull/17905#discussion_r759094376



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
##########
@@ -391,6 +391,7 @@ private BufferBuilder requestNewBufferBuilderFromPool(int 
targetSubpartition)
     private void finishUnicastBufferBuilder(int targetSubpartition) {
         final BufferBuilder bufferBuilder = 
unicastBufferBuilders[targetSubpartition];
         if (bufferBuilder != null) {
+            numBytesProduced.inc(bufferBuilder.finish());

Review comment:
       We should invoke `finish()` once and record the result. The result can 
be used multiple times.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
##########
@@ -406,6 +407,7 @@ private void finishUnicastBufferBuilders() {
 
     private void finishBroadcastBufferBuilder() {
         if (broadcastBufferBuilder != null) {
+            numBytesProduced.inc(broadcastBufferBuilder.finish());

Review comment:
       We should invoke `finish()` once and record the result. The result can 
be used multiple times.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
##########
@@ -111,6 +111,14 @@
 
     protected Counter numBuffersOut = new SimpleCounter();
 
+    /**
+     * The difference with {@link #numBytesOut} : numBytesProduced represents 
the number of bytes
+     * actually produced, and numBytesOut represents the number of bytes sent 
downstream tasks. In

Review comment:
       sent -> sent to

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
##########
@@ -71,4 +74,29 @@ public void testTaskIOMetricGroup() throws 
InterruptedException {
         assertThat(
                 taskIO.getBackPressuredTimePerSecond().getCount(), 
greaterThanOrEqualTo(sleepTime));
     }
+
+    @Test
+    public void testNumBytesProducedOfPartitionsMetrics() {
+        TaskMetricGroup task = 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+        TaskIOMetricGroup taskIO = task.getIOMetricGroup();
+
+        Counter c1 = new SimpleCounter();
+        c1.inc(32L);
+        Counter c2 = new SimpleCounter();
+        c2.inc(64L);
+
+        IntermediateResultPartitionID resultPartitionID1 = new 
IntermediateResultPartitionID();
+        IntermediateResultPartitionID resultPartitionID2 = new 
IntermediateResultPartitionID();
+
+        taskIO.registerNumBytesProducedCounterForPartition(resultPartitionID1, 
c1);
+        taskIO.registerNumBytesProducedCounterForPartition(resultPartitionID2, 
c2);
+
+        // check

Review comment:
       I think this comment is not needed

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
##########
@@ -822,6 +822,55 @@ public void testBufferSizeNotChanged() throws IOException {
         assertEquals(bufferSize, 
subpartition1.pollBuffer().buffer().getSize());
     }
 
+    @Test
+    public void testNumBytesProducedCounterForPipelinedUnicast() throws 
IOException {

Review comment:
       The change does not differentiate with ResultPartitionType, so I think 
there is not need to add different tests against different ResultPartitionTypes.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to