wenlong88 commented on a change in pull request #18785:
URL: https://github.com/apache/flink/pull/18785#discussion_r808030853



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java
##########
@@ -136,25 +154,38 @@ public String getDescription() {
                 parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
                 break;
             case KEEP_INPUT_AS_IS:
-                RequiredDistribution inputDistribution =
-                        ((KeepInputAsIsDistribution) requiredDistribution).getInputDistribution();
-                checkArgument(
-                        inputDistribution instanceof HashDistribution,
-                        "Only HashDistribution is supported now");
-                partitioner =
-                        new ForwardForConsecutiveHashPartitioner<>(
-                                createHashPartitioner(
-                                        ((HashDistribution) inputDistribution),
-                                        inputType,
-                                        planner));
+                KeepInputAsIsDistribution keepInputAsIsDistribution =
+                        (KeepInputAsIsDistribution) requiredDistribution;
+                if (keepInputAsIsDistribution.isStrict()) {
+                    // explicitly use ForwardPartitioner to guarantee the data distribution is
+                    // exactly the same as input
+                    partitioner = new ForwardPartitioner<>();
+                    requireUndefinedExchangeMode = true;
+                } else {
+                    RequiredDistribution inputDistribution =
+                            ((KeepInputAsIsDistribution) requiredDistribution)
+                                    .getInputDistribution();
+                    checkArgument(
+                            inputDistribution instanceof HashDistribution,
+                            "Only HashDistribution is supported now");
+                    partitioner =
+                            new ForwardForConsecutiveHashPartitioner<>(
+                                    createHashPartitioner(
+                                            ((HashDistribution) inputDistribution),
+                                            inputType,
+                                            planner));
+                }
                 parallelism = inputTransform.getParallelism();
                 break;
             default:
                 throw new TableException(distributionType + "is not supported now!");
         }
 
         final StreamExchangeMode exchangeMode =
-                getBatchStreamExchangeMode(planner.getConfiguration(), requiredExchangeMode);
+                requireUndefinedExchangeMode
+                        ? StreamExchangeMode.UNDEFINED

Review comment:
       Why is this special-cased for the forward partitioner? I think ForwardPartitioner and
ForwardForConsecutiveHashPartitioner should require the same exchange mode?




