godfreyhe commented on a change in pull request #18785:
URL: https://github.com/apache/flink/pull/18785#discussion_r808121812
##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java
##########
@@ -136,25 +154,38 @@ public String getDescription() {
                 parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
                 break;
             case KEEP_INPUT_AS_IS:
-                RequiredDistribution inputDistribution =
-                        ((KeepInputAsIsDistribution) requiredDistribution).getInputDistribution();
-                checkArgument(
-                        inputDistribution instanceof HashDistribution,
-                        "Only HashDistribution is supported now");
-                partitioner =
-                        new ForwardForConsecutiveHashPartitioner<>(
-                                createHashPartitioner(
-                                        ((HashDistribution) inputDistribution),
-                                        inputType,
-                                        planner));
+                KeepInputAsIsDistribution keepInputAsIsDistribution =
+                        (KeepInputAsIsDistribution) requiredDistribution;
+                if (keepInputAsIsDistribution.isStrict()) {
+                    // explicitly use ForwardPartitioner to guarantee the data distribution is
+                    // exactly the same as input
+                    partitioner = new ForwardPartitioner<>();
+                    requireUndefinedExchangeMode = true;
+                } else {
+                    RequiredDistribution inputDistribution =
+                            ((KeepInputAsIsDistribution) requiredDistribution)
+                                    .getInputDistribution();
+                    checkArgument(
+                            inputDistribution instanceof HashDistribution,
+                            "Only HashDistribution is supported now");
+                    partitioner =
+                            new ForwardForConsecutiveHashPartitioner<>(
+                                    createHashPartitioner(
+                                            ((HashDistribution) inputDistribution),
+                                            inputType,
+                                            planner));
+                }
                 parallelism = inputTransform.getParallelism();
                 break;
             default:
                 throw new TableException(distributionType + "is not supported now!");
         }
         final StreamExchangeMode exchangeMode =
-                getBatchStreamExchangeMode(planner.getConfiguration(), requiredExchangeMode);
+                requireUndefinedExchangeMode
+                        ? StreamExchangeMode.UNDEFINED

Review comment:
Consider the chaining logic in `StreamingJobGraphGenerator`: the exchange mode of a forward partitioner should not be `BATCH`, otherwise the operators cannot be chained.
For `ForwardForConsecutiveHashPartitioner`, on the other hand, if it is converted to a hash shuffle and `table.exec.shuffle-mode` is set to `ALL_EDGES_BLOCKING`, its final exchange mode should be `BATCH`. The chaining logic for `ForwardForConsecutiveHashPartitioner` will be updated in FLINK-26168.
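To make the chaining argument concrete, here is a minimal Java sketch. It does not use Flink classes: `ExchangeModeSketch`, its nested enums, `chooseExchangeMode` and `isChainable` are hypothetical stand-ins. The chaining rule is reduced to three conditions (forward partitioner, non-`BATCH` exchange mode, equal parallelism), whereas the real check in `StreamingJobGraphGenerator` looks at more, and the shuffle-mode lookup is a simplified assumption rather than Flink's actual helper.

```java
/**
 * Hypothetical, simplified model of how a batch exchange might pick its exchange
 * mode and whether the resulting edge could be chained. Not Flink code.
 */
class ExchangeModeSketch {

    /** Stand-in for Flink's StreamExchangeMode (only the values used here). */
    enum ExchangeMode { PIPELINED, BATCH, UNDEFINED }

    /** Stand-in for the relevant table.exec.shuffle-mode values. */
    enum ShuffleMode { ALL_EDGES_BLOCKING, ALL_EDGES_PIPELINED }

    /** Which partitioner the edge ends up with at runtime. */
    enum Partitioner { FORWARD, HASH }

    /**
     * Strict KEEP_INPUT_AS_IS forces UNDEFINED, so the configured shuffle mode cannot
     * turn the forward edge into a blocking (BATCH) one. Otherwise (simplified
     * assumption) ALL_EDGES_BLOCKING yields BATCH and anything else UNDEFINED.
     */
    static ExchangeMode chooseExchangeMode(
            boolean requireUndefinedExchangeMode, ShuffleMode shuffleMode) {
        if (requireUndefinedExchangeMode) {
            return ExchangeMode.UNDEFINED;
        }
        return shuffleMode == ShuffleMode.ALL_EDGES_BLOCKING
                ? ExchangeMode.BATCH
                : ExchangeMode.UNDEFINED;
    }

    /** Simplified chaining check: forward edge, non-BATCH mode, equal parallelism. */
    static boolean isChainable(
            Partitioner partitioner, ExchangeMode mode, int upstreamPar, int downstreamPar) {
        return partitioner == Partitioner.FORWARD
                && mode != ExchangeMode.BATCH
                && upstreamPar == downstreamPar;
    }

    public static void main(String[] args) {
        // Strict forward edge under ALL_EDGES_BLOCKING: stays UNDEFINED, so it can chain.
        ExchangeMode strict = chooseExchangeMode(true, ShuffleMode.ALL_EDGES_BLOCKING);
        System.out.println(strict + " chainable=" + isChainable(Partitioner.FORWARD, strict, 4, 4));

        // A forward edge that picked up BATCH would break the chain.
        System.out.println(
                "BATCH chainable=" + isChainable(Partitioner.FORWARD, ExchangeMode.BATCH, 4, 4));

        // Hash shuffle (e.g. a converted ForwardForConsecutiveHashPartitioner)
        // under ALL_EDGES_BLOCKING ends up BATCH.
        ExchangeMode hash = chooseExchangeMode(false, ShuffleMode.ALL_EDGES_BLOCKING);
        System.out.println(hash + " chainable=" + isChainable(Partitioner.HASH, hash, 4, 4));
    }
}
```

The design point the sketch isolates is that the strict branch overrides the configured shuffle mode instead of consulting it, which is why `requireUndefinedExchangeMode` is checked before any configuration lookup.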