Hi Jane,

Thanks for your comments.

> I understand your point. **It would be better if you could sync the
> content to the FLIP**.

- Sure thing. I added my above answer to the FLIP.

> Another thing is I'm curious about what the physical plan looks like. Is
> there any specific info that will be added to the table source (like
> filter/project pushdown)? It would be great if you could attach an example
> to the FLIP.

- For the physical plan, the table source will have an additional info named
"partitionedReading" or "partitionedRead". For example:

CREATE TABLE MyTableP (
    a bigint,
    b int,
    c varchar
) PARTITIONED BY (a, b) with (
    'connector' = 'filesystem',
    'format' = 'testcsv',
    'path' = '/root_dir')

SELECT a, b, COUNT (c) from MyTableP GROUP BY a, b

+- LocalHashAggregate(groupBy=[a, b], select=[a, b, Partial_COUNT(c) AS count$0])
   +- TableSourceScan(table=[[default_catalog, default_database, MyTableP, partitionedReading]], fields=[a, b, c])

I also added this example to the FLIP.

Please let me know if that answers your question or if you have any other
comments.

Regards,
Jeyhun

On Thu, Mar 14, 2024 at 8:23 AM Jane Chan <qingyue....@gmail.com> wrote:

> Hi Jeyhun,
>
> Thanks for your clarification.
>
> > Once a new partition is detected, we add it to our existing mapping. Our
> > mapping looks like Map<Integer, Set<Integer>> subtaskToPartitionAssignment,
> > where it maps each source subtaskID to zero or more partitions.
>
> I understand your point. **It would be better if you could sync the
> content to the FLIP**.
>
> Another thing is I'm curious about what the physical plan looks like. Is
> there any specific info that will be added to the table source (like
> filter/project pushdown)? It would be great if you could attach an example
> to the FLIP.
>
> Bests,
> Jane
>
> On Wed, Mar 13, 2024 at 9:11 PM Jeyhun Karimov <je.kari...@gmail.com>
> wrote:
>
> > Hi Jane,
> >
> > Thanks for your comments.
> >
> > > 1. Concerning the `sourcePartitions()` method, the partition information
> > > returned during the optimization phase may not be the same as the
> > > partition information during runtime execution. For long-running jobs,
> > > partitions may be continuously created. Is this FLIP equipped to handle
> > > scenarios?
> >
> > - Good point. This scenario is definitely supported.
> > Once a new partition is added, or in general, new splits are discovered,
> > PartitionAwareSplitAssigner::addSplits(Collection<FileSourceSplit> newSplits)
> > method will be called. Inside that method, we are able to detect if a split
> > belongs to existing partitions or there is a new partition.
> > Once a new partition is detected, we add it to our existing mapping. Our
> > mapping looks like Map<Integer, Set<Integer>> subtaskToPartitionAssignment,
> > where it maps each source subtaskID to zero or more partitions.
> >
> > > 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> > > understand that it is also necessary to verify whether the hash key
> > > within the Exchange node is consistent with the partition key defined in
> > > the table source that implements `SupportsPartitioning`.
> >
> > - Yes, I overlooked that point, fixed. Actually, the rule is much
> > complicated. I tried to simplify it in the FLIP. Good point.
> >
> > > 3. Could you elaborate on the desired physical plan and integration with
> > > `CompiledPlan` to enhance the overall functionality?
> >
> > - For compiled plan, PartitioningSpec will be used, with a json tag
> > "Partitioning". As a result, in the compiled plan, the source operator will
> > have
> > "abilities" : [ { "type" : "Partitioning" } ] as part of the compiled plan.
> > More about the implementation details below:
> >
> > --------------------------------
> > PartitioningSpec class
> > --------------------------------
> > @JsonTypeName("Partitioning")
> > public final class PartitioningSpec extends SourceAbilitySpecBase {
> >     // some code here
> >     @Override
> >     public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
> >         if (tableSource instanceof SupportsPartitioning) {
> >             ((SupportsPartitioning<?>) tableSource).applyPartitionedRead();
> >         } else {
> >             throw new TableException(
> >                     String.format(
> >                             "%s does not support SupportsPartitioning.",
> >                             tableSource.getClass().getName()));
> >         }
> >     }
> >     // some code here
> > }
> >
> > --------------------------------
> > SourceAbilitySpec class
> > --------------------------------
> > @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
> > @JsonSubTypes({
> >     @JsonSubTypes.Type(value = FilterPushDownSpec.class),
> >     @JsonSubTypes.Type(value = LimitPushDownSpec.class),
> >     @JsonSubTypes.Type(value = PartitionPushDownSpec.class),
> >     @JsonSubTypes.Type(value = ProjectPushDownSpec.class),
> >     @JsonSubTypes.Type(value = ReadingMetadataSpec.class),
> >     @JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
> >     @JsonSubTypes.Type(value = SourceWatermarkSpec.class),
> >     @JsonSubTypes.Type(value = AggregatePushDownSpec.class),
> > +   @JsonSubTypes.Type(value = PartitioningSpec.class) // new added
> >
> > Please let me know if that answers your questions or if you have other
> > comments.
> >
> > Regards,
> > Jeyhun
> >
> > On Tue, Mar 12, 2024 at 8:56 AM Jane Chan <qingyue....@gmail.com> wrote:
> >
> > > Hi Jeyhun,
> > >
> > > Thank you for leading the discussion. I'm generally +1 with this
> > > proposal, along with some questions. Please see my comments below.
> > >
> > > 1. Concerning the `sourcePartitions()` method, the partition information
> > > returned during the optimization phase may not be the same as the
> > > partition information during runtime execution. For long-running jobs,
> > > partitions may be continuously created. Is this FLIP equipped to handle
> > > scenarios?
> > >
> > > 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> > > understand that it is also necessary to verify whether the hash key
> > > within the Exchange node is consistent with the partition key defined in
> > > the table source that implements `SupportsPartitioning`.
> > >
> > > 3. Could you elaborate on the desired physical plan and integration with
> > > `CompiledPlan` to enhance the overall functionality?
> > >
> > > Best,
> > > Jane
> > >
> > > On Tue, Mar 12, 2024 at 11:11 AM Jim Hughes <jhug...@confluent.io.invalid>
> > > wrote:
> > >
> > > > Hi Jeyhun,
> > > >
> > > > I like the idea! Given FLIP-376[1], I wonder if it'd make sense to
> > > > generalize FLIP-434 to be about "pre-divided" data to cover "buckets"
> > > > and "partitions" (and maybe even situations where a data source is
> > > > partitioned and bucketed).
> > > >
> > > > Separate from that, the page mentions TPC-H Q1 as an example. For a
> > > > join, any two tables joined on the same bucket key should provide a
> > > > concrete example of a join. Systems like Kafka Streams/ksqlDB call this
> > > > "co-partitioning"; for those systems, it is a requirement placed on the
> > > > input sources. For Flink, with FLIP-434, the proposed planner rule
> > > > could remove the shuffle.
> > > >
> > > > Definitely a fun idea; I look forward to hearing more!
> > > >
> > > > Cheers,
> > > >
> > > > Jim
> > > >
> > > > 1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
> > > > 2. https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/#co-partitioning-requirements
> > > >
> > > > On Sun, Mar 10, 2024 at 3:38 PM Jeyhun Karimov <je.kari...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi devs,
> > > > >
> > > > > I’d like to start a discussion on FLIP-434: Support optimizations for
> > > > > pre-partitioned data sources [1].
> > > > >
> > > > > The FLIP introduces taking advantage of pre-partitioned data sources
> > > > > for SQL/Table API (it is already supported as experimental feature in
> > > > > DataStream API [2]).
> > > > >
> > > > > Please find more details in the FLIP wiki document [1].
> > > > > Looking forward to your feedback.
> > > > >
> > > > > Regards,
> > > > > Jeyhun
> > > > >
> > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources
> > > > > [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/
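
For readers following the thread, the mapping mentioned in the discussion (Map<Integer, Set<Integer>> subtaskToPartitionAssignment, extended whenever addSplits sees a partition it has not seen before) might look roughly like the sketch below. This is only an illustration under stated assumptions, not the FLIP-434 implementation: the class name PartitionAssignmentSketch, the helper methods ownerOf and partitionsOf, the reverse index, the use of a plain integer partition ID in place of a FileSourceSplit, and the least-loaded placement policy are all hypothetical and do not come from the thread.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of a partition-aware assignment map; not Flink code. */
final class PartitionAssignmentSketch {
    // Source subtask ID -> partitions it reads (zero or more), as described in the thread.
    private final Map<Integer, Set<Integer>> subtaskToPartitionAssignment = new HashMap<>();
    // Assumed reverse index so that a known partition is recognised in O(1).
    private final Map<Integer, Integer> partitionToSubtask = new HashMap<>();

    PartitionAssignmentSketch(int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        for (int subtask = 0; subtask < parallelism; subtask++) {
            subtaskToPartitionAssignment.put(subtask, new TreeSet<>());
        }
    }

    /**
     * Takes the partition ID of each newly discovered split (a stand-in for
     * FileSourceSplit) and returns the partitions that were not known before.
     */
    List<Integer> addSplits(Collection<Integer> splitPartitionIds) {
        List<Integer> newPartitions = new ArrayList<>();
        for (Integer partitionId : splitPartitionIds) {
            if (partitionToSubtask.containsKey(partitionId)) {
                continue; // split belongs to an existing partition; mapping unchanged
            }
            int owner = leastLoadedSubtask(); // assumed policy, not taken from the FLIP
            subtaskToPartitionAssignment.get(owner).add(partitionId);
            partitionToSubtask.put(partitionId, owner);
            newPartitions.add(partitionId);
        }
        return newPartitions;
    }

    // Picks the subtask with the fewest partitions; ties go to the lowest subtask ID.
    private int leastLoadedSubtask() {
        int best = 0;
        for (int subtask = 1; subtask < subtaskToPartitionAssignment.size(); subtask++) {
            if (subtaskToPartitionAssignment.get(subtask).size()
                    < subtaskToPartitionAssignment.get(best).size()) {
                best = subtask;
            }
        }
        return best;
    }

    /** Returns the subtask that owns the partition, or -1 if the partition is unknown. */
    int ownerOf(int partitionId) {
        return partitionToSubtask.getOrDefault(partitionId, -1);
    }

    /** Returns a read-only view of the partitions assigned to the given subtask. */
    Set<Integer> partitionsOf(int subtask) {
        return Collections.unmodifiableSet(
                subtaskToPartitionAssignment.getOrDefault(subtask, Collections.emptySet()));
    }
}
```

The point the sketch tries to capture is that a partition, once assigned, stays with the same subtask, so later splits of that partition never require a shuffle; only partitions unseen at planning time cause the map to grow.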