Hi Jim,
Thanks for your comments.

> I wonder if it'd make sense to
> generalize FLIP-434 to be about "pre-divided" data to cover "buckets" and
> "partitions" (and maybe even situations where a data source is partitioned
> and bucketed).

Now that I have gone through FLIP-376 [1] again, your suggestion definitely makes sense. For any source connector that can derive "DISTRIBUTION" metadata (e.g., distribution keys/columns, bucket names, etc.) from the input source (e.g., given the DFS path), FLIP-434 [2] can and should support reading pre-bucketed and/or pre-divided data. I have also added the relevant information to FLIP-434 [2].

> Separate from that, the page mentions TPC-H Q1 as an example. For a join,
> any two tables joined on the same bucket key should provide a concrete
> example of a join. Systems like Kafka Streams/ksqlDB call this
> "co-partitioning"; for those systems, it is a requirement placed on the
> input sources. For Flink, with FLIP-434, the proposed planner rule
> could remove the shuffle.

Yes, compared to Kafka Streams/ksqlDB, with this proposal we might be able to remove the shuffle. One thing to note is that for join queries, the parallelism of each join source might be different. This can lead to inconsistencies when using pre-partitioned/pre-divided data (e.g., different mappings of partitions to source operators on each side of the join). Therefore, it is the planner's job to detect this and adjust the parallelism. With that in mind, the rest (how the split assigners behave) is consistent across many sources.

Please let me know if that answers your questions or if you have any other comments.

Regards,
Jeyhun

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources

On Tue, Mar 12, 2024 at 4:11 AM Jim Hughes <jhug...@confluent.io.invalid> wrote:
> Hi Jeyhun,
>
> I like the idea!
> Given FLIP-376[1], I wonder if it'd make sense to
> generalize FLIP-434 to be about "pre-divided" data to cover "buckets" and
> "partitions" (and maybe even situations where a data source is partitioned
> and bucketed).
>
> Separate from that, the page mentions TPC-H Q1 as an example. For a join,
> any two tables joined on the same bucket key should provide a concrete
> example of a join. Systems like Kafka Streams/ksqlDB call this
> "co-partitioning"; for those systems, it is a requirement placed on the
> input sources. For Flink, with FLIP-434, the proposed planner rule
> could remove the shuffle.
>
> Definitely a fun idea; I look forward to hearing more!
>
> Cheers,
>
> Jim
>
> 1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
> 2. https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/#co-partitioning-requirements
>
> On Sun, Mar 10, 2024 at 3:38 PM Jeyhun Karimov <je.kari...@gmail.com> wrote:
>
> > Hi devs,
> >
> > I’d like to start a discussion on FLIP-434: Support optimizations for
> > pre-partitioned data sources [1].
> >
> > The FLIP introduces taking advantage of pre-partitioned data sources for
> > SQL/Table API (it is already supported as an experimental feature in the
> > DataStream API [2]).
> >
> > Please find more details in the FLIP wiki document [1].
> > Looking forward to your feedback.
> >
> > Regards,
> > Jeyhun
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources
> > [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/
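To make the co-partitioning and parallelism points in this thread concrete, here is a minimal Python sketch. It is not Flink code and not the FLIP-434 design; it only illustrates the idea under stated assumptions. Both join inputs are assumed to be pre-bucketed on the join key with a hypothetical bucketing function (key % num_buckets), and a hypothetical split assigner gives bucket b to source subtask b % parallelism. Under those assumptions, each subtask can join its own buckets locally with no shuffle, but only if every bucket maps to the same subtask on both sides. When the two sources run with different parallelism, matching buckets can land on different subtasks, which is the inconsistency the planner has to detect.

```python
from collections import defaultdict


# Hypothetical bucketing function: both inputs are assumed to be pre-divided
# on the join key into `num_buckets` buckets using key % num_buckets.
def bucket_of(key: int, num_buckets: int) -> int:
    return key % num_buckets


def pre_bucket(rows, num_buckets):
    """Simulate a source whose data is already divided into buckets."""
    buckets = defaultdict(list)
    for key, value in rows:
        buckets[bucket_of(key, num_buckets)].append((key, value))
    return buckets


def assign_buckets(num_buckets, parallelism):
    """Hypothetical split assignment: bucket b is read by subtask b % parallelism."""
    return {b: b % parallelism for b in range(num_buckets)}


def co_partitioned(buckets_left, par_left, buckets_right, par_right):
    """A shuffle-free join is safe only if every bucket maps to the same subtask on both sides."""
    if buckets_left != buckets_right:
        return False
    return assign_buckets(buckets_left, par_left) == assign_buckets(buckets_right, par_right)


def local_join(left_rows, right_rows, num_buckets, parallelism):
    """Equi-join each subtask's buckets locally; no rows move between subtasks."""
    left = pre_bucket(left_rows, num_buckets)
    right = pre_bucket(right_rows, num_buckets)
    assignment = assign_buckets(num_buckets, parallelism)
    results = []
    for subtask in range(parallelism):
        owned = [b for b, s in assignment.items() if s == subtask]
        # Build a hash table from the left-side buckets this subtask owns.
        table = defaultdict(list)
        for b in owned:
            for key, value in left.get(b, []):
                table[key].append(value)
        # Probe with the right-side buckets owned by the same subtask.
        for b in owned:
            for key, value in right.get(b, []):
                for left_value in table.get(key, []):
                    results.append((key, left_value, value))
    return sorted(results)


if __name__ == "__main__":
    orders = [(1, "a"), (2, "b"), (5, "c")]
    customers = [(1, "x"), (5, "y"), (7, "z")]
    print(co_partitioned(4, 2, 4, 2))  # same bucket-to-subtask mapping on both sides
    print(co_partitioned(4, 2, 4, 3))  # bucket 2 lands on subtask 0 vs. subtask 2
    print(local_join(orders, customers, 4, 2))
```

In this sketch, a False result from co_partitioned corresponds to the case described above where the planner would need to adjust the parallelism before the shuffle can safely be dropped; the bucketing function and split assignment are illustrative stand-ins, and real connectors would derive them from their own distribution metadata.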