zhuzhurk commented on code in PR #24170:
URL: https://github.com/apache/flink/pull/24170#discussion_r1462752170


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##########
@@ -559,7 +559,7 @@ static DefaultVertexParallelismAndInputInfosDecider from(
                 configuration.get(
                         BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK),
                 configuration.get(
-                        BatchExecutionOptions
-                                .ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM));
+                        BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM,

Review Comment:
   It would be better to change the option's default value to 
`noDefaultValue()` so that it does not mislead users.
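
   As a rough illustration of the fallback behaviour being discussed, here is a minimal sketch in plain Java. It does not use Flink's classes: `resolveDefaultSourceParallelism` and the `Optional<Integer>` argument are hypothetical stand-ins for an option that has no default value and is only present when the user sets it. The value 32 mirrors the global max parallelism used in the test in this PR.

```java
import java.util.Optional;

class SourceParallelismFallbackSketch {

    /**
     * Hypothetical helper (not Flink API): returns the configured default source
     * parallelism if the user set one, otherwise falls back to the global max parallelism.
     */
    static int resolveDefaultSourceParallelism(
            Optional<Integer> configuredDefault, int globalMaxParallelism) {
        return configuredDefault.orElse(globalMaxParallelism);
    }

    public static void main(String[] args) {
        // Option not set: the global max parallelism is used.
        System.out.println(resolveDefaultSourceParallelism(Optional.empty(), 32));
        // Option set explicitly: the configured value is used.
        System.out.println(resolveDefaultSourceParallelism(Optional.of(8), 32));
    }
}
```

   With no declared default, an absent value is unambiguous, so the fallback is the only place where the effective value is decided.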



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java:
##########
@@ -379,6 +379,30 @@ void testEvenlyDistributeDataWithMaxSubpartitionLimitation() {
                 subpartitionRanges);
     }
 
+    @Test
+    void testComputeSourceParallelismUpperBound() {
+        Configuration configuration = new Configuration();
+        // Test the case where default source parallelism is not configured, falling back to global
+        // max parallelism.
+        VertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider =
+                DefaultVertexParallelismAndInputInfosDecider.from(32, configuration);
+        assertThat(
+                        vertexParallelismAndInputInfosDecider.computeSourceParallelismUpperBound(
+                                new JobVertexID(), 128))
+                .isEqualTo(32);
+
+        // Test the case where the configured default source parallelism cannot exceed the vertex
+        // max parallelism.
+        configuration.setInteger(

Review Comment:
   It would be better to write a separate test for each case, e.g. 
`testComputeSourceParallelismUpperBound` for the normal case, 
`testComputeSourceParallelismUpperBoundFallback` for the fallback case, and 
`testComputeSourceParallelismUpperBoundNotExceedMaxParallelism` to verify 
that the max parallelism limit takes effect.
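
   The suggested split could look roughly like the sketch below. To stay self-contained it uses plain Java instead of JUnit and AssertJ, and `computeUpperBound` is a hypothetical helper that models the behaviour the test in the diff appears to check (fall back to the global max parallelism when nothing is configured, never exceed the vertex max parallelism). It is not the actual decider implementation; in the real test class each method would be a `@Test` calling `computeSourceParallelismUpperBound`.

```java
class SourceParallelismUpperBoundCases {

    /** Hypothetical model of the upper-bound computation; a null argument means "not configured". */
    static int computeUpperBound(Integer configuredDefault, int globalMax, int vertexMax) {
        int candidate = configuredDefault != null ? configuredDefault : globalMax;
        return Math.min(candidate, vertexMax);
    }

    static void check(boolean condition, String caseName) {
        if (!condition) {
            throw new AssertionError(caseName + " failed");
        }
    }

    // Normal case: the configured value is below both limits and is returned as is.
    static void testComputeSourceParallelismUpperBound() {
        check(computeUpperBound(4, 32, 128) == 4, "normal case");
    }

    // Fallback case: nothing configured, so the global max parallelism applies.
    static void testComputeSourceParallelismUpperBoundFallback() {
        check(computeUpperBound(null, 32, 128) == 32, "fallback case");
    }

    // Limit case: the configured value is capped by the vertex max parallelism.
    static void testComputeSourceParallelismUpperBoundNotExceedMaxParallelism() {
        check(computeUpperBound(8, 32, 4) == 4, "max parallelism case");
    }

    public static void main(String[] args) {
        testComputeSourceParallelismUpperBound();
        testComputeSourceParallelismUpperBoundFallback();
        testComputeSourceParallelismUpperBoundNotExceedMaxParallelism();
        System.out.println("all cases passed");
    }
}
```

   Keeping one scenario per method means a failure names the broken case directly, and no test depends on configuration state mutated by an earlier assertion.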



