Hey Jeyhun,

Thanks for kicking off this discussion. I have two questions about streaming sources:

(1) The FLIP motivation section says the Kafka broker is already partitioned w.r.t. some key[s]. Is this the main use case in the Kafka world? Partitioning by key fields is not the behavior of Kafka's default partitioner [1] IIUC.

(2) Considering that the FLIP's optimization scope covers both batch and streaming pre-partitioned sources, could you add a streaming source example to help me understand the FLIP better? I think the Kafka source is a good candidate for a streaming example; the file source is a good one for batch, and it really helped me follow the FLIP.

Best,
Leonard

[1] https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L31
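To be concrete about (1), my understanding of the keyed path, as a simplified sketch (not the exact Kafka code): the default behavior hashes the serialized record key when one is present and falls back to a "sticky" partition otherwise, so partitioning by specific key fields happens only if the producer explicitly sets those fields as the record key:

    import org.apache.kafka.common.utils.Utils;

    // Simplified sketch (not the exact Kafka code): partition choice for keyed
    // records -- murmur2 hash of the serialized key, modulo the partition count.
    // Records without a key get a "sticky", effectively random partition instead.
    static int partitionForKey(byte[] serializedKey, int numPartitions) {
        return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
    }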
(1)The FLIP motivation section says Kafka broker is already partitioned w.r.t. some key[s] , Is this the main use case in Kafka world? Partitioning by key fields is not the default partitioner of Kafka default partitioner[1] IIUC. (2) Considering the FLIP’s optimization scope aims to both Batch and Streaming pre-partitioned source, could you add a Streaming Source example to help me understand the FLIP better? I think Kafka Source is a good candidates for streaming source example, file source is a good one for batch source and it really helped me to follow-up the FLIP. Best, Leonard [1]https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L31 > 2024年4月3日 上午5:53,Jeyhun Karimov <je.kari...@gmail.com> 写道: > > Hi Lincoln, > > Thanks a lot for your comments. Please find my answers below. > > > 1. Is this flip targeted only at batch scenarios or does it include >> streaming? >> (The flip and the discussion did not explicitly mention this, but in the >> draft pr, I only >> saw the implementation for batch scenarios >> >> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8 >> < >> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8%EF%BC%89 >>> >> ) >> If we expect this also apply to streaming, then we need to consider the >> stricter >> shuffle restrictions of streaming compared to batch (if support is >> considered, >> more discussion is needed here, let’s not expand for now). If it only >> applies to batch, >> it is recommended to clarify in the flip. > > > - The FLIP targets both streaming and batch scenarios. > Could you please elaborate more on what you mean by additional > restrictions? > > > 2. In the current implementation, the optimized plan seems to have some >> problems. >> As described in the class comments: >> >> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60 > > BatchPhysicalHashAggregate (local) > > +- BatchPhysicalLocalHashAggregate (local) >> +- BatchPhysicalTableSourceScan >> The `BatchPhysicalLocalHashAggregate` here is redundant (in the case of >> one-phase >> hashAgg, localAgg is not necessary, which is the scenario currently handled >> by >> `RemoveRedundantLocalHashAggRule` and other rules) > > > - Yes, you are completely right. Note that the PR you referenced is just a > quick PoC. > Redundant operators you mentioned exist because > `RemoveRedundantShuffleRule` just removes the Exchange operator, > without modifying upstream/downstream operators. > As I mentioned, the implementation is just a PoC and the end implementation > will make sure that existing redundancy elimination rules remove redundant > operators. > > > Also, in the draft pr, >> the optimization of `testShouldEliminatePartitioning1` & >> `testShouldEliminatePartitioning2` >> seems didn't take effect? >> >> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/BatchFileSystemTableSourceTest.xml#L38 > > > - Note that in this example, Exchange operator have a > property KEEP_INPUT_AS_IS that indicates that data distribution is the same > as its input. 
>
>> In conjunction with question 2, I am wondering if we have a better
>> choice (of course, not simply adding the current `PHYSICAL_OPT_RULES`'s
>> `RemoveRedundantLocalXXRule`s to the `PHYSICAL_REWRITE`).
>> For example, let the source actively provide some traits (including
>> `FlinkRelDistribution` and `RelCollation`) to the planner. The advantage
>> of doing this is to directly reuse the current shuffle-removal
>> optimization (as `FlinkExpandConversionRule` implements), and, according
>> to the data distribution characteristics provided by the source, the
>> planner may choose a physical operator with a cheaper cost (for example,
>> according to `RelCollation`, the planner can use sortAgg, with no need
>> for a separate local sort operation). WDYT?
>
> - Good point. Makes sense to me. I will check how
> FlinkExpandConversionRule can be utilized in the implementation.
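>
> Roughly, I imagine the pre-partitioned scan advertising its distribution
> as a trait, along these lines (hypothetical wiring only;
> `partitionKeyIndexes` is a made-up variable and the exact factory
> signature may differ):
>
> // Hypothetical sketch: the scan exposes a (non-strict) hash distribution
> // trait so that FlinkExpandConversionRule-style logic can elide the
> // Exchange, instead of using a dedicated removal rule.
> FlinkRelDistribution distribution =
>         FlinkRelDistribution.hash(partitionKeyIndexes, false); // requireStrict = false
> RelTraitSet newTraits = scan.getTraitSet().replace(distribution);
> RelNode newScan = scan.copy(newTraits, scan.getInputs());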
>
> Regards,
> Jeyhun
>
> On Tue, Apr 2, 2024 at 6:01 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:
>
>> Hi Jeyhun,
>>
>> Thank you for driving this, it would be a very useful optimization!
>>
>> Sorry for joining the discussion only now (I originally planned to reply
>> earlier, but it happened to be during my vacation). I have two questions:
>>
>> 1. Is this FLIP targeted only at batch scenarios or does it include
>> streaming?
>> (The FLIP and the discussion did not explicitly mention this, but in the
>> draft PR I only saw the implementation for batch scenarios:
>> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8
>> )
>> If we expect this to also apply to streaming, then we need to consider
>> the stricter shuffle restrictions of streaming compared to batch (if
>> support is considered, more discussion is needed here, let's not expand
>> for now). If it only applies to batch, it is recommended to clarify this
>> in the FLIP.
>>
>> 2. In the current implementation, the optimized plan seems to have some
>> problems, as described in the class comments:
>> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60
>>
>> BatchPhysicalHashAggregate (local)
>> +- BatchPhysicalLocalHashAggregate (local)
>>    +- BatchPhysicalTableSourceScan
>>
>> The `BatchPhysicalLocalHashAggregate` here is redundant (in the case of
>> one-phase hashAgg, the localAgg is not necessary; this is the scenario
>> currently handled by `RemoveRedundantLocalHashAggRule` and other rules).
>> Also, in the draft PR, the optimization of
>> `testShouldEliminatePartitioning1` & `testShouldEliminatePartitioning2`
>> seems not to have taken effect?
>> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/BatchFileSystemTableSourceTest.xml#L38
>>
>> In conjunction with question 2, I am wondering if we have a better
>> choice (of course, not simply adding the current `PHYSICAL_OPT_RULES`'s
>> `RemoveRedundantLocalXXRule`s to the `PHYSICAL_REWRITE`).
>> For example, let the source actively provide some traits (including
>> `FlinkRelDistribution` and `RelCollation`) to the planner. The advantage
>> of doing this is to directly reuse the current shuffle-removal
>> optimization (as `FlinkExpandConversionRule` implements), and, according
>> to the data distribution characteristics provided by the source, the
>> planner may choose a physical operator with a cheaper cost (for example,
>> according to `RelCollation`, the planner can use sortAgg, with no need
>> for a separate local sort operation). WDYT?
>>
>> Best,
>> Lincoln Lee
>>
>> Jeyhun Karimov <je.kari...@gmail.com> wrote on Mon, Apr 1, 2024 at 18:00:
>>
>>> Hi everyone,
>>>
>>> Thanks for your valuable feedback!
>>>
>>> The discussion on this FLIP has been going on for a while.
>>> I would like to start a vote after 48 hours.
>>>
>>> Please let me know if you have any concerns or any further
>>> questions/comments.
>>>
>>> Regards,
>>> Jeyhun
>>>
>>> On Thu, Mar 21, 2024 at 6:01 PM Jeyhun Karimov <je.kari...@gmail.com>
>>> wrote:
>>>
>>>> Hi Lorenzo,
>>>>
>>>> Thanks a lot for your comments. Please find my answers below:
>>>>
>>>>> For the interface `SupportsPartitioning`, why return `Optional`?
>>>>> If one decides to implement that, partitions must exist (at most,
>>>>> return an empty list). Returning `Optional` seems just to complicate
>>>>> the logic of the code using that interface.
>>>>
>>>> - The reasoning behind the use of Optional is that sometimes (e.g., in
>>>> HiveTableSource) the partitioning info resides in the catalog.
>>>> Therefore, we return Optional.empty(), so that the list of partitions
>>>> is queried from the catalog.
>>>>
>>>>> I foresee the calling code doing something like: "if the source
>>>>> supports partitioning, get the partitions, but if they don't exist,
>>>>> raise a runtime exception". Let's simply make that safe at compile
>>>>> time and guarantee to the calling code that partitions exist.
>>>>
>>>> - Yes, once partitions cannot be found, neither from the catalog nor
>>>> from the interface implementation, we raise an exception at query
>>>> compile time.
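>>>>
>>>> To make the contract concrete, a minimal sketch of the interface as I
>>>> currently picture it (illustrative only; the exact signatures and the
>>>> partition representation are not final):
>>>>
>>>> import java.util.List;
>>>> import java.util.Map;
>>>> import java.util.Optional;
>>>>
>>>> // Illustrative sketch only: names follow this discussion; generics
>>>> // and the partition representation are not final.
>>>> public interface SupportsPartitioning {
>>>>
>>>>     // Partitions known to the source itself. Optional.empty() means
>>>>     // "look them up in the catalog" (e.g., HiveTableSource).
>>>>     Optional<List<Map<String, String>>> sourcePartitions();
>>>>
>>>>     // Instructs the source to perform a partitioned read of the given
>>>>     // (possibly filtered) partitions.
>>>>     void applyPartitionedRead(List<Map<String, String>> partitions);
>>>> }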
>>>>
>>>>> Another thing is that you show Hive-like partitioning in your FS
>>>>> structure; do you think it makes sense to add a note about
>>>>> auto-discovery of partitions?
>>>>
>>>> - Yes, the FLIP contains just an example partitioning for the
>>>> filesystem connector. Each connector already "knows" how to
>>>> auto-discover its partitions, and we rely on this fact.
>>>> For example, partition discovery differs between the Kafka and
>>>> filesystem sources. So, we do not handle the manual discovery of
>>>> partitions. Please correct me if I misunderstood your question.
>>>>
>>>>> In other terms, it looks a bit counterintuitive that the user
>>>>> implementing the source has to specify statically which partitions
>>>>> exist (and they can change at runtime), while the source itself knows
>>>>> the data provider and could directly implement a method
>>>>> `discoverPartitions`. Then Flink would take care of invoking that
>>>>> method when needed.
>>>>
>>>> - We utilize the table option SOURCE_MONITOR_INTERVAL to check whether
>>>> partitions are static or not. So, a user still should give Flink a
>>>> hint about partitions being static or not. With static partitions,
>>>> Flink can do more optimizations.
>>>>
>>>> Please let me know if my replies answer your questions or if you have
>>>> more comments.
>>>>
>>>> Regards,
>>>> Jeyhun
>>>>
>>>> On Thu, Mar 21, 2024 at 10:03 AM <lorenzo.affe...@ververica.com> wrote:
>>>>
>>>>> Hello Jeyhun,
>>>>> I really like the proposal and it definitely makes sense to me.
>>>>>
>>>>> I have a couple of nits here and there:
>>>>>
>>>>> For the interface `SupportsPartitioning`, why return `Optional`?
>>>>> If one decides to implement that, partitions must exist (at most,
>>>>> return an empty list). Returning `Optional` seems just to complicate
>>>>> the logic of the code using that interface.
>>>>>
>>>>> I foresee the calling code doing something like: "if the source
>>>>> supports partitioning, get the partitions, but if they don't exist,
>>>>> raise a runtime exception". Let's simply make that safe at compile
>>>>> time and guarantee to the calling code that partitions exist.
>>>>>
>>>>> Another thing is that you show Hive-like partitioning in your FS
>>>>> structure; do you think it makes sense to add a note about
>>>>> auto-discovery of partitions?
>>>>>
>>>>> In other terms, it looks a bit counterintuitive that the user
>>>>> implementing the source has to specify statically which partitions
>>>>> exist (and they can change at runtime), while the source itself knows
>>>>> the data provider and could directly implement a method
>>>>> `discoverPartitions`. Then Flink would take care of invoking that
>>>>> method when needed.
>>>>> On Mar 15, 2024 at 22:09 +0100, Jeyhun Karimov <je.kari...@gmail.com>, wrote:
>>>>>
>>>>> Hi Benchao,
>>>>>
>>>>> Thanks for your comments.
>>>>>
>>>>>> 1. What parallelism would you take? E.g., 128 + 256 => 128? What if
>>>>>> we cannot have a good greatest common divisor, like 127 + 128, could
>>>>>> we just utilize one side's pre-partitioned attribute, and let the
>>>>>> other side just do the shuffle?
>>>>>
>>>>> There are two cases we need to consider:
>>>>>
>>>>> 1. Static partitioning (no partitions are added during query
>>>>> execution) is enabled AND both sources implement
>>>>> "SupportsPartitionPushdown"
>>>>>
>>>>> In this case, we are sure that no new partitions will be added at
>>>>> runtime. So, we have a chance to equalize both sources' partitions
>>>>> and parallelism, IFF both sources implement the
>>>>> "SupportsPartitionPushdown" interface.
>>>>> To achieve this, we first fetch the existing partitions from source1
>>>>> (say p_s1) and from source2 (say p_s2).
>>>>> Then, we find the intersection of these two partition sets (say
>>>>> p_intersect) and push down these partitions:
>>>>>
>>>>> SupportsPartitionPushDown::applyPartitions(p_intersect) // make sure
>>>>> that only the specific partitions are read
>>>>> SupportsPartitioning::applyPartitionedRead(p_intersect) // partitioned
>>>>> read with the filtered partitions
>>>>>
>>>>> Lastly, we need to change the parallelism of 1) source1, 2) source2,
>>>>> and 3) all of their downstream operators until (and including) their
>>>>> first common ancestor (e.g., the join) to be equal to the number of
>>>>> partitions (the size of p_intersect).
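>>>>>
>>>>> As a rough end-to-end sketch of this case (assuming a partition
>>>>> representation of Map<String, String>; partitionsOf(),
>>>>> applyPartitions(), and setParallelismUpToCommonAncestor() are
>>>>> invented helper names for illustration):
>>>>>
>>>>> // Illustrative sketch of case 1 only; helper names are made up.
>>>>> void optimizeStaticPartitionedJoin(SupportsPartitioning s1, SupportsPartitioning s2) {
>>>>>     Set<Map<String, String>> pS1 = new HashSet<>(partitionsOf(s1)); // from source or catalog
>>>>>     Set<Map<String, String>> pS2 = new HashSet<>(partitionsOf(s2));
>>>>>
>>>>>     Set<Map<String, String>> common = new HashSet<>(pS1);
>>>>>     common.retainAll(pS2); // p_intersect: partitions present on both sides
>>>>>
>>>>>     List<Map<String, String>> pIntersect = new ArrayList<>(common);
>>>>>     applyPartitions(s1, pIntersect);     // SupportsPartitionPushDown: read only these
>>>>>     applyPartitions(s2, pIntersect);
>>>>>     s1.applyPartitionedRead(pIntersect); // SupportsPartitioning: partitioned read
>>>>>     s2.applyPartitionedRead(pIntersect);
>>>>>
>>>>>     // Equalize the parallelism of both sources and of all downstream
>>>>>     // operators up to (and including) their first common ancestor.
>>>>>     setParallelismUpToCommonAncestor(s1, s2, pIntersect.size());
>>>>> }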
>>>>> 2. All other cases
>>>>>
>>>>> In all other cases, the parallelism of both sources and of their
>>>>> downstream operators until their common ancestor will be equal to
>>>>> MIN(p_s1, p_s2).
>>>>> That is, the minimum of the partition sizes of source1 and source2
>>>>> will be selected as the parallelism.
>>>>> Coming back to your example: if source1's parallelism is 127 and
>>>>> source2's parallelism is 128, then we first check the partition sizes
>>>>> of source1 and source2.
>>>>> Say the partition size of source1 is 100 and the partition size of
>>>>> source2 is 90. Then, we would set the parallelism of source1,
>>>>> source2, and all of their downstream operators until (and including)
>>>>> the join operator to 90 (min(100, 90)).
>>>>> We also plan to implement a cost-based decision instead of the
>>>>> rule-based one explained above (the MIN rule).
>>>>> One possible result of the cost-based estimation is to keep the
>>>>> partitions on one side and perform the shuffling on the other source.
>>>>>
>>>>>> 2. In our current shuffle-removal design (FlinkExpandConversionRule),
>>>>>> we don't consider parallelism; we just remove unnecessary shuffles
>>>>>> according to the distribution columns. After this FLIP, the
>>>>>> parallelism may be bundled with the source's partitions, so how will
>>>>>> this optimization accommodate FlinkExpandConversionRule? Will you
>>>>>> also change downstream operators' parallelisms if we want to also
>>>>>> remove subsequent shuffles?
>>>>>
>>>>> - From my understanding of FlinkExpandConversionRule, its removal
>>>>> logic is agnostic to operator parallelism.
>>>>> So, if FlinkExpandConversionRule decides to remove a shuffle
>>>>> operation, then this FLIP will search for another possible shuffle
>>>>> (the one closest to the source) to remove.
>>>>> If there is such an opportunity, this FLIP will remove that shuffle.
>>>>> So, from my understanding, FlinkExpandConversionRule and this
>>>>> optimization rule can work together safely.
>>>>> Please correct me if I misunderstood your question.
>>>>>
>>>>>> Regarding the new optimization rule, have you also considered
>>>>>> allowing some non-strict mode like
>>>>>> FlinkRelDistribution#requireStrict? For example, the source is
>>>>>> pre-partitioned by the a, b columns; if we are consuming this source
>>>>>> and do an aggregate on a, b, c, can we utilize this optimization?
>>>>>
>>>>> - Good point. Yes, there are some cases where the non-strict mode
>>>>> will apply. For example:
>>>>>
>>>>> - the pre-partitioned columns and the aggregate columns are the same
>>>>> but in a different order (e.g., the source is pre-partitioned w.r.t.
>>>>> a,b and the aggregate has a GROUP BY b,a)
>>>>> - the columns in the Exchange operator are a list-prefix of the
>>>>> pre-partitioned columns of the source (e.g., the source is
>>>>> pre-partitioned w.r.t. a,b,c and the Exchange's partition columns are
>>>>> a,b); see the sketch below
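>>>>>
>>>>> A tiny sketch of the corresponding matching check for these two cases
>>>>> (purely illustrative):
>>>>>
>>>>> import java.util.HashSet;
>>>>> import java.util.List;
>>>>>
>>>>> // Illustrative only: can a hash Exchange on exchangeKeys be served
>>>>> // by a source pre-partitioned on sourceKeys, per the cases above?
>>>>> static boolean nonStrictMatch(List<Integer> sourceKeys, List<Integer> exchangeKeys) {
>>>>>     // Case 1: same columns in any order (a,b vs. GROUP BY b,a).
>>>>>     if (new HashSet<>(sourceKeys).equals(new HashSet<>(exchangeKeys))) {
>>>>>         return true;
>>>>>     }
>>>>>     // Case 2: the Exchange keys are a list-prefix of the source's
>>>>>     // pre-partitioned columns (a,b vs. a,b,c).
>>>>>     return exchangeKeys.size() <= sourceKeys.size()
>>>>>             && exchangeKeys.equals(sourceKeys.subList(0, exchangeKeys.size()));
>>>>> }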
>>>>>
>>>>> Please let me know if the above answers your questions or if you have
>>>>> any other comments.
>>>>>
>>>>> Regards,
>>>>> Jeyhun
>>>>>
>>>>> On Thu, Mar 14, 2024 at 12:48 PM Benchao Li <libenc...@apache.org> wrote:
>>>>>
>>>>> Thanks Jeyhun for bringing up this discussion, it is really exciting;
>>>>> +1 for the general idea.
>>>>>
>>>>> We also introduced a similar concept internally in Flink Batch to
>>>>> cope with bucketed tables in Hive; it is a very important improvement.
>>>>>
>>>>>> One thing to note is that for join queries, the parallelism of each
>>>>>> join source might be different. This might result in inconsistencies
>>>>>> while using the pre-partitioned/pre-divided data (e.g., different
>>>>>> mappings of partitions to source operators).
>>>>>> Therefore, it is the job of the planner to detect this and adjust
>>>>>> the parallelism. With that in mind, the rest (how the split
>>>>>> assigners perform) is consistent among many sources.
>>>>>
>>>>> Could you elaborate a little more on this? I added my two cents here
>>>>> about this part:
>>>>> 1. What parallelism would you take? E.g., 128 + 256 => 128? What if
>>>>> we cannot have a good greatest common divisor, like 127 + 128, could
>>>>> we just utilize one side's pre-partitioned attribute, and let the
>>>>> other side just do the shuffle?
>>>>> 2. In our current shuffle-removal design (FlinkExpandConversionRule),
>>>>> we don't consider parallelism; we just remove unnecessary shuffles
>>>>> according to the distribution columns. After this FLIP, the
>>>>> parallelism may be bundled with the source's partitions, so how will
>>>>> this optimization accommodate FlinkExpandConversionRule? Will you
>>>>> also change downstream operators' parallelisms if we want to also
>>>>> remove subsequent shuffles?
>>>>>
>>>>> Regarding the new optimization rule, have you also considered
>>>>> allowing some non-strict mode like FlinkRelDistribution#requireStrict?
>>>>> For example, the source is pre-partitioned by the a, b columns; if we
>>>>> are consuming this source and do an aggregate on a, b, c, can we
>>>>> utilize this optimization?
>>>>>
>>>>> Jane Chan <qingyue....@gmail.com> wrote on Thu, Mar 14, 2024 at 15:24:
>>>>>
>>>>>> Hi Jeyhun,
>>>>>>
>>>>>> Thanks for your clarification.
>>>>>>
>>>>>>> Once a new partition is detected, we add it to our existing
>>>>>>> mapping. Our mapping looks like Map<Integer, Set<Integer>>
>>>>>>> subtaskToPartitionAssignment, where it maps each source subtaskID
>>>>>>> to zero or more partitions.
>>>>>>
>>>>>> I understand your point. **It would be better if you could sync the
>>>>>> content to the FLIP.**
>>>>>>
>>>>>> Another thing is that I'm curious about what the physical plan looks
>>>>>> like. Is there any specific info that will be added to the table
>>>>>> source (like filter/project pushdown)? It would be great if you
>>>>>> could attach an example to the FLIP.
>>>>>>
>>>>>> Bests,
>>>>>> Jane
>>>>>>
>>>>>> On Wed, Mar 13, 2024 at 9:11 PM Jeyhun Karimov <je.kari...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Jane,
>>>>>>>
>>>>>>> Thanks for your comments.
>>>>>>>
>>>>>>>> 1. Concerning the `sourcePartitions()` method, the partition
>>>>>>>> information returned during the optimization phase may not be the
>>>>>>>> same as the partition information during runtime execution. For
>>>>>>>> long-running jobs, partitions may be continuously created. Is this
>>>>>>>> FLIP equipped to handle such scenarios?
>>>>>>>
>>>>>>> - Good point. This scenario is definitely supported.
>>>>>>> Once a new partition is added, or in general, new splits are
>>>>>>> discovered, the
>>>>>>> PartitionAwareSplitAssigner::addSplits(Collection<FileSourceSplit>
>>>>>>> newSplits) method will be called. Inside that method, we are able
>>>>>>> to detect whether a split belongs to an existing partition or to a
>>>>>>> new one.
>>>>>>> Once a new partition is detected, we add it to our existing
>>>>>>> mapping. Our mapping looks like Map<Integer, Set<Integer>>
>>>>>>> subtaskToPartitionAssignment, where it maps each source subtaskID
>>>>>>> to zero or more partitions.
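>>>>>>>
>>>>>>> Roughly like the following (sketch only; partitionOf() and
>>>>>>> ownerOf() are invented helpers):
>>>>>>>
>>>>>>> // Sketch of PartitionAwareSplitAssigner#addSplits as described
>>>>>>> // above; helper methods are invented for illustration.
>>>>>>> private final Map<Integer, Set<Integer>> subtaskToPartitionAssignment = new HashMap<>();
>>>>>>>
>>>>>>> public void addSplits(Collection<FileSourceSplit> newSplits) {
>>>>>>>     for (FileSourceSplit split : newSplits) {
>>>>>>>         int partition = partitionOf(split); // e.g., derived from the split's path
>>>>>>>         int subtask = ownerOf(partition);   // existing owner, or a newly chosen subtask
>>>>>>>         subtaskToPartitionAssignment
>>>>>>>                 .computeIfAbsent(subtask, k -> new HashSet<>())
>>>>>>>                 .add(partition);
>>>>>>>     }
>>>>>>> }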
>>>>>>>
>>>>>>>> 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
>>>>>>>> understand that it is also necessary to verify whether the hash
>>>>>>>> key within the Exchange node is consistent with the partition key
>>>>>>>> defined in the table source that implements `SupportsPartitioning`.
>>>>>>>
>>>>>>> - Yes, I overlooked that point; fixed. Actually, the rule is much
>>>>>>> more complicated; I tried to simplify it in the FLIP. Good point.
>>>>>>>
>>>>>>>> 3. Could you elaborate on the desired physical plan and
>>>>>>>> integration with `CompiledPlan` to enhance the overall
>>>>>>>> functionality?
>>>>>>>
>>>>>>> - For the compiled plan, PartitioningSpec will be used, with the
>>>>>>> JSON tag "Partitioning". As a result, in the compiled plan, the
>>>>>>> source operator will have
>>>>>>> "abilities" : [ { "type" : "Partitioning" } ]
>>>>>>> as part of the compiled plan.
>>>>>>> More about the implementation details below:
>>>>>>>
>>>>>>> --------------------------------
>>>>>>> PartitioningSpec class
>>>>>>> --------------------------------
>>>>>>> @JsonTypeName("Partitioning")
>>>>>>> public final class PartitioningSpec extends SourceAbilitySpecBase {
>>>>>>>     // some code here
>>>>>>>     @Override
>>>>>>>     public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
>>>>>>>         if (tableSource instanceof SupportsPartitioning) {
>>>>>>>             ((SupportsPartitioning<?>) tableSource).applyPartitionedRead();
>>>>>>>         } else {
>>>>>>>             throw new TableException(
>>>>>>>                     String.format(
>>>>>>>                             "%s does not support SupportsPartitioning.",
>>>>>>>                             tableSource.getClass().getName()));
>>>>>>>         }
>>>>>>>     }
>>>>>>>     // some code here
>>>>>>> }
>>>>>>>
>>>>>>> --------------------------------
>>>>>>> SourceAbilitySpec class
>>>>>>> --------------------------------
>>>>>>> @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
>>>>>>> @JsonSubTypes({
>>>>>>>     @JsonSubTypes.Type(value = FilterPushDownSpec.class),
>>>>>>>     @JsonSubTypes.Type(value = LimitPushDownSpec.class),
>>>>>>>     @JsonSubTypes.Type(value = PartitionPushDownSpec.class),
>>>>>>>     @JsonSubTypes.Type(value = ProjectPushDownSpec.class),
>>>>>>>     @JsonSubTypes.Type(value = ReadingMetadataSpec.class),
>>>>>>>     @JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
>>>>>>>     @JsonSubTypes.Type(value = SourceWatermarkSpec.class),
>>>>>>>     @JsonSubTypes.Type(value = AggregatePushDownSpec.class),
>>>>>>> +   @JsonSubTypes.Type(value = PartitioningSpec.class) // newly added
>>>>>>> })
>>>>>>>
>>>>>>> Please let me know if that answers your questions or if you have
>>>>>>> other comments.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Jeyhun
>>>>>>>
>>>>>>> On Tue, Mar 12, 2024 at 8:56 AM Jane Chan <qingyue....@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Jeyhun,
>>>>>>>>
>>>>>>>> Thank you for leading the discussion. I'm generally +1 with this
>>>>>>>> proposal, along with some questions. Please see my comments below.
>>>>>>>>
>>>>>>>> 1. Concerning the `sourcePartitions()` method, the partition
>>>>>>>> information returned during the optimization phase may not be the
>>>>>>>> same as the partition information during runtime execution. For
>>>>>>>> long-running jobs, partitions may be continuously created. Is this
>>>>>>>> FLIP equipped to handle such scenarios?
>>>>>>>>
>>>>>>>> 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
>>>>>>>> understand that it is also necessary to verify whether the hash
>>>>>>>> key within the Exchange node is consistent with the partition key
>>>>>>>> defined in the table source that implements `SupportsPartitioning`.
>>>>>>>>
>>>>>>>> 3. Could you elaborate on the desired physical plan and
>>>>>>>> integration with `CompiledPlan` to enhance the overall
>>>>>>>> functionality?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jane
>>>>>>>>
>>>>>>>> On Tue, Mar 12, 2024 at 11:11 AM Jim Hughes
>>>>>>>> <jhug...@confluent.io.invalid> wrote:
>>>>>>>>
>>>>>>>>> Hi Jeyhun,
>>>>>>>>>
>>>>>>>>> I like the idea! Given FLIP-376 [1], I wonder if it'd make sense
>>>>>>>>> to generalize FLIP-434 to be about "pre-divided" data to cover
>>>>>>>>> "buckets" and "partitions" (and maybe even situations where a
>>>>>>>>> data source is partitioned and bucketed).
>>>>>>>>>
>>>>>>>>> Separate from that, the page mentions TPC-H Q1 as an example. For
>>>>>>>>> a join, any two tables joined on the same bucket key should
>>>>>>>>> provide a concrete example of a join. Systems like Kafka
>>>>>>>>> Streams/ksqlDB call this "co-partitioning" [2]; for those
>>>>>>>>> systems, it is a requirement placed on the input sources. For
>>>>>>>>> Flink, with FLIP-434, the proposed planner rule could remove the
>>>>>>>>> shuffle.
>>>>>>>>>
>>>>>>>>> Definitely a fun idea; I look forward to hearing more!
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Jim
>>>>>>>>>
>>>>>>>>> 1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
>>>>>>>>> 2. https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/#co-partitioning-requirements
>>>>>>>>>
>>>>>>>>> On Sun, Mar 10, 2024 at 3:38 PM Jeyhun Karimov
>>>>>>>>> <je.kari...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi devs,
>>>>>>>>>>
>>>>>>>>>> I'd like to start a discussion on FLIP-434: Support
>>>>>>>>>> optimizations for pre-partitioned data sources [1].
>>>>>>>>>>
>>>>>>>>>> The FLIP introduces taking advantage of pre-partitioned data
>>>>>>>>>> sources for SQL/Table API (it is already supported as an
>>>>>>>>>> experimental feature in the DataStream API [2]).
>>>>>>>>>>
>>>>>>>>>> Please find more details in the FLIP wiki document [1].
>>>>>>>>>> Looking forward to your feedback.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Jeyhun
>>>>>>>>>>
>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources
>>>>>>>>>> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/
>>>>>
>>>>> --
>>>>> Best,
>>>>> Benchao Li