wanglijie95 commented on a change in pull request #17952:
URL: https://github.com/apache/flink/pull/17952#discussion_r764582435



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDecider.java
##########
@@ -81,52 +88,36 @@ private int calculateParallelism(List<BlockingResultInfo> consumedResults) {
                                                 .reduce(0L, Long::sum))
                         .sum();
 
-        if (broadcastBytes > dataVolumePerTask
-                || (broadcastBytes == dataVolumePerTask && nonBroadcastBytes > 0)) {
-            LOG.warn(
-                    "The minimum size of one task to process is larger than "
-                            + "the size of data volume which is configured by "
-                            + "'"
-                            + JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK.key()
-                            + "'. "
-                            + "Parallelism will be set to {}.",
-                    maxParallelism);
-
-            return maxParallelism;
-        } else if (broadcastBytes == dataVolumePerTask) {
-            return minParallelism;
-        } else {
-            int parallelism =
-                    (int)
-                            Math.ceil(
-                                    (double) nonBroadcastBytes
-                                            / (dataVolumePerTask - broadcastBytes));
-            parallelism = Math.max(parallelism, minParallelism);
-            parallelism = Math.min(parallelism, maxParallelism);
-            return parallelism;
-        }
-    }
-
-    /** The factory to instantiate {@link DefaultVertexParallelismDecider}. */
-    public static class Factory implements VertexParallelismDecider.Factory {
+        long expectedMaxBroadcastBytes =
+                (long) Math.ceil((dataVolumePerTask * CAP_RATIO_OF_BROADCAST));
 
-        private final Configuration configuration;
+        if (broadcastBytes > expectedMaxBroadcastBytes) {
+            LOG.info(
+                    "The number of broadcast bytes: {} is larger than the expected maximum value: {} ('{}' * {})."
+                            + " Use the expected maximum value as the number of broadcast bytes to decide the parallelism.",
+                    broadcastBytes,
+                    expectedMaxBroadcastBytes,
+                    JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK.key(),
+                    CAP_RATIO_OF_BROADCAST);
 
-        public Factory(Configuration configuration) {
-            this.configuration = configuration;
+            broadcastBytes = expectedMaxBroadcastBytes;
         }
 
-        @Override
-        public VertexParallelismDecider create() {
-            return new DefaultVertexParallelismDecider(
-                    configuration.getInteger(
-                            JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM),
-                    configuration.getInteger(
-                            JobManagerOptions.ADAPTIVE_BACH_SCHEDULER_MIN_PARALLELISM),
-                    configuration.get(
-                            JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK),
-                    configuration.get(
-                            JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DEFAULT_SOURCE_PARALLELISM));
-        }
+        int parallelism =
+                (int) Math.ceil((double) nonBroadcastBytes / (dataVolumePerTask - broadcastBytes));
+        parallelism = Math.max(parallelism, minParallelism);
+        parallelism = Math.min(parallelism, maxParallelism);
+        return parallelism;
+    }
+
+    public static DefaultVertexParallelismDecider from(Configuration configuration) {
+        return new DefaultVertexParallelismDecider(

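For reference, the parallelism rule in the `+` lines of `calculateParallelism` can be exercised in isolation. This is a minimal sketch, not the PR's code: the class name `BroadcastCapSketch` is hypothetical, `dataVolumePerTask` is modeled as a plain byte count, logging is omitted, and the value of `CAP_RATIO_OF_BROADCAST` is not shown in the diff, so 0.5 is an assumption.

```java
// Hypothetical standalone sketch of the capped-broadcast rule from the "+" lines.
final class BroadcastCapSketch {

    // Assumed value; the real constant is not shown in the diff.
    static final double CAP_RATIO_OF_BROADCAST = 0.5;

    static int calculateParallelism(
            long broadcastBytes,
            long nonBroadcastBytes,
            long dataVolumePerTask,
            int minParallelism,
            int maxParallelism) {
        // Largest broadcast share a single task is expected to process.
        long expectedMaxBroadcastBytes =
                (long) Math.ceil(dataVolumePerTask * CAP_RATIO_OF_BROADCAST);
        // Cap oversized broadcast input at the expected maximum.
        if (broadcastBytes > expectedMaxBroadcastBytes) {
            broadcastBytes = expectedMaxBroadcastBytes;
        }
        // Spread the non-broadcast bytes over the remaining per-task budget.
        int parallelism =
                (int) Math.ceil((double) nonBroadcastBytes / (dataVolumePerTask - broadcastBytes));
        // Clamp into [minParallelism, maxParallelism].
        parallelism = Math.max(parallelism, minParallelism);
        parallelism = Math.min(parallelism, maxParallelism);
        return parallelism;
    }

    public static void main(String[] args) {
        // 1000 bytes per task; broadcast 800 is capped to 500, so 2000 / 500 = 4.
        System.out.println(calculateParallelism(800, 2000, 1000, 1, 128)); // prints 4
    }
}
```

In this sketch, capping with a ratio below 1 keeps the divisor positive for any `dataVolumePerTask` above a few bytes, so no separate branch is needed for `broadcastBytes >= dataVolumePerTask`.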
Review comment:
       Yes, you are right. I will add the checks in the constructor. Since `dataVolumePerTask` is of type `MemorySize`, I will also check that it is not null.
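A minimal sketch of the kind of constructor validation described in the reply. The class `CheckedDeciderSketch` and the nested `DataSize` (a stand-in for Flink's `MemorySize`, used so the example has no Flink dependency) are hypothetical, and every check other than the null check on `dataVolumePerTask` is an assumption. Inside Flink itself, `Preconditions.checkNotNull` and `Preconditions.checkArgument` from `org.apache.flink.util` would be the likely equivalents.

```java
import java.util.Objects;

// Hypothetical sketch; only the null check is named in the reply, the rest are assumed.
final class CheckedDeciderSketch {

    /** Hypothetical stand-in for Flink's MemorySize: just a byte count. */
    static final class DataSize {
        final long bytes;

        DataSize(long bytes) {
            this.bytes = bytes;
        }
    }

    private final int maxParallelism;
    private final int minParallelism;
    private final DataSize dataVolumePerTask;
    private final int defaultSourceParallelism;

    CheckedDeciderSketch(
            int maxParallelism,
            int minParallelism,
            DataSize dataVolumePerTask,
            int defaultSourceParallelism) {
        // Reject a null reference-typed setting up front.
        this.dataVolumePerTask =
                Objects.requireNonNull(dataVolumePerTask, "dataVolumePerTask must not be null");
        // Assumed range checks on the numeric settings.
        checkArgument(maxParallelism > 0, "maxParallelism must be positive");
        checkArgument(minParallelism > 0, "minParallelism must be positive");
        checkArgument(
                minParallelism <= maxParallelism, "minParallelism must not exceed maxParallelism");
        checkArgument(dataVolumePerTask.bytes > 0, "dataVolumePerTask must be positive");
        checkArgument(defaultSourceParallelism > 0, "defaultSourceParallelism must be positive");
        this.maxParallelism = maxParallelism;
        this.minParallelism = minParallelism;
        this.defaultSourceParallelism = defaultSourceParallelism;
    }

    // Local helper mirroring the style of a precondition check.
    private static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    public static void main(String[] args) {
        new CheckedDeciderSketch(128, 1, new DataSize(1024L * 1024 * 1024), 8); // valid settings
        try {
            new CheckedDeciderSketch(128, 1, null, 8);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage()); // prints "dataVolumePerTask must not be null"
        }
    }
}
```

Failing fast in the constructor means a misconfigured decider is rejected when it is built, rather than surfacing later as a `NullPointerException` inside the parallelism calculation.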



