Hi Lorenzo,

Thanks a lot for your comments. Please find my answers below:

> For the interface `SupportsPartitioning`, why returning `Optional`?
> If one decides to implement that, partitions must exist (at maximum,
> return an empty list). Returning `Optional` seems just to complicate the
> logic of the code using that interface.

- The reasoning behind the use of Optional is that sometimes (e.g., in
HiveTableSource) the partitioning info is in the catalog. In that case the
source returns Optional.empty(), so that the list of partitions is queried
from the catalog.

> I foresee the using code doing something like: "if the source supports
> partitioning, get the partitions, but if they don't exist, raise a runtime
> exception". Let's simply make that safe at compile time and guarantee the
> code that partitions exist.

- Yes. If partitions cannot be found, neither from the catalog nor from the
interface implementation, we raise an exception at query compile time.

> Another thing is that you show Hive-like partitioning in your FS
> structure, do you think it makes sense to add a note about auto-discovery
> of partitions?

- Yes, the FLIP contains just an example partitioning for the filesystem
connector. Each connector already "knows" how to auto-discover its
partitions, and we rely on this fact. For example, partition discovery is
different between Kafka and filesystem sources. So the FLIP does not handle
partition discovery itself. Please correct me if I misunderstood your
question.

> In other terms, it looks a bit counterintuitive that the user implementing
> the source has to specify which partitions exist statically (and they can
> change at runtime), while the source itself knows the data provider and can
> directly implement a method `discoverPartitions`. Then Flink would take
> care of invoking that method when needed.

- We use the table option SOURCE_MONITOR_INTERVAL to check whether
partitions are static or not. So a user still has to give Flink a hint about
whether the partitions are static. With static partitions Flink can do more
optimizations.
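
To make the lookup order above concrete, here is a minimal sketch of how the planner side could resolve partitions: ask the source first, fall back to the catalog, and fail at compile time if neither knows. All type and method names below (`PartitionedSourceSketch`, `CatalogSketch`, `resolvePartitions`) are hypothetical stand-ins rather than the FLIP's actual API, and `IllegalStateException` only stands in for whatever planner exception would really be thrown.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for the FLIP's SupportsPartitioning; the real signature may differ. */
interface PartitionedSourceSketch {
    /** Optional.empty() means "the source does not know, ask the catalog". */
    Optional<List<String>> sourcePartitions();
}

/** Hypothetical stand-in for a catalog partition lookup. */
interface CatalogSketch {
    Optional<List<String>> listPartitions(String tableName);
}

final class PartitionResolutionSketch {

    /** Source first, catalog second, otherwise fail while compiling the query. */
    static List<String> resolvePartitions(
            PartitionedSourceSketch source, CatalogSketch catalog, String tableName) {
        return source.sourcePartitions()
                // Only consulted when the source returned Optional.empty().
                .or(() -> catalog.listPartitions(tableName))
                // Neither the source nor the catalog knows the partitions.
                .orElseThrow(() -> new IllegalStateException(
                        "No partition information available for table " + tableName));
    }

    public static void main(String[] args) {
        // A Hive-like source that defers to the catalog.
        PartitionedSourceSketch hiveLike = () -> Optional.empty();
        CatalogSketch catalog = table -> Optional.of(List.of("dt=2024-03-01", "dt=2024-03-02"));
        System.out.println(resolvePartitions(hiveLike, catalog, "orders"));
    }
}
```

With this shape, the calling code never sees an `Optional`: it either gets a list of partitions or the query fails during compilation.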
Please let me know if my replies answer your questions or if you have more
comments.

Regards,
Jeyhun

On Thu, Mar 21, 2024 at 10:03 AM <lorenzo.affe...@ververica.com> wrote:

> Hello Jeyhun,
> I really like the proposal and it definitely makes sense to me.
>
> I have a couple of nits here and there:
>
> For the interface `SupportsPartitioning`, why returning `Optional`?
> If one decides to implement that, partitions must exist (at maximum,
> return an empty list). Returning `Optional` seems just to complicate the
> logic of the code using that interface.
>
> I foresee the using code doing something like: "if the source supports
> partitioning, get the partitions, but if they don't exist, raise a runtime
> exception". Let's simply make that safe at compile time and guarantee the
> code that partitions exist.
>
> Another thing is that you show Hive-like partitioning in your FS
> structure, do you think it makes sense to add a note about auto-discovery
> of partitions?
>
> In other terms, it looks a bit counterintuitive that the user implementing
> the source has to specify which partitions exist statically (and they can
> change at runtime), while the source itself knows the data provider and can
> directly implement a method `discoverPartitions`. Then Flink would take
> care of invoking that method when needed.
>
> On Mar 15, 2024 at 22:09 +0100, Jeyhun Karimov <je.kari...@gmail.com>,
> wrote:
>
> > Hi Benchao,
> >
> > Thanks for your comments.
> >
> > > 1. What parallelism would you take? E.g., 128 + 256 => 128? What
> > > if we cannot have a good greatest common divisor, like 127 + 128,
> > > could we just utilize one side's pre-partitioned attribute, and let
> > > another side just do the shuffle?
> >
> > There are two cases we need to consider:
> >
> > 1. Static Partition (no partitions are added during the query execution)
> > is enabled AND both sources implement "SupportsPartitionPushDown"
> >
> > In this case, we are sure that no new partitions will be added at runtime.
> > So, we have a chance to equalize both sources' partitions and
> > parallelism, IFF both sources implement the "SupportsPartitionPushDown"
> > interface.
> > To achieve this, we first fetch the existing partitions from source1
> > (say p_s1) and from source2 (say p_s2).
> > Then, we find the intersection of these two partition sets (say
> > p_intersect) and push down these partitions:
> >
> > SupportsPartitionPushDown::applyPartitions(p_intersect) // make sure that only specific partitions are read
> > SupportsPartitioning::applyPartitionedRead(p_intersect) // partitioned read with filtered partitions
> >
> > Lastly, we need to change the parallelism of 1) source1, 2) source2, and
> > 3) all of their downstream operators until (and including) their first
> > common ancestor (e.g., join) to be equal to the number of partitions
> > (size of p_intersect).
> >
> > 2. All other cases
> >
> > In all other cases, the parallelism of both sources and their downstream
> > operators until their common ancestor would be equal to
> > MIN(|p_s1|, |p_s2|).
> > That is, the smaller of the two sources' partition counts will be
> > selected as the parallelism.
> > Coming back to your example, if source1 parallelism is 127 and source2
> > parallelism is 128, then we will first check the number of partitions of
> > source1 and source2.
> > Say source1 has 100 partitions and source2 has 90 partitions.
> > Then, we would set the parallelism for source1, source2, and all of their
> > downstream operators until (and including) the join operator
> > to 90 (min(100, 90)).
> > We also plan to implement a cost-based decision instead of the rule-based
> > one (the MIN rule explained above).
> > One possible result of the cost-based estimation is to keep the
> > partitions on one side and perform the shuffling on the other source.
> >
> > > 2. In our current shuffle remove design (FlinkExpandConversionRule),
> > > we don't consider parallelism, we just remove unnecessary shuffles
> > > according to the distribution columns.
> > > After this FLIP, the
> > > parallelism may be bundled with source's partitions, then how will
> > > this optimization work together with FlinkExpandConversionRule, will
> > > you also change downstream operator's parallelisms if we want to also
> > > remove subsequent shuffles?
> >
> > - From my understanding of FlinkExpandConversionRule, its removal logic
> > is agnostic to operator parallelism.
> > So, if FlinkExpandConversionRule decides to remove a shuffle operation,
> > then this FLIP will search for another possible shuffle (the one closest
> > to the source) to remove.
> > If there is such an opportunity, this FLIP will remove the shuffle. So,
> > from my understanding FlinkExpandConversionRule and this optimization
> > rule can work together safely.
> > Please correct me if I misunderstood your question.
> >
> > > Regarding the new optimization rule, have you also considered to allow
> > > some non-strict mode like FlinkRelDistribution#requireStrict? For
> > > example, source is pre-partitioned by a, b columns, if we are
> > > consuming this source, and do an aggregate on a, b, c, can we utilize
> > > this optimization?
> >
> > - Good point. Yes, there are some cases in which non-strict mode will
> > apply. For example:
> >
> > - the pre-partitioned columns and the aggregate columns are the same but
> > have a different order (e.g., source pre-partitioned w.r.t. a,b and the
> > aggregate has a GROUP BY b,a)
> > - the columns in the Exchange operator are a list-prefix of the
> > pre-partitioned columns of the source (e.g., source is pre-partitioned
> > w.r.t. a,b,c and Exchange's partition columns are a,b)
> >
> > Please let me know if the above answers your questions or if you have any
> > other comments.
> >
> > Regards,
> > Jeyhun
> >
> > On Thu, Mar 14, 2024 at 12:48 PM Benchao Li <libenc...@apache.org> wrote:
> >
> > > Thanks Jeyhun for bringing up this discussion, it is really exciting,
> > > +1 for the general idea.
> > >
> > > We also introduced a similar concept in Flink Batch internally to cope
> > > with bucketed tables in Hive, it is a very important improvement.
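
For reference, the two parallelism cases from the Mar 15 reply quoted above can be sketched as follows. This is only an illustration of the arithmetic, not planner code: the class and method names are hypothetical, partitions are modelled as plain strings, and the thread does not say what should happen when the intersection is empty (the sketch simply returns 0 in that case).

```java
import java.util.Set;
import java.util.TreeSet;

final class PrePartitionedParallelismSketch {

    /** Case 1: the partitions present in both sources (p_intersect). */
    static Set<String> intersect(Set<String> pS1, Set<String> pS2) {
        Set<String> result = new TreeSet<>(pS1);
        result.retainAll(pS2); // keep only partitions that also exist in source2
        return result;
    }

    /** Case 1: parallelism equals the number of common partitions. */
    static int staticCaseParallelism(Set<String> pS1, Set<String> pS2) {
        return intersect(pS1, pS2).size();
    }

    /** Case 2 (MIN rule): the smaller of the two partition counts. */
    static int minRuleParallelism(int partitionCountS1, int partitionCountS2) {
        return Math.min(partitionCountS1, partitionCountS2);
    }

    public static void main(String[] args) {
        Set<String> pS1 = Set.of("dt=01", "dt=02", "dt=03");
        Set<String> pS2 = Set.of("dt=02", "dt=03", "dt=04");
        System.out.println(intersect(pS1, pS2));             // [dt=02, dt=03]
        System.out.println(staticCaseParallelism(pS1, pS2)); // 2
        System.out.println(minRuleParallelism(100, 90));     // 90
    }
}
```

The last call reproduces the example from the reply: 100 and 90 partitions give a parallelism of 90 for both sources and their downstream operators up to the join.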
> > > > One thing to note is that for join queries, the parallelism of each
> > > > join source might be different. This might result in
> > > > inconsistencies while using the pre-partitioned/pre-divided data
> > > > (e.g., different mappings of partitions to source operators).
> > > > Therefore, it is the job of planner to detect this and adjust the
> > > > parallelism. With that having in mind,
> > > > the rest (how the split assigners perform) is consistent among many
> > > > sources.
> > >
> > > Could you elaborate a little more on this? I added my two cents here
> > > about this part:
> > > 1. What parallelism would you take? E.g., 128 + 256 => 128? What
> > > if we cannot have a good greatest common divisor, like 127 + 128,
> > > could we just utilize one side's pre-partitioned attribute, and let
> > > another side just do the shuffle?
> > > 2. In our current shuffle remove design (FlinkExpandConversionRule),
> > > we don't consider parallelism, we just remove unnecessary shuffles
> > > according to the distribution columns. After this FLIP, the
> > > parallelism may be bundled with source's partitions, then how will
> > > this optimization work together with FlinkExpandConversionRule, will
> > > you also change downstream operator's parallelisms if we want to also
> > > remove subsequent shuffles?
> > >
> > > Regarding the new optimization rule, have you also considered to allow
> > > some non-strict mode like FlinkRelDistribution#requireStrict? For
> > > example, source is pre-partitioned by a, b columns, if we are
> > > consuming this source, and do an aggregate on a, b, c, can we utilize
> > > this optimization?
> > >
> > > Jane Chan <qingyue....@gmail.com> wrote on Thu, Mar 14, 2024 at 15:24:
> > >
> > > > Hi Jeyhun,
> > > >
> > > > Thanks for your clarification.
> > > >
> > > > > Once a new partition is detected, we add it to our existing
> > > > > mapping. Our mapping looks like
> > > > > Map<Integer, Set<Integer>> subtaskToPartitionAssignment,
> > > > > where it maps each source subtaskID to zero or more partitions.
> > > >
> > > > I understand your point.
> > > > **It would be better if you could sync the content to the FLIP**.
> > > >
> > > > Another thing is I'm curious about what the physical plan looks like.
> > > > Is there any specific info that will be added to the table source
> > > > (like filter/project pushdown)? It would be great if you could attach
> > > > an example to the FLIP.
> > > >
> > > > Bests,
> > > > Jane
> > > >
> > > > On Wed, Mar 13, 2024 at 9:11 PM Jeyhun Karimov <je.kari...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Jane,
> > > > >
> > > > > Thanks for your comments.
> > > > >
> > > > > > 1. Concerning the `sourcePartitions()` method, the partition
> > > > > > information returned during the optimization phase may not be the
> > > > > > same as the partition information during runtime execution. For
> > > > > > long-running jobs, partitions may be continuously created. Is
> > > > > > this FLIP equipped to handle such scenarios?
> > > > >
> > > > > - Good point. This scenario is definitely supported.
> > > > > Once a new partition is added, or in general, new splits are
> > > > > discovered, the
> > > > > PartitionAwareSplitAssigner::addSplits(Collection<FileSourceSplit> newSplits)
> > > > > method will be called. Inside that method, we are able to detect
> > > > > whether a split belongs to an existing partition or to a new
> > > > > partition.
> > > > > Once a new partition is detected, we add it to our existing
> > > > > mapping. Our mapping looks like
> > > > > Map<Integer, Set<Integer>> subtaskToPartitionAssignment,
> > > > > where it maps each source subtaskID to zero or more partitions.
> > > > >
> > > > > > 2. Regarding the `RemoveRedundantShuffleRule` optimization rule,
> > > > > > I understand that it is also necessary to verify whether the hash
> > > > > > key within the Exchange node is consistent with the partition key
> > > > > > defined in the table source that implements
> > > > > > `SupportsPartitioning`.
> > > > >
> > > > > - Yes, I overlooked that point, fixed. Actually, the rule is much
> > > > > more complicated. I tried to simplify it in the FLIP. Good point.
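
The partition bookkeeping described in answer 1 above can be sketched roughly as follows. Only the `Map<Integer, Set<Integer>> subtaskToPartitionAssignment` shape and the `addSplits` name come from the thread. Everything else is an assumption made for illustration: splits are reduced to the partition ID they belong to, and a new partition goes to the subtask that currently owns the fewest partitions, a placement policy the thread does not specify.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class PartitionAwareAssignerSketch {

    // Source subtask ID -> partition IDs owned by that subtask (zero or more).
    private final Map<Integer, Set<Integer>> subtaskToPartitionAssignment = new HashMap<>();
    // Reverse index, used to detect whether a partition is already known.
    private final Map<Integer, Integer> partitionToSubtask = new HashMap<>();

    PartitionAwareAssignerSketch(int parallelism) {
        for (int subtask = 0; subtask < parallelism; subtask++) {
            subtaskToPartitionAssignment.put(subtask, new TreeSet<>());
        }
    }

    /** Takes the partition ID of each newly discovered split. */
    void addSplits(Collection<Integer> partitionIdsOfNewSplits) {
        for (int partitionId : partitionIdsOfNewSplits) {
            if (!partitionToSubtask.containsKey(partitionId)) {
                // New partition: extend the existing mapping (assumed policy: least loaded).
                int target = leastLoadedSubtask();
                subtaskToPartitionAssignment.get(target).add(partitionId);
                partitionToSubtask.put(partitionId, target);
            }
            // Splits of a known partition stay with the subtask that already owns it.
        }
    }

    private int leastLoadedSubtask() {
        int best = 0;
        for (int subtask = 1; subtask < subtaskToPartitionAssignment.size(); subtask++) {
            if (subtaskToPartitionAssignment.get(subtask).size()
                    < subtaskToPartitionAssignment.get(best).size()) {
                best = subtask;
            }
        }
        return best;
    }

    Map<Integer, Set<Integer>> assignment() {
        return subtaskToPartitionAssignment;
    }

    public static void main(String[] args) {
        PartitionAwareAssignerSketch assigner = new PartitionAwareAssignerSketch(2);
        assigner.addSplits(List.of(10, 11, 10)); // partitions 10 and 11 are new
        assigner.addSplits(List.of(12));         // partition 12 appears later at runtime
        System.out.println(assigner.assignment());
    }
}
```

The point of the reverse index is that every split of a partition lands on the same subtask, which is what lets a downstream shuffle on the partition key be dropped.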
> > > > > > 3. Could you elaborate on the desired physical plan and
> > > > > > integration with `CompiledPlan` to enhance the overall
> > > > > > functionality?
> > > > >
> > > > > - For the compiled plan, PartitioningSpec will be used, with the
> > > > > JSON tag "Partitioning". As a result, the source operator will have
> > > > > "abilities" : [ { "type" : "Partitioning" } ] as part of the
> > > > > compiled plan.
> > > > > More about the implementation details below:
> > > > >
> > > > > --------------------------------
> > > > > PartitioningSpec class
> > > > > --------------------------------
> > > > > @JsonTypeName("Partitioning")
> > > > > public final class PartitioningSpec extends SourceAbilitySpecBase {
> > > > >     // some code here
> > > > >     @Override
> > > > >     public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
> > > > >         if (tableSource instanceof SupportsPartitioning) {
> > > > >             ((SupportsPartitioning<?>) tableSource).applyPartitionedRead();
> > > > >         } else {
> > > > >             throw new TableException(
> > > > >                     String.format(
> > > > >                             "%s does not support SupportsPartitioning.",
> > > > >                             tableSource.getClass().getName()));
> > > > >         }
> > > > >     }
> > > > >     // some code here
> > > > > }
> > > > >
> > > > > --------------------------------
> > > > > SourceAbilitySpec class
> > > > > --------------------------------
> > > > > @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
> > > > > @JsonSubTypes({
> > > > >     @JsonSubTypes.Type(value = FilterPushDownSpec.class),
> > > > >     @JsonSubTypes.Type(value = LimitPushDownSpec.class),
> > > > >     @JsonSubTypes.Type(value = PartitionPushDownSpec.class),
> > > > >     @JsonSubTypes.Type(value = ProjectPushDownSpec.class),
> > > > >     @JsonSubTypes.Type(value = ReadingMetadataSpec.class),
> > > > >     @JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
> > > > >     @JsonSubTypes.Type(value = SourceWatermarkSpec.class),
> > > > >     @JsonSubTypes.Type(value = AggregatePushDownSpec.class),
> > > > > +   @JsonSubTypes.Type(value = PartitioningSpec.class) // newly added
> > > > >
> > > > > Please let me know if that answers your questions or if you have
> > > > > other comments.
> > > > >
> > > > > Regards,
> > > > > Jeyhun
> > > > >
> > > > > On Tue, Mar 12, 2024 at 8:56 AM Jane Chan <qingyue....@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Jeyhun,
> > > > > >
> > > > > > Thank you for leading the discussion. I'm generally +1 with this
> > > > > > proposal, along with some questions. Please see my comments below.
> > > > > >
> > > > > > 1. Concerning the `sourcePartitions()` method, the partition
> > > > > > information returned during the optimization phase may not be the
> > > > > > same as the partition information during runtime execution. For
> > > > > > long-running jobs, partitions may be continuously created. Is
> > > > > > this FLIP equipped to handle such scenarios?
> > > > > >
> > > > > > 2. Regarding the `RemoveRedundantShuffleRule` optimization rule,
> > > > > > I understand that it is also necessary to verify whether the hash
> > > > > > key within the Exchange node is consistent with the partition key
> > > > > > defined in the table source that implements
> > > > > > `SupportsPartitioning`.
> > > > > >
> > > > > > 3. Could you elaborate on the desired physical plan and
> > > > > > integration with `CompiledPlan` to enhance the overall
> > > > > > functionality?
> > > > > >
> > > > > > Best,
> > > > > > Jane
> > > > > >
> > > > > > On Tue, Mar 12, 2024 at 11:11 AM Jim Hughes
> > > > > > <jhug...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > Hi Jeyhun,
> > > > > > >
> > > > > > > I like the idea! Given FLIP-376[1], I wonder if it'd make sense
> > > > > > > to generalize FLIP-434 to be about "pre-divided" data to cover
> > > > > > > "buckets" and "partitions" (and maybe even situations where a
> > > > > > > data source is partitioned and bucketed).
> > > > > > >
> > > > > > > Separate from that, the page mentions TPC-H Q1 as an example.
> > > > > > > For a join, any two tables joined on the same bucket key should
> > > > > > > provide a concrete example of a join. Systems like Kafka
> > > > > > > Streams/ksqlDB call this "co-partitioning"; for those systems,
> > > > > > > it is a requirement placed on the input sources. For Flink,
> > > > > > > with FLIP-434, the proposed planner rule could remove the
> > > > > > > shuffle.
> > > > > > >
> > > > > > > Definitely a fun idea; I look forward to hearing more!
> > > > > > >
> > > > > > > Cheers,
> > > > > > >
> > > > > > > Jim
> > > > > > >
> > > > > > > [1]
> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
> > > > > > > [2]
> > > > > > > https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/#co-partitioning-requirements
> > > > > > >
> > > > > > > On Sun, Mar 10, 2024 at 3:38 PM Jeyhun Karimov
> > > > > > > <je.kari...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi devs,
> > > > > > > >
> > > > > > > > I’d like to start a discussion on FLIP-434: Support
> > > > > > > > optimizations for pre-partitioned data sources [1].
> > > > > > > >
> > > > > > > > The FLIP introduces taking advantage of pre-partitioned data
> > > > > > > > sources for SQL/Table API (it is already supported as an
> > > > > > > > experimental feature in DataStream API [2]).
> > > > > > > >
> > > > > > > > Please find more details in the FLIP wiki document [1].
> > > > > > > > Looking forward to your feedback.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Jeyhun
> > > > > > > >
> > > > > > > > [1]
> > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources
> > > > > > > > [2]
> > > > > > > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/
> > >
> > > --
> > > Best,
> > > Benchao Li