Hi Hang,

Thanks for the comments.

> I have a question about the part `Additional option to disable this
> optimization`. Is this option a source configuration or a table
> configuration?

- It is a source configuration.

> Besides that, there is a small mistake, if I understand correctly.
> Should `Check if upstream_any is pre-partitioned data source AND contains
> the same partition keys as the source` be changed to `Check if upstream_any
> is pre-partitioned data source AND contains the same partition keys as
> downstream_any`?

- Yes, 'source' should be 'exchange' here.

I have incorporated both points into the FLIP. Please let me know if that
answers your questions or if you have any other comments.

Regards,
Jeyhun

On Thu, Mar 14, 2024 at 8:10 AM Hang Ruan <ruanhang1...@gmail.com> wrote:

> Hi, Jeyhun.
>
> Thanks for the FLIP. Totally +1 for it.
>
> I have a question about the part `Additional option to disable this
> optimization`. Is this option a source configuration or a table
> configuration?
>
> Besides that, there is a small mistake, if I understand correctly.
> Should `Check if upstream_any is pre-partitioned data source AND contains
> the same partition keys as the source` be changed to `Check if upstream_any
> is pre-partitioned data source AND contains the same partition keys as
> downstream_any`?
>
> Best,
> Hang
>
> On Wed, Mar 13, 2024 at 9:11 PM Jeyhun Karimov <je.kari...@gmail.com> wrote:
>
> > Hi Jane,
> >
> > Thanks for your comments.
> >
> > > 1. Concerning the `sourcePartitions()` method, the partition information
> > > returned during the optimization phase may not be the same as the
> > > partition information during runtime execution. For long-running jobs,
> > > partitions may be continuously created. Is this FLIP equipped to handle
> > > such scenarios?
> >
> > - Good point. This scenario is definitely supported.
> > Once a new partition is added or, more generally, new splits are
> > discovered, the
> > PartitionAwareSplitAssigner::addSplits(Collection<FileSourceSplit> newSplits)
> > method will be called. Inside that method, we can detect whether a split
> > belongs to an existing partition or to a new one. Once a new partition is
> > detected, we add it to our existing mapping. The mapping looks like
> > Map<Integer, Set<Integer>> subtaskToPartitionAssignment, where each source
> > subtask ID maps to zero or more partitions.
> >
> > > 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> > > understand that it is also necessary to verify whether the hash key
> > > within the Exchange node is consistent with the partition key defined
> > > in the table source that implements `SupportsPartitioning`.
> >
> > - Yes, I overlooked that point; it is fixed now. The actual rule is much
> > more complicated, and I tried to simplify it in the FLIP. Good point.
> >
> > > 3. Could you elaborate on the desired physical plan and integration with
> > > `CompiledPlan` to enhance the overall functionality?
> >
> > - For the compiled plan, PartitioningSpec will be used, with the JSON tag
> > "Partitioning". As a result, the source operator will have
> > "abilities" : [ { "type" : "Partitioning" } ] as part of the compiled plan.
> > More about the implementation details below:
> >
> > --------------------------------
> > PartitioningSpec class
> > --------------------------------
> > @JsonTypeName("Partitioning")
> > public final class PartitioningSpec extends SourceAbilitySpecBase {
> >     // some code here
> >     @Override
> >     public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
> >         if (tableSource instanceof SupportsPartitioning) {
> >             ((SupportsPartitioning<?>) tableSource).applyPartitionedRead();
> >         } else {
> >             throw new TableException(
> >                     String.format(
> >                             "%s does not support SupportsPartitioning.",
> >                             tableSource.getClass().getName()));
> >         }
> >     }
> >     // some code here
> > }
> >
> > --------------------------------
> > SourceAbilitySpec class
> > --------------------------------
> > @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
> > @JsonSubTypes({
> >     @JsonSubTypes.Type(value = FilterPushDownSpec.class),
> >     @JsonSubTypes.Type(value = LimitPushDownSpec.class),
> >     @JsonSubTypes.Type(value = PartitionPushDownSpec.class),
> >     @JsonSubTypes.Type(value = ProjectPushDownSpec.class),
> >     @JsonSubTypes.Type(value = ReadingMetadataSpec.class),
> >     @JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
> >     @JsonSubTypes.Type(value = SourceWatermarkSpec.class),
> >     @JsonSubTypes.Type(value = AggregatePushDownSpec.class),
> > +   @JsonSubTypes.Type(value = PartitioningSpec.class) // newly added
> >
> > Please let me know if that answers your questions or if you have other
> > comments.
> >
> > Regards,
> > Jeyhun
> >
> > On Tue, Mar 12, 2024 at 8:56 AM Jane Chan <qingyue....@gmail.com> wrote:
> >
> > > Hi Jeyhun,
> > >
> > > Thank you for leading the discussion. I'm generally +1 with this
> > > proposal, along with some questions. Please see my comments below.
> > >
> > > 1. Concerning the `sourcePartitions()` method, the partition information
> > > returned during the optimization phase may not be the same as the
> > > partition information during runtime execution. For long-running jobs,
> > > partitions may be continuously created. Is this FLIP equipped to handle
> > > such scenarios?
> > >
> > > 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> > > understand that it is also necessary to verify whether the hash key
> > > within the Exchange node is consistent with the partition key defined
> > > in the table source that implements `SupportsPartitioning`.
> > >
> > > 3. Could you elaborate on the desired physical plan and integration with
> > > `CompiledPlan` to enhance the overall functionality?
> > >
> > > Best,
> > > Jane
> > >
> > > On Tue, Mar 12, 2024 at 11:11 AM Jim Hughes <jhug...@confluent.io.invalid> wrote:
> > >
> > > > Hi Jeyhun,
> > > >
> > > > I like the idea! Given FLIP-376[1], I wonder if it'd make sense to
> > > > generalize FLIP-434 to be about "pre-divided" data to cover "buckets"
> > > > and "partitions" (and maybe even situations where a data source is
> > > > partitioned and bucketed).
> > > >
> > > > Separate from that, the page mentions TPC-H Q1 as an example. For a
> > > > join, any two tables joined on the same bucket key should provide a
> > > > concrete example of a join. Systems like Kafka Streams/ksqlDB call this
> > > > "co-partitioning"; for those systems, it is a requirement placed on the
> > > > input sources. For Flink, with FLIP-434, the proposed planner rule
> > > > could remove the shuffle.
> > > >
> > > > Definitely a fun idea; I look forward to hearing more!
> > > >
> > > > Cheers,
> > > >
> > > > Jim
> > > >
> > > > 1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
> > > > 2. https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/#co-partitioning-requirements
> > > >
> > > > On Sun, Mar 10, 2024 at 3:38 PM Jeyhun Karimov <je.kari...@gmail.com> wrote:
> > > >
> > > > > Hi devs,
> > > > >
> > > > > I’d like to start a discussion on FLIP-434: Support optimizations for
> > > > > pre-partitioned data sources [1].
> > > > >
> > > > > The FLIP proposes taking advantage of pre-partitioned data sources in
> > > > > the SQL/Table API (this is already supported as an experimental
> > > > > feature in the DataStream API [2]).
> > > > >
> > > > > Please find more details in the FLIP wiki document [1].
> > > > > Looking forward to your feedback.
> > > > >
> > > > > Regards,
> > > > > Jeyhun
> > > > >
> > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources
> > > > > [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/
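
For readers following the thread, the split-assignment bookkeeping described in the reply to Jane's first question could look roughly like the sketch below. Only the shape of the map, Map<Integer, Set<Integer>> subtaskToPartitionAssignment, is taken from the thread. The class name PartitionAssignmentSketch, the use of plain integer partition IDs in place of FileSourceSplit, the reverse lookup map, and the least-loaded placement policy are all assumptions made for illustration; this is not the FLIP's PartitionAwareSplitAssigner.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch only; names and placement policy are assumptions. */
class PartitionAssignmentSketch {
    // Shape taken from the thread: each source subtask ID maps to zero or more partitions.
    private final Map<Integer, Set<Integer>> subtaskToPartitionAssignment = new HashMap<>();
    // Assumed reverse lookup so that a known partition always stays on its subtask.
    private final Map<Integer, Integer> partitionToSubtask = new HashMap<>();
    private final int parallelism;

    PartitionAssignmentSketch(int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        this.parallelism = parallelism;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            subtaskToPartitionAssignment.put(subtask, new TreeSet<>());
        }
    }

    /**
     * Stand-in for addSplits: each element is the partition ID of a newly
     * discovered split. Splits of known partitions change nothing; a new
     * partition is added to the mapping.
     */
    void addSplits(Collection<Integer> partitionIdsOfNewSplits) {
        for (int partitionId : partitionIdsOfNewSplits) {
            if (partitionToSubtask.containsKey(partitionId)) {
                continue; // existing partition: keep its current subtask
            }
            int subtask = leastLoadedSubtask();
            partitionToSubtask.put(partitionId, subtask);
            subtaskToPartitionAssignment.get(subtask).add(partitionId);
        }
    }

    // Assumed policy: pick the subtask with the fewest partitions, lowest ID on ties.
    private int leastLoadedSubtask() {
        int best = 0;
        for (int subtask = 1; subtask < parallelism; subtask++) {
            if (subtaskToPartitionAssignment.get(subtask).size()
                    < subtaskToPartitionAssignment.get(best).size()) {
                best = subtask;
            }
        }
        return best;
    }

    int subtaskFor(int partitionId) {
        Integer subtask = partitionToSubtask.get(partitionId);
        if (subtask == null) {
            throw new IllegalArgumentException("unknown partition " + partitionId);
        }
        return subtask;
    }

    Set<Integer> partitionsOf(int subtask) {
        return new TreeSet<>(subtaskToPartitionAssignment.get(subtask));
    }
}
```

The point of the sketch is that a partition discovered at runtime extends the mapping without moving partitions that were already assigned, so all splits of one partition keep being read by the same subtask.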