Hi Lincoln,

Thanks for your reply. My idea was to utilize MapBundleFunction, as it is
already used in a similar context by MiniBatchLocalGroupAggFunction. I can
also extend my PoC to streaming sources and get back to continue our
discussion.
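For reference, a minimal sketch of the bundling contract I have in mind.
The `BundleFunction` and `LocalCountBundleFunction` types below are
simplified, illustrative stand-ins for the internal MapBundleFunction API
(they are not the actual Flink classes), using a mini-batch local COUNT(*)
pre-aggregation as the example:

import java.util.Map;
import java.util.function.BiConsumer;

/*
 * Simplified stand-in for the contract of Flink's internal
 * MapBundleFunction (flink-table-runtime): inputs are buffered per key
 * and flushed downstream when the bundle is full, the way
 * MiniBatchLocalGroupAggFunction performs mini-batch local aggregation.
 */
abstract class BundleFunction<K, V, IN, OUT> {

    /** Combines a new input with the value already buffered for its key. */
    abstract V addInput(V previous, IN input);

    /** Emits one output per buffered key when the bundle is flushed. */
    abstract void finishBundle(Map<K, V> bundle, BiConsumer<K, OUT> out);
}

/** Illustrative example only: mini-batch local COUNT(*) per key. */
class LocalCountBundleFunction extends BundleFunction<String, Long, String, Long> {

    @Override
    Long addInput(Long previous, String input) {
        // The first input for this key starts a new partial count.
        return previous == null ? 1L : previous + 1L;
    }

    @Override
    void finishBundle(Map<String, Long> bundle, BiConsumer<String, Long> out) {
        // Forward one partial count per key to the downstream global agg.
        bundle.forEach(out);
    }
}

The point being that the same bundling mechanism could buffer and
pre-aggregate records of a pre-partitioned source without an extra shuffle.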
Regards,
Jeyhun

On Wed, Apr 3, 2024 at 4:33 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:

> Hi Jeyhun,
>
> Thanks for your quick response!
>
> In the streaming scenario, shuffle commonly occurs before the stateful
> operator, and there's a sanity check[1] when the stateful operator
> accesses the state. This implies a consistency requirement between the
> partitioner used for data shuffling and the key selector used for state
> access (see KeyGroupStreamPartitioner for more details);
> otherwise, there may be state access errors. That is to say, in the
> streaming scenario, there is not only the strict requirement described in
> FlinkRelDistribution#requireStrict, but also the implied consistency of
> the hash calculation.
>
> Also, if this FLIP targets both streaming and batch scenarios, it is
> recommended to do PoC validation for streaming as well.
>
> [1] https://issues.apache.org/jira/browse/FLINK-29430
>
> Best,
> Lincoln Lee
>
> Leonard Xu <xbjt...@gmail.com> wrote on Wed, Apr 3, 2024 at 14:25:
>
> > Hey, Jeyhun
> >
> > Thanks for kicking off this discussion. I have two questions about
> > streaming sources:
> >
> > (1) The FLIP motivation section says the Kafka broker is already
> > partitioned w.r.t. some key[s]. Is this the main use case in the Kafka
> > world? Partitioning by key fields is not the behavior of Kafka's
> > default partitioner[1] IIUC.
> >
> > (2) Considering the FLIP's optimization scope aims at both batch and
> > streaming pre-partitioned sources, could you add a streaming source
> > example to help me understand the FLIP better? I think Kafka Source is
> > a good candidate for a streaming source example; file source is a good
> > one for a batch source and it really helped me to follow the FLIP.
> >
> > Best,
> > Leonard
> > [1]
> > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L31
> >
> > On Apr 3, 2024 at 5:53 AM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
> > >
> > > Hi Lincoln,
> > >
> > > Thanks a lot for your comments. Please find my answers below.
> > >
> > >> 1. Is this FLIP targeted only at batch scenarios or does it include
> > >> streaming?
> > >> (The FLIP and the discussion did not explicitly mention this, but in
> > >> the draft PR, I only saw the implementation for batch scenarios
> > >> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8
> > >> )
> > >> If we expect this to also apply to streaming, then we need to
> > >> consider the stricter shuffle restrictions of streaming compared to
> > >> batch (if support is considered, more discussion is needed here,
> > >> let's not expand for now). If it only applies to batch, it is
> > >> recommended to clarify that in the FLIP.
> > >
> > > - The FLIP targets both streaming and batch scenarios.
> > > Could you please elaborate more on what you mean by additional
> > > restrictions?
> > >
> > >> 2. In the current implementation, the optimized plan seems to have
> > >> some problems.
> > >> As described in the class comments:
> > >> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60
> > >>
> > >> BatchPhysicalHashAggregate (local)
> > >> +- BatchPhysicalLocalHashAggregate (local)
> > >>    +- BatchPhysicalTableSourceScan
> > >>
> > >> The `BatchPhysicalLocalHashAggregate` here is redundant (in the case
> > >> of one-phase hashAgg, localAgg is not necessary, which is the
> > >> scenario currently handled by `RemoveRedundantLocalHashAggRule` and
> > >> other rules)
> > >
> > > - Yes, you are completely right. Note that the PR you referenced is
> > > just a quick PoC.
> > > The redundant operators you mentioned exist because
> > > `RemoveRedundantShuffleRule` just removes the Exchange operator,
> > > without modifying upstream/downstream operators.
> > > As I mentioned, the implementation is just a PoC, and the final
> > > implementation will make sure that the existing redundancy
> > > elimination rules remove the redundant operators.
> > >
> > >> Also, in the draft PR, the optimization of
> > >> `testShouldEliminatePartitioning1` &
> > >> `testShouldEliminatePartitioning2` seems not to have taken effect?
> > >> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/BatchFileSystemTableSourceTest.xml#L38
> > >
> > > - Note that in this example, the Exchange operator has a
> > > property KEEP_INPUT_AS_IS that indicates that the data distribution
> > > is the same as its input.
> > > Since we have redundant operators (as shown above, two aggregate
> > > operators), one of the rules (not in this FLIP)
> > > adds this Exchange operator with KEEP_INPUT_AS_IS in between.
> > > Similar to my comment above, the final implementation will be free of
> > > redundant operators.
> > >
> > >> In conjunction with question 2, I am wondering if we have a better
> > >> choice (of course, not simply adding the current
> > >> `PHYSICAL_OPT_RULES`'s `RemoveRedundantLocalXXRule`s to the
> > >> `PHYSICAL_REWRITE`).
> > >> For example, let the source actively provide some traits (including
> > >> `FlinkRelDistribution` and `RelCollation`) to the planner. The
> > >> advantage of doing this is to directly reuse the current
> > >> shuffle-removal optimization (as `FlinkExpandConversionRule`
> > >> implements), and according to the data distribution characteristics
> > >> provided by the source, the planner may choose a physical operator
> > >> with a cheaper cost (for example, according to `RelCollation`, the
> > >> planner can use sortAgg, with no need for a separate local sort
> > >> operation). WDYT?
> > >
> > > - Good point. Makes sense to me. I will look into utilizing
> > > FlinkExpandConversionRule in the implementation.
> > >
> > > Regards,
> > > Jeyhun
> > >
> > > On Tue, Apr 2, 2024 at 6:01 PM Lincoln Lee <lincoln.8...@gmail.com>
> > > wrote:
> > >
> > >> Hi Jeyhun,
> > >>
> > >> Thank you for driving this, it would be a very useful optimization!
> > >>
> > >> Sorry for joining the discussion only now (I originally planned to
> > >> reply earlier, but it happened to be during my vacation). I have two
> > >> questions:
> > >>
> > >> 1. Is this FLIP targeted only at batch scenarios or does it include
> > >> streaming?
> > >> (The FLIP and the discussion did not explicitly mention this, but in
> > >> the draft PR, I only saw the implementation for batch scenarios
> > >> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8
> > >> )
> > >> If we expect this to also apply to streaming, then we need to
> > >> consider the stricter shuffle restrictions of streaming compared to
> > >> batch (if support is considered, more discussion is needed here,
> > >> let's not expand for now). If it only applies to batch, it is
> > >> recommended to clarify that in the FLIP.
> > >>
> > >> 2. In the current implementation, the optimized plan seems to have
> > >> some problems.
> > >> As described in the class comments:
> > >> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60
> > >>
> > >> BatchPhysicalHashAggregate (local)
> > >> +- BatchPhysicalLocalHashAggregate (local)
> > >>    +- BatchPhysicalTableSourceScan
> > >>
> > >> The `BatchPhysicalLocalHashAggregate` here is redundant (in the case
> > >> of one-phase hashAgg, localAgg is not necessary, which is the
> > >> scenario currently handled by `RemoveRedundantLocalHashAggRule` and
> > >> other rules). Also, in the draft PR, the optimization of
> > >> `testShouldEliminatePartitioning1` &
> > >> `testShouldEliminatePartitioning2` seems not to have taken effect?
> > >> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/BatchFileSystemTableSourceTest.xml#L38
> > >>
> > >> In conjunction with question 2, I am wondering if we have a better
> > >> choice (of course, not simply adding the current
> > >> `PHYSICAL_OPT_RULES`'s `RemoveRedundantLocalXXRule`s to the
> > >> `PHYSICAL_REWRITE`).
> > >> For example, let the source actively provide some traits (including
> > >> `FlinkRelDistribution` and `RelCollation`) to the planner. The
> > >> advantage of doing this is to directly reuse the current
> > >> shuffle-removal optimization (as `FlinkExpandConversionRule`
> > >> implements), and according to the data distribution characteristics
> > >> provided by the source, the planner may choose a physical operator
> > >> with a cheaper cost (for example, according to `RelCollation`, the
> > >> planner can use sortAgg, with no need for a separate local sort
> > >> operation). WDYT?
> > >>
> > >> Best,
> > >> Lincoln Lee
> > >>
> > >> Jeyhun Karimov <je.kari...@gmail.com> wrote on Mon, Apr 1, 2024 at 18:00:
> > >>
> > >>> Hi everyone,
> > >>>
> > >>> Thanks for your valuable feedback!
> > >>>
> > >>> The discussion on this FLIP has been going on for a while.
> > >>> I would like to start a vote after 48 hours.
> > >>>
> > >>> Please let me know if you have any concerns or any further
> > >>> questions/comments.
> > >>>
> > >>> Regards,
> > >>> Jeyhun
> > >>>
> > >>> On Thu, Mar 21, 2024 at 6:01 PM Jeyhun Karimov <je.kari...@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Hi Lorenzo,
> > >>>>
> > >>>> Thanks a lot for your comments.
> > >>>> Please find my answers below:
> > >>>>
> > >>>>> For the interface `SupportsPartitioning`, why return `Optional`?
> > >>>>> If one decides to implement that, partitions must exist (at
> > >>>>> maximum, return an empty list). Returning `Optional` seems just to
> > >>>>> complicate the logic of the code using that interface.
> > >>>>
> > >>>> - The reasoning behind the use of Optional is that sometimes (e.g.,
> > >>>> in HiveTableSource) the partitioning info is in the catalog.
> > >>>> Therefore, we return Optional.empty(), so that the list of
> > >>>> partitions is queried from the catalog.
> > >>>>
> > >>>>> I foresee the calling code doing something like: "if the source
> > >>>>> supports partitioning, get the partitions, but if they don't
> > >>>>> exist, raise a runtime exception". Let's simply make that safe at
> > >>>>> compile time and guarantee to the calling code that partitions
> > >>>>> exist.
> > >>>>
> > >>>> - Yes, if partitions cannot be found, either from the catalog or
> > >>>> from the interface implementation, then we raise an exception at
> > >>>> query compile time.
> > >>>>
> > >>>>> Another thing is that you show Hive-like partitioning in your FS
> > >>>>> structure; do you think it makes sense to add a note about
> > >>>>> auto-discovery of partitions?
> > >>>>
> > >>>> - Yes, the FLIP contains just an example partitioning for the
> > >>>> filesystem connector. Each connector already "knows" about
> > >>>> auto-discovery of its partitions, and we rely on this fact.
> > >>>> For example, partition discovery is different between Kafka and
> > >>>> filesystem sources. So, we do not handle the manual discovery of
> > >>>> partitions. Please correct me if I misunderstood your question.
> > >>>>
> > >>>>> In other words, it looks a bit counterintuitive that the user
> > >>>>> implementing the source has to specify which partitions exist
> > >>>>> statically (and they can change at runtime), while the source
> > >>>>> itself knows the data provider and could directly implement a
> > >>>>> method `discoverPartitions`. Then Flink would take care of
> > >>>>> invoking that method when needed.
> > >>>>
> > >>>> - We utilize the table option SOURCE_MONITOR_INTERVAL to check
> > >>>> whether partitions are static or not. So, a user still has to give
> > >>>> Flink a hint about partitions being static or not. With static
> > >>>> partitions Flink can do more optimizations.
> > >>>>
> > >>>> Please let me know if my replies answer your questions or if you
> > >>>> have more comments.
> > >>>>
> > >>>> Regards,
> > >>>> Jeyhun
> > >>>>
> > >>>> On Thu, Mar 21, 2024 at 10:03 AM <lorenzo.affe...@ververica.com>
> > >>>> wrote:
> > >>>>
> > >>>>> Hello Jeyhun,
> > >>>>> I really like the proposal, and it definitely makes sense to me.
> > >>>>>
> > >>>>> I have a couple of nits here and there:
> > >>>>>
> > >>>>> For the interface `SupportsPartitioning`, why return `Optional`?
> > >>>>> If one decides to implement that, partitions must exist (at
> > >>>>> maximum, return an empty list). Returning `Optional` seems just to
> > >>>>> complicate the logic of the code using that interface.
> > >>>>>
> > >>>>> I foresee the calling code doing something like: "if the source
> > >>>>> supports partitioning, get the partitions, but if they don't
> > >>>>> exist, raise a runtime exception".
> > >>>>> Let's simply make that safe at compile time and guarantee to the
> > >>>>> calling code that partitions exist.
> > >>>>>
> > >>>>> Another thing is that you show Hive-like partitioning in your FS
> > >>>>> structure; do you think it makes sense to add a note about
> > >>>>> auto-discovery of partitions?
> > >>>>>
> > >>>>> In other words, it looks a bit counterintuitive that the user
> > >>>>> implementing the source has to specify which partitions exist
> > >>>>> statically (and they can change at runtime), while the source
> > >>>>> itself knows the data provider and could directly implement a
> > >>>>> method `discoverPartitions`. Then Flink would take care of
> > >>>>> invoking that method when needed.
> > >>>>>
> > >>>>> On Mar 15, 2024 at 22:09 +0100, Jeyhun Karimov
> > >>>>> <je.kari...@gmail.com> wrote:
> > >>>>>
> > >>>>> Hi Benchao,
> > >>>>>
> > >>>>> Thanks for your comments.
> > >>>>>
> > >>>>> 1. What parallelism would you take? E.g., 128 + 256 => 128? What
> > >>>>> if we cannot have a good greatest common divisor, like 127 + 128,
> > >>>>> could we just utilize one side's pre-partitioned attribute, and
> > >>>>> let the other side just do the shuffle?
> > >>>>>
> > >>>>> There are two cases we need to consider:
> > >>>>>
> > >>>>> 1. Static Partition (no partitions are added during the query
> > >>>>> execution) is enabled AND both sources implement
> > >>>>> "SupportsPartitionPushDown"
> > >>>>>
> > >>>>> In this case, we are sure that no new partitions will be added at
> > >>>>> runtime. So, we have a chance to equalize both sources' partitions
> > >>>>> and parallelism, IFF both sources implement the
> > >>>>> "SupportsPartitionPushDown" interface.
> > >>>>> To achieve this, first we will fetch the existing partitions from
> > >>>>> source1 (say p_s1) and from source2 (say p_s2).
> > >>>>> Then, we find the intersection of these two partition sets (say
> > >>>>> p_intersect) and push down these partitions:
> > >>>>>
> > >>>>> SupportsPartitionPushDown::applyPartitions(p_intersect) // make
> > >>>>> sure that only specific partitions are read
> > >>>>> SupportsPartitioning::applyPartitionedRead(p_intersect) //
> > >>>>> partitioned read with filtered partitions
> > >>>>>
> > >>>>> Lastly, we need to change the parallelism of 1) source1, 2)
> > >>>>> source2, and 3) all of their downstream operators until (and
> > >>>>> including) their first common ancestor (e.g., join) to be equal to
> > >>>>> the number of partitions (the size of p_intersect).
> > >>>>>
> > >>>>> 2. All other cases
> > >>>>>
> > >>>>> In all other cases, the parallelism of both sources and their
> > >>>>> downstream operators until their common ancestor would be equal to
> > >>>>> MIN(|p_s1|, |p_s2|).
> > >>>>> That is, the minimum of the partition count of source1 and the
> > >>>>> partition count of source2 will be selected as the parallelism.
> > >>>>> Coming back to your example, if source1 parallelism is 127 and
> > >>>>> source2 parallelism is 128, then we will first check the partition
> > >>>>> counts of source1 and source2.
> > >>>>> Say the partition count of source1 is 100 and the partition count
> > >>>>> of source2 is 90. Then, we would set the parallelism for source1,
> > >>>>> source2, and all of their downstream operators until (and
> > >>>>> including) the join operator to 90 (min(100, 90)).
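As a concrete illustration of the two cases above, a minimal sketch
(hypothetical helper code, not part of the FLIP or the PoC; partition
identifiers are simplified to strings):

import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper illustrating the two parallelism rules above. */
final class PartitionedSourceParallelism {

    /*
     * Case 1: partitions are static and both sources implement
     * SupportsPartitionPushDown. The planner would push the intersected
     * partition set down to both sides (applyPartitions /
     * applyPartitionedRead) and use its size as the common parallelism.
     */
    static int staticCase(Set<String> pS1, Set<String> pS2) {
        Set<String> pIntersect = new HashSet<>(pS1); // copy of p_s1
        pIntersect.retainAll(pS2);                   // p_intersect = p_s1 ∩ p_s2
        return pIntersect.size();
    }

    /** Case 2 (all other cases): the smaller partition count wins. */
    static int defaultCase(int partitionCountS1, int partitionCountS2) {
        return Math.min(partitionCountS1, partitionCountS2); // e.g., min(100, 90) = 90
    }
}

With the numbers from the example above, defaultCase(100, 90) yields the
parallelism 90 that is then applied from both sources down to the join.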
> > >>>>> We also plan to implement a cost-based decision instead of the
> > >>>>> rule-based one (the MIN rule explained above).
> > >>>>> One possible result of the cost-based estimation is to keep the
> > >>>>> partitions on one side and perform the shuffling on the other
> > >>>>> source.
> > >>>>>
> > >>>>> 2. In our current shuffle-removal design
> > >>>>> (FlinkExpandConversionRule), we don't consider parallelism; we
> > >>>>> just remove unnecessary shuffles according to the distribution
> > >>>>> columns. After this FLIP, the parallelism may be bundled with the
> > >>>>> source's partitions, so how will this optimization accommodate
> > >>>>> FlinkExpandConversionRule? Will you also change downstream
> > >>>>> operators' parallelisms if we want to also remove subsequent
> > >>>>> shuffles?
> > >>>>>
> > >>>>> - From my understanding of FlinkExpandConversionRule, its removal
> > >>>>> logic is agnostic to operator parallelism.
> > >>>>> So, if FlinkExpandConversionRule decides to remove a shuffle
> > >>>>> operation, then this FLIP will search for another possible shuffle
> > >>>>> (the one closest to the source) to remove.
> > >>>>> If there is such an opportunity, this FLIP will remove the
> > >>>>> shuffle. So, from my understanding, FlinkExpandConversionRule and
> > >>>>> this optimization rule can work together safely.
> > >>>>> Please correct me if I misunderstood your question.
> > >>>>>
> > >>>>> Regarding the new optimization rule, have you also considered
> > >>>>> allowing some non-strict mode like
> > >>>>> FlinkRelDistribution#requireStrict? For example, the source is
> > >>>>> pre-partitioned by the a, b columns; if we are consuming this
> > >>>>> source and do an aggregate on a, b, c, can we utilize this
> > >>>>> optimization?
> > >>>>>
> > >>>>> - Good point. Yes, there are some cases where non-strict mode will
> > >>>>> apply. For example:
> > >>>>>
> > >>>>> - the pre-partitioned columns and the aggregate columns are the
> > >>>>> same but have a different order (e.g., the source is
> > >>>>> pre-partitioned w.r.t. a,b and the aggregate has a GROUP BY b,a)
> > >>>>> - the columns in the Exchange operator are a list-prefix of the
> > >>>>> pre-partitioned columns of the source (e.g., the source is
> > >>>>> pre-partitioned w.r.t. a,b,c and the Exchange's partition columns
> > >>>>> are a,b)
> > >>>>>
> > >>>>> Please let me know if the above answers your questions or if you
> > >>>>> have any other comments.
> > >>>>>
> > >>>>> Regards,
> > >>>>> Jeyhun
> > >>>>>
> > >>>>> On Thu, Mar 14, 2024 at 12:48 PM Benchao Li <libenc...@apache.org>
> > >>>>> wrote:
> > >>>>>
> > >>>>> Thanks Jeyhun for bringing up this discussion, it is really
> > >>>>> exciting; +1 for the general idea.
> > >>>>>
> > >>>>> We also introduced a similar concept in Flink Batch internally to
> > >>>>> cope with bucketed tables in Hive; it is a very important
> > >>>>> improvement.
> > >>>>>
> > >>>>>> One thing to note is that for join queries, the parallelism of
> > >>>>>> each join source might be different. This might result in
> > >>>>>> inconsistencies while using the pre-partitioned/pre-divided data
> > >>>>>> (e.g., different mappings of partitions to source operators).
> > >>>>>> Therefore, it is the job of the planner to detect this and adjust
> > >>>>>> the parallelism.
> > >>>>>> With that in mind,
> > >>>>>> the rest (how the split assigners perform) is consistent among
> > >>>>>> many sources.
> > >>>>>
> > >>>>> Could you elaborate a little more on this? I added my two cents
> > >>>>> here about this part:
> > >>>>> 1. What parallelism would you take? E.g., 128 + 256 => 128? What
> > >>>>> if we cannot have a good greatest common divisor, like 127 + 128,
> > >>>>> could we just utilize one side's pre-partitioned attribute, and
> > >>>>> let the other side just do the shuffle?
> > >>>>> 2. In our current shuffle-removal design
> > >>>>> (FlinkExpandConversionRule), we don't consider parallelism; we
> > >>>>> just remove unnecessary shuffles according to the distribution
> > >>>>> columns. After this FLIP, the parallelism may be bundled with the
> > >>>>> source's partitions, so how will this optimization accommodate
> > >>>>> FlinkExpandConversionRule? Will you also change downstream
> > >>>>> operators' parallelisms if we want to also remove subsequent
> > >>>>> shuffles?
> > >>>>>
> > >>>>> Regarding the new optimization rule, have you also considered
> > >>>>> allowing some non-strict mode like
> > >>>>> FlinkRelDistribution#requireStrict? For example, the source is
> > >>>>> pre-partitioned by the a, b columns; if we are consuming this
> > >>>>> source and do an aggregate on a, b, c, can we utilize this
> > >>>>> optimization?
> > >>>>>
> > >>>>> Jane Chan <qingyue....@gmail.com> wrote on Thu, Mar 14, 2024 at 15:24:
> > >>>>>
> > >>>>>> Hi Jeyhun,
> > >>>>>>
> > >>>>>> Thanks for your clarification.
> > >>>>>>
> > >>>>>>> Once a new partition is detected, we add it to our existing
> > >>>>>>> mapping. Our mapping looks like Map<Integer, Set<Integer>>
> > >>>>>>> subtaskToPartitionAssignment,
> > >>>>>>> where it maps each source subtaskID to zero or more partitions.
> > >>>>>>
> > >>>>>> I understand your point. **It would be better if you could sync
> > >>>>>> the content to the FLIP**.
> > >>>>>>
> > >>>>>> Another thing is I'm curious about what the physical plan looks
> > >>>>>> like. Is there any specific info that will be added to the table
> > >>>>>> source (like filter/project pushdown)? It would be great if you
> > >>>>>> could attach an example to the FLIP.
> > >>>>>>
> > >>>>>> Bests,
> > >>>>>> Jane
> > >>>>>>
> > >>>>>> On Wed, Mar 13, 2024 at 9:11 PM Jeyhun Karimov
> > >>>>>> <je.kari...@gmail.com> wrote:
> > >>>>>>
> > >>>>>>> Hi Jane,
> > >>>>>>>
> > >>>>>>> Thanks for your comments.
> > >>>>>>>
> > >>>>>>>> 1. Concerning the `sourcePartitions()` method, the partition
> > >>>>>>>> information returned during the optimization phase may not be
> > >>>>>>>> the same as the partition information during runtime execution.
> > >>>>>>>> For long-running jobs, partitions may be continuously created.
> > >>>>>>>> Is this FLIP equipped to handle such scenarios?
> > >>>>>>>
> > >>>>>>> - Good point. This scenario is definitely supported.
> > >>>>>>> Once a new partition is added, or in general, new splits are
> > >>>>>>> discovered, the
> > >>>>>>> PartitionAwareSplitAssigner::addSplits(Collection<FileSourceSplit>
> > >>>>>>> newSplits) method will be called.
> > >>>>>>> Inside that method, we are able to detect if a split belongs to
> > >>>>>>> existing partitions or to a new partition.
> > >>>>>>> Once a new partition is detected, we add it to our existing
> > >>>>>>> mapping. Our mapping looks like Map<Integer, Set<Integer>>
> > >>>>>>> subtaskToPartitionAssignment, where
> > >>>>>>> it maps each source subtaskID to zero or more partitions.
> > >>>>>>>
> > >>>>>>>> 2. Regarding the `RemoveRedundantShuffleRule` optimization
> > >>>>>>>> rule, I understand that it is also necessary to verify whether
> > >>>>>>>> the hash key within the Exchange node is consistent with the
> > >>>>>>>> partition key defined in the table source that implements
> > >>>>>>>> `SupportsPartitioning`.
> > >>>>>>>
> > >>>>>>> - Yes, I overlooked that point; fixed. Actually, the rule is
> > >>>>>>> much more complicated. I tried to simplify it in the FLIP. Good
> > >>>>>>> point.
> > >>>>>>>
> > >>>>>>>> 3. Could you elaborate on the desired physical plan and
> > >>>>>>>> integration with `CompiledPlan` to enhance the overall
> > >>>>>>>> functionality?
> > >>>>>>>
> > >>>>>>> - For the compiled plan, PartitioningSpec will be used, with the
> > >>>>>>> JSON tag "Partitioning". As a result, the source operator will
> > >>>>>>> have "abilities" : [ { "type" : "Partitioning" } ] as part of
> > >>>>>>> the compiled plan.
> > >>>>>>> More about the implementation details below:
> > >>>>>>>
> > >>>>>>> --------------------------------
> > >>>>>>> PartitioningSpec class
> > >>>>>>> --------------------------------
> > >>>>>>> @JsonTypeName("Partitioning")
> > >>>>>>> public final class PartitioningSpec extends SourceAbilitySpecBase {
> > >>>>>>>     // some code here
> > >>>>>>>     @Override
> > >>>>>>>     public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
> > >>>>>>>         if (tableSource instanceof SupportsPartitioning) {
> > >>>>>>>             ((SupportsPartitioning<?>) tableSource).applyPartitionedRead();
> > >>>>>>>         } else {
> > >>>>>>>             throw new TableException(
> > >>>>>>>                     String.format(
> > >>>>>>>                             "%s does not support SupportsPartitioning.",
> > >>>>>>>                             tableSource.getClass().getName()));
> > >>>>>>>         }
> > >>>>>>>     }
> > >>>>>>>     // some code here
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>> --------------------------------
> > >>>>>>> SourceAbilitySpec class
> > >>>>>>> --------------------------------
> > >>>>>>> @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
> > >>>>>>> @JsonSubTypes({
> > >>>>>>>     @JsonSubTypes.Type(value = FilterPushDownSpec.class),
> > >>>>>>>     @JsonSubTypes.Type(value = LimitPushDownSpec.class),
> > >>>>>>>     @JsonSubTypes.Type(value = PartitionPushDownSpec.class),
> > >>>>>>>     @JsonSubTypes.Type(value = ProjectPushDownSpec.class),
> > >>>>>>>     @JsonSubTypes.Type(value = ReadingMetadataSpec.class),
> > >>>>>>>     @JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
> > >>>>>>>     @JsonSubTypes.Type(value = SourceWatermarkSpec.class),
> > >>>>>>>     @JsonSubTypes.Type(value = AggregatePushDownSpec.class),
> > >>>>>>> +   @JsonSubTypes.Type(value = PartitioningSpec.class) // newly added
> > >>>>>>>
> > >>>>>>> Please let me know if that answers your questions or if you have
> > >>>>>>> other comments.
> > >>>>>>>
> > >>>>>>> Regards,
> > >>>>>>> Jeyhun
> > >>>>>>>
> > >>>>>>> On Tue, Mar 12, 2024 at 8:56 AM Jane Chan <qingyue....@gmail.com>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hi Jeyhun,
> > >>>>>>>>
> > >>>>>>>> Thank you for leading the discussion. I'm generally +1 with
> > >>>>>>>> this proposal, along with some questions. Please see my
> > >>>>>>>> comments below.
> > >>>>>>>>
> > >>>>>>>> 1. Concerning the `sourcePartitions()` method, the partition
> > >>>>>>>> information returned during the optimization phase may not be
> > >>>>>>>> the same as the partition information during runtime execution.
> > >>>>>>>> For long-running jobs, partitions may be continuously created.
> > >>>>>>>> Is this FLIP equipped to handle such scenarios?
> > >>>>>>>>
> > >>>>>>>> 2. Regarding the `RemoveRedundantShuffleRule` optimization
> > >>>>>>>> rule, I understand that it is also necessary to verify whether
> > >>>>>>>> the hash key within the Exchange node is consistent with the
> > >>>>>>>> partition key defined in the table source that implements
> > >>>>>>>> `SupportsPartitioning`.
> > >>>>>>>>
> > >>>>>>>> 3. Could you elaborate on the desired physical plan and
> > >>>>>>>> integration with `CompiledPlan` to enhance the overall
> > >>>>>>>> functionality?
> > >>>>>>>>
> > >>>>>>>> Best,
> > >>>>>>>> Jane
> > >>>>>>>>
> > >>>>>>>> On Tue, Mar 12, 2024 at 11:11 AM Jim Hughes
> > >>>>>>>> <jhug...@confluent.io.invalid> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi Jeyhun,
> > >>>>>>>>>
> > >>>>>>>>> I like the idea! Given FLIP-376[1], I wonder if it'd make
> > >>>>>>>>> sense to generalize FLIP-434 to be about "pre-divided" data to
> > >>>>>>>>> cover "buckets" and "partitions" (and maybe even situations
> > >>>>>>>>> where a data source is partitioned and bucketed).
> > >>>>>>>>>
> > >>>>>>>>> Separate from that, the page mentions TPC-H Q1 as an example.
> > >>>>>>>>> For a join, any two tables joined on the same bucket key
> > >>>>>>>>> should provide a concrete example of a join. Systems like
> > >>>>>>>>> Kafka Streams/ksqlDB call this "co-partitioning"; for those
> > >>>>>>>>> systems, it is a requirement placed on the input sources. For
> > >>>>>>>>> Flink, with FLIP-434, the proposed planner rule could remove
> > >>>>>>>>> the shuffle.
> > >>>>>>>>>
> > >>>>>>>>> Definitely a fun idea; I look forward to hearing more!
> > >>>>>>>>>
> > >>>>>>>>> Cheers,
> > >>>>>>>>>
> > >>>>>>>>> Jim
> > >>>>>>>>>
> > >>>>>>>>> 1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
> > >>>>>>>>> 2.
> > >>>>>>>>> https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/#co-partitioning-requirements
> > >>>>>>>>>
> > >>>>>>>>> On Sun, Mar 10, 2024 at 3:38 PM Jeyhun Karimov
> > >>>>>>>>> <je.kari...@gmail.com> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hi devs,
> > >>>>>>>>>>
> > >>>>>>>>>> I'd like to start a discussion on FLIP-434: Support
> > >>>>>>>>>> optimizations for pre-partitioned data sources [1].
> > >>>>>>>>>>
> > >>>>>>>>>> The FLIP introduces taking advantage of pre-partitioned data
> > >>>>>>>>>> sources for SQL/Table API (it is already supported as an
> > >>>>>>>>>> experimental feature in the DataStream API [2]).
> > >>>>>>>>>>
> > >>>>>>>>>> Please find more details in the FLIP wiki document [1].
> > >>>>>>>>>> Looking forward to your feedback.
> > >>>>>>>>>>
> > >>>>>>>>>> Regards,
> > >>>>>>>>>> Jeyhun
> > >>>>>>>>>>
> > >>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources
> > >>>>>>>>>> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/
> > >>>>>
> > >>>>> --
> > >>>>> Best,
> > >>>>> Benchao Li