leonardBang commented on a change in pull request #11853:
URL: https://github.com/apache/flink/pull/11853#discussion_r414634883



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableFactory.java
##########
@@ -71,6 +71,12 @@
                        .withDescription("The default partition name in case the dynamic partition" +
                                        " column value is null/empty string");
 
+       public static final ConfigOption<Boolean> SINK_SHUFFLE_BY_PARTITION = key("sink.shuffle-by-partition.enable")
+                       .booleanType()
+                       .defaultValue(false)
+                       .withDescription("Before sink, can shuffle by dynamic partition fields to sink parallelisms," +
+                                       " this can greatly reduce the number of files. But will lead to data skew too.");
+

Review comment:
       How about this? "The option to enable shuffling data by dynamic partition
fields in the sink phase. This can greatly reduce the number of files for the
filesystem sink but may lead to data skew. Disabled by default."
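To make the trade-off in the description concrete, here is a minimal plain-Java sketch. It is not Flink code: the class `ShuffleByPartitionSketch`, the method `simulate`, and the rule "one file per (subtask, partition value) pair" are all assumptions made for illustration. It compares round-robin distribution with hashing on the partition value.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation, not part of Flink: counts output files and the
// load of the busiest sink subtask with and without shuffling by partition.
public class ShuffleByPartitionSketch {

    /** Outcome of one simulated run. */
    static final class Result {
        final int fileCount;            // total files written across all subtasks
        final int maxRecordsPerSubtask; // records handled by the busiest subtask

        Result(int fileCount, int maxRecordsPerSubtask) {
            this.fileCount = fileCount;
            this.maxRecordsPerSubtask = maxRecordsPerSubtask;
        }
    }

    static Result simulate(String[] partitionValues, int parallelism, boolean shuffleByPartition) {
        // Assumption: each subtask writes one file per distinct partition value it receives.
        List<Set<String>> filesPerSubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            filesPerSubtask.add(new HashSet<>());
        }
        int[] load = new int[parallelism];

        for (int i = 0; i < partitionValues.length; i++) {
            String partition = partitionValues[i];
            int subtask = shuffleByPartition
                    ? Math.floorMod(partition.hashCode(), parallelism) // same partition, same subtask
                    : i % parallelism;                                 // round-robin, ignores partition
            filesPerSubtask.get(subtask).add(partition);
            load[subtask]++;
        }

        int files = 0;
        int maxLoad = 0;
        for (int i = 0; i < parallelism; i++) {
            files += filesPerSubtask.get(i).size();
            maxLoad = Math.max(maxLoad, load[i]);
        }
        return new Result(files, maxLoad);
    }

    public static void main(String[] args) {
        // Skewed input: six records in partition "a", one each in "b" and "c".
        String[] records = {"a", "a", "b", "a", "a", "c", "a", "a"};
        Result plain = simulate(records, 4, false);
        Result shuffled = simulate(records, 4, true);
        System.out.println("no shuffle: files=" + plain.fileCount
                + ", busiest subtask=" + plain.maxRecordsPerSubtask);
        System.out.println("shuffle:    files=" + shuffled.fileCount
                + ", busiest subtask=" + shuffled.maxRecordsPerSubtask);
    }
}
```

With this sample input and a parallelism of 4, the round-robin run writes 6 files with at most 2 records per subtask, while the shuffled run writes 3 files but sends 6 of the 8 records to a single subtask. That is the "fewer files, possible data skew" behavior the description should convey.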

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecSinkRule.scala
##########
@@ -53,15 +53,20 @@ class StreamExecSinkRule extends ConverterRule(
             val dynamicPartIndices =
               dynamicPartFields.map(partitionSink.getTableSchema.getFieldNames.indexOf(_))
 
-            if (partitionSink.configurePartitionGrouping(false)) {
-              throw new TableException("Partition grouping in stream mode is not supported yet!")
-            }
+            val shuffleEnable = sinkNode
+                .catalogTable
+                .getProperties
+                .get(FileSystemTableFactory.SINK_SHUFFLE_BY_PARTITION.key())
 
-            if (!partitionSink.isInstanceOf[DataStreamTableSink[_]]) {

Review comment:
        `DataStreamTableSink` is a special type. Will this change give all
stream sink nodes a chance to use the shuffle-enable config?



