godfreyhe commented on a change in pull request #18785:
URL: https://github.com/apache/flink/pull/18785#discussion_r808121812



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java
##########
@@ -136,25 +154,38 @@ public String getDescription() {
                 parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
                 break;
             case KEEP_INPUT_AS_IS:
-                RequiredDistribution inputDistribution =
-                        ((KeepInputAsIsDistribution) requiredDistribution).getInputDistribution();
-                checkArgument(
-                        inputDistribution instanceof HashDistribution,
-                        "Only HashDistribution is supported now");
-                partitioner =
-                        new ForwardForConsecutiveHashPartitioner<>(
-                                createHashPartitioner(
-                                        ((HashDistribution) inputDistribution),
-                                        inputType,
-                                        planner));
+                KeepInputAsIsDistribution keepInputAsIsDistribution =
+                        (KeepInputAsIsDistribution) requiredDistribution;
+                if (keepInputAsIsDistribution.isStrict()) {
+                    // explicitly use ForwardPartitioner to guarantee the data distribution is
+                    // exactly the same as input
+                    partitioner = new ForwardPartitioner<>();
+                    requireUndefinedExchangeMode = true;
+                } else {
+                    RequiredDistribution inputDistribution =
+                            ((KeepInputAsIsDistribution) requiredDistribution)
+                                    .getInputDistribution();
+                    checkArgument(
+                            inputDistribution instanceof HashDistribution,
+                            "Only HashDistribution is supported now");
+                    partitioner =
+                            new ForwardForConsecutiveHashPartitioner<>(
+                                    createHashPartitioner(
+                                            ((HashDistribution) inputDistribution),
+                                            inputType,
+                                            planner));
+                }
                 parallelism = inputTransform.getParallelism();
                 break;
             default:
                 throw new TableException(distributionType + "is not supported now!");
         }
 
         final StreamExchangeMode exchangeMode =
-                getBatchStreamExchangeMode(planner.getConfiguration(), requiredExchangeMode);
+                requireUndefinedExchangeMode
+                        ? StreamExchangeMode.UNDEFINED

Review comment:
       Consider the chaining logic in `StreamingJobGraphGenerator`: the exchange
mode of a forward partitioner must not be `BATCH`, otherwise the operators
cannot be chained. For `ForwardForConsecutiveHashPartitioner`, however, if it
is converted to a hash shuffle and the exchange mode is set to
`ALL_EDGES_BLOCKING` through `table.exec.shuffle-mode`, its final shuffle mode
should be batch. The chaining logic for `ForwardForConsecutiveHashPartitioner`
will be updated in FLINK-26168.
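
To make the point concrete, here is a minimal, self-contained sketch of the two decisions involved. It is not Flink code: `ExchangeModeSketch`, its enums, and both methods are hypothetical stand-ins. It assumes that the chaining check requires a forward edge whose exchange mode is not `BATCH`, and that the configured shuffle mode is reduced to a single boolean.

```java
/** Simplified model for illustration; none of these types are Flink's real classes. */
final class ExchangeModeSketch {

    /** Hypothetical stand-in for the exchange modes discussed in the review. */
    enum ExchangeMode { PIPELINED, BATCH, UNDEFINED }

    /** Hypothetical stand-in for the partitioner kinds discussed in the review. */
    enum Partitioner { FORWARD, FORWARD_FOR_CONSECUTIVE_HASH, HASH }

    /**
     * Assumed, simplified chaining rule: only a forward edge whose exchange
     * mode is not BATCH, between vertices of equal parallelism, is chainable.
     */
    static boolean isChainable(
            Partitioner partitioner, ExchangeMode mode, int upParallelism, int downParallelism) {
        return partitioner == Partitioner.FORWARD
                && mode != ExchangeMode.BATCH
                && upParallelism == downParallelism;
    }

    /**
     * Mirrors the ternary in the diff: a strict keep-input-as-is exchange always
     * gets UNDEFINED; otherwise the mode follows the configured shuffle mode
     * (modeled here as one boolean, which is a simplification).
     */
    static ExchangeMode chooseExchangeMode(
            boolean requireUndefinedExchangeMode, boolean blockingShuffleConfigured) {
        if (requireUndefinedExchangeMode) {
            return ExchangeMode.UNDEFINED;
        }
        return blockingShuffleConfigured ? ExchangeMode.BATCH : ExchangeMode.UNDEFINED;
    }

    public static void main(String[] args) {
        // Strict case: forward partitioner, mode forced to UNDEFINED even if
        // a blocking shuffle is configured, so the edge stays chainable.
        ExchangeMode strictMode = chooseExchangeMode(true, true);
        System.out.println("strict mode: " + strictMode);
        System.out.println(
                "strict chainable: " + isChainable(Partitioner.FORWARD, strictMode, 4, 4));

        // Non-strict case: the configured blocking shuffle is respected.
        ExchangeMode hashMode = chooseExchangeMode(false, true);
        System.out.println("non-strict mode: " + hashMode);
    }
}
```

Under these assumptions, forcing `UNDEFINED` in the strict branch is what keeps the forward edge chainable, while the non-strict branch can still end up as a batch shuffle.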




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
