Hi Lincoln,

I think I was misunderstood. My approach is not to use MiniBatchLocalGroupAggFunction directly, but to follow a similar approach. Currently, local and global aggregate functions are used together in query plans. In my quick PoC, I verified that a modified version of MiniBatchLocalGroupAggFunction (used without timers) can do the aggregation without a global aggregation step; it is just a different implementation of MapBundleFunction. So we do not use the global aggregate function (which depends on keyed state) at all.
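To make the bundle idea concrete, here is a minimal standalone sketch in plain Java. It is not Flink code and not the PoC: the class name `LocalBundleSumSketch`, the SUM aggregate, and the count-based flush trigger are all assumptions for illustration. It only shows the mechanics of buffering per key in an in-memory map, folding retractions in as negative deltas, and flushing without timers or keyed state. Each flush emits per-bundle partial results.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of bundle-style local pre-aggregation (SUM per key).
 * No keyed state, no timers, no global aggregation step.
 */
public class LocalBundleSumSketch {
    private final int maxBundleSize;
    // In-memory bundle: key -> partial sum for the current bundle only.
    private final Map<String, Long> bundle = new LinkedHashMap<>();
    // Stand-in for a downstream collector.
    private final List<Map.Entry<String, Long>> emitted = new ArrayList<>();
    private int buffered = 0;

    public LocalBundleSumSketch(int maxBundleSize) {
        this.maxBundleSize = maxBundleSize;
    }

    /** Adds one record; delta is +v for an insert and -v for a retraction. */
    public void addInput(String key, long delta) {
        bundle.merge(key, delta, Long::sum);
        buffered++;
        // Count-based trigger (an assumption): flush once enough records are buffered.
        if (buffered >= maxBundleSize) {
            finishBundle();
        }
    }

    /** Emits the partial result of every key in the bundle, then clears it. */
    public void finishBundle() {
        for (Map.Entry<String, Long> e : bundle.entrySet()) {
            emitted.add(Map.entry(e.getKey(), e.getValue()));
        }
        bundle.clear();
        buffered = 0;
    }

    public List<Map.Entry<String, Long>> emitted() {
        return emitted;
    }
}
```

Because the bundle is a plain map owned by the operator instance, nothing here depends on a key group range, which is the property the message relies on.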
The general idea is similar to pre-shuffle pre-aggregation [1]. In our case, we use only the pre-aggregation part, with no shuffle after it.

I am also OK with scoping the FLIP to batch scenarios first and creating a PR for streaming scenarios afterwards (since the streaming implementation does not change public interfaces). WDYT?

[1] https://github.com/TU-Berlin-DIMA/AdCom

On Tue, Apr 9, 2024 at 5:34 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:

> Thanks Jeyhun for your reply!
>
> Unfortunately, MiniBatchLocalGroupAggFunction only works for the local agg
> in two-phase aggregation, while the global aggregation (which is actually
> handled by the KeyedMapBundleOperator) still relies on the KeyedStream,
> meaning that consistency of the partitioner and state key selector is
> still required.
>
> Best,
> Lincoln Lee
>
>
> Jeyhun Karimov <je.kari...@gmail.com> wrote on Sat, Apr 6, 2024 at 05:11:
>
> > Hi Lincoln,
> >
> > I did a bit of analysis on a small PoC.
> > Please find my comments below:
> >
> > - In general, the current design supports streaming workloads. However,
> > as you mentioned, it comes with some (implementation-related) difficulties.
> > One of them (as you also mentioned) is that most of the operators utilize
> > keyed functions (e.g., aggregate functions).
> > As a result, we cannot directly utilize these operators (e.g.,
> > StreamPhysicalGroupbyAggregate) because they work on keyed inputs and
> > their tasks utilize a specific keyGroupRange.
> >
> > - As I mentioned above, my idea is to utilize an approach similar
> > to MiniBatchLocalGroupAggFunction that is not time-based and also
> > supports retractions.
> > The existing implementation of this function already covers quite a big
> > part of the scope. With this implementation, we utilize a MapBundleFunction
> > that is not bound to a specific key range.
> >
> > - As the next milestone, a more generic optimization is required that
> > introduces 1) a new streaming distribution type, KEEP_INPUT_AS_IS,
> > 2) utilization of a ForwardHashExchangeProcessor, and 3) a corresponding
> > chaining strategy.
> >
> > Currently, the plan is to first support this FLIP for batch workloads
> > (e.g., files, pre-divided data, and buckets), and then add support for
> > streaming workloads.
> >
> > I hope I have answered your question.
> >
> > Regards,
> > Jeyhun
> >
> > On Wed, Apr 3, 2024 at 4:33 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:
> >
> > > Hi Jeyhun,
> > >
> > > Thanks for your quick response!
> > >
> > > In the streaming scenario, a shuffle commonly occurs before the stateful
> > > operator, and there's a sanity check [1] when the stateful operator
> > > accesses the state. This implies a consistency requirement between the
> > > partitioner used for data shuffling and the state key selector used for
> > > state access (see KeyGroupStreamPartitioner for more details);
> > > otherwise, there may be state access errors. That is to say, in the
> > > streaming scenario, there is not only the strict requirement described in
> > > FlinkRelDistribution#requireStrict, but also the implied consistency of
> > > hash calculation.
> > >
> > > Also, if this flip targets both streaming and batch scenarios, it is
> > > recommended to do PoC validation for streaming as well.
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-29430
> > >
> > >
> > > Best,
> > > Lincoln Lee
> > >
> > >
> > > Leonard Xu <xbjt...@gmail.com> wrote on Wed, Apr 3, 2024 at 14:25:
> > >
> > > > Hey, Jeyhun
> > > >
> > > > Thanks for kicking off this discussion. I have two questions about
> > > > streaming sources:
> > > >
> > > > (1) The FLIP motivation section says the Kafka broker is already
> > > > partitioned w.r.t. some key[s]. Is this the main use case in the Kafka
> > > > world? Partitioning by key fields is not the behavior of Kafka's default
> > > > partitioner [1], IIUC.
> > > >
> > > > (2) Considering that the FLIP’s optimization scope targets both batch and
> > > > streaming pre-partitioned sources, could you add a streaming source
> > > > example to help me understand the FLIP better? I think the Kafka source
> > > > is a good candidate for a streaming source example. The file source is a
> > > > good one for batch, and it really helped me follow the FLIP.
> > > >
> > > > Best,
> > > > Leonard
> > > > [1] https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L31
> > > >
> > > > > On Apr 3, 2024, at 5:53 AM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
> > > > >
> > > > > Hi Lincoln,
> > > > >
> > > > > Thanks a lot for your comments. Please find my answers below.
> > > > >
> > > > >> 1. Is this flip targeted only at batch scenarios or does it include
> > > > >> streaming?
> > > > >> (The flip and the discussion did not explicitly mention this, but in the
> > > > >> draft pr, I only saw the implementation for batch scenarios:
> > > > >> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8 )
> > > > >> If we expect this to also apply to streaming, then we need to consider
> > > > >> the stricter shuffle restrictions of streaming compared to batch (if
> > > > >> support is considered, more discussion is needed here, let’s not expand
> > > > >> for now). If it only applies to batch, it is recommended to clarify in
> > > > >> the flip.
> > > > >
> > > > > - The FLIP targets both streaming and batch scenarios.
> > > > > Could you please elaborate more on what you mean by additional
> > > > > restrictions?
> > > > >
> > > > >> 2. In the current implementation, the optimized plan seems to have some
> > > > >> problems.
> > > > >> As described in the class comments:
> > > > >> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60
> > > > >>
> > > > >> BatchPhysicalHashAggregate (local)
> > > > >> +- BatchPhysicalLocalHashAggregate (local)
> > > > >>    +- BatchPhysicalTableSourceScan
> > > > >>
> > > > >> The `BatchPhysicalLocalHashAggregate` here is redundant (in the case of
> > > > >> one-phase hashAgg, localAgg is not necessary, which is the scenario
> > > > >> currently handled by `RemoveRedundantLocalHashAggRule` and other rules)
> > > > >
> > > > > - Yes, you are completely right. Note that the PR you referenced is just
> > > > > a quick PoC.
> > > > > The redundant operators you mentioned exist because
> > > > > `RemoveRedundantShuffleRule` just removes the Exchange operator,
> > > > > without modifying upstream/downstream operators.
> > > > > As I mentioned, the implementation is just a PoC, and the final
> > > > > implementation will make sure that the existing redundancy elimination
> > > > > rules remove redundant operators.
> > > > >
> > > > >> Also, in the draft pr, the optimization of
> > > > >> `testShouldEliminatePartitioning1` & `testShouldEliminatePartitioning2`
> > > > >> doesn't seem to take effect?
> > > > >> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/BatchFileSystemTableSourceTest.xml#L38
> > > > >
> > > > > - Note that in this example, the Exchange operator has a
> > > > > property KEEP_INPUT_AS_IS that indicates that its data distribution is
> > > > > the same as its input's.
> > > > > Since we have redundant operators (as shown above, two aggregate
> > > > > operators), one of the rules (not in this FLIP)
> > > > > adds this Exchange operator with KEEP_INPUT_AS_IS in between.
> > > > > Similar to my comment above, the final implementation will be free of
> > > > > redundant operators.
> > > > >
> > > > >> In conjunction with question 2, I am wondering if we have a better choice
> > > > >> (of course, not simply adding the current `PHYSICAL_OPT_RULES`'s
> > > > >> `RemoveRedundantLocalXXRule`s to the `PHYSICAL_REWRITE`).
> > > > >> For example, let the source actively provide some traits (including
> > > > >> `FlinkRelDistribution` and `RelCollation`) to the planner. The advantage
> > > > >> of doing this is to directly reuse the current shuffle removal
> > > > >> optimization (as implemented by `FlinkExpandConversionRule`), and
> > > > >> according to the data distribution characteristics provided by the
> > > > >> source, the planner may choose a physical operator with a cheaper cost
> > > > >> (for example, according to `RelCollation`, the planner can use sortAgg,
> > > > >> with no need for a separate local sort operation).
> > > > >> WDYT?
> > > > >
> > > > > - Good point. Makes sense to me. I will check whether
> > > > > FlinkExpandConversionRule can be utilized in the implementation.
> > > > > > > > > > > > > > > Regards, > > > > > Jeyhun > > > > > > > > > > > > > > > > > > > > On Tue, Apr 2, 2024 at 6:01 PM Lincoln Lee <lincoln.8...@gmail.com > > > > > > wrote: > > > > > > > > > >> Hi Jeyhun, > > > > >> > > > > >> Thank you for driving this, it would be very useful optimization! > > > > >> > > > > >> Sorry for joining the discussion now(I originally planned to reply > > > > earlier, > > > > >> but > > > > >> happened to be during my vacation). I have two questions: > > > > >> > > > > >> 1. Is this flip targeted only at batch scenarios or does it > include > > > > >> streaming? > > > > >> (The flip and the discussion did not explicitly mention this, but > in > > > the > > > > >> draft pr, I only > > > > >> saw the implementation for batch scenarios > > > > >> > > > > >> > > > > > > > > > > https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8 > > > > >> < > > > > >> > > > > > > > > > > https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8%EF%BC%89 > > > > >>> > > > > >> ) > > > > >> If we expect this also apply to streaming, then we need to > consider > > > the > > > > >> stricter > > > > >> shuffle restrictions of streaming compared to batch (if support is > > > > >> considered, > > > > >> more discussion is needed here, let’s not expand for now). If it > > only > > > > >> applies to batch, > > > > >> it is recommended to clarify in the flip. > > > > >> > > > > >> 2. In the current implementation, the optimized plan seems to have > > > some > > > > >> problems. 
> > > > >> As described in the class comments: > > > > >> > > > > >> > > > > > > > > > > https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60 > > > > >> > > > > >> BatchPhysicalHashAggregate (local) > > > > >> > > > > >> +- BatchPhysicalLocalHashAggregate (local) > > > > >> > > > > >> +- BatchPhysicalTableSourceScan > > > > >> The `BatchPhysicalLocalHashAggregate` here is redundant (in the > case > > > of > > > > >> one-phase > > > > >> hashAgg, localAgg is not necessary, which is the scenario > currently > > > > handled > > > > >> by > > > > >> `RemoveRedundantLocalHashAggRule` and other rules). Also, in the > > > draft > > > > pr, > > > > >> the > > > > >> optimization of `testShouldEliminatePartitioning1` & > > > > >> `testShouldEliminatePartitioning2` > > > > >> seems didn't take effect? > > > > >> > > > > >> > > > > > > > > > > https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/BatchFileSystemTableSourceTest.xml#L38 > > > > >> > > > > >> In conjunction with question 2, I am wondering if we have a better > > > > choice > > > > >> (of course, > > > > >> not simply adding the current `PHYSICAL_OPT_RULES`'s > > > > >> `RemoveRedundantLocalXXRule`s > > > > >> to the `PHYSICAL_REWRITE`). > > > > >> For example, let the source actively provide some traits > (including > > > > >> `FlinkRelDistribution` > > > > >> and `RelCollation`) to the planner. 
The advantage of doing this is > > to > > > > >> directly reuse the > > > > >> current shuffle remove optimization (as > `FlinkExpandConversionRule` > > > > >> implemented), > > > > >> and according to the data distribution characteristics provided by > > the > > > > >> source, the planner > > > > >> may choose a physical operator with a cheaper costs (for example, > > > > according > > > > >> to `RelCollation`, > > > > >> the planner can use sortAgg, no need for a separate local sort > > > > operation). > > > > >> WDYT? > > > > >> > > > > >> > > > > >> Best, > > > > >> Lincoln Lee > > > > >> > > > > >> > > > > >> Jeyhun Karimov <je.kari...@gmail.com> 于2024年4月1日周一 18:00写道: > > > > >> > > > > >>> Hi everyone, > > > > >>> > > > > >>> Thanks for your valuable feedback! > > > > >>> > > > > >>> The discussion on this FLIP has been going on for a while. > > > > >>> I would like to start a vote after 48 hours. > > > > >>> > > > > >>> Please let me know if you have any concerns or any further > > > > >>> questions/comments. > > > > >>> > > > > >>> Regards, > > > > >>> Jeyhun > > > > >>> > > > > >>> > > > > >>> On Thu, Mar 21, 2024 at 6:01 PM Jeyhun Karimov < > > je.kari...@gmail.com > > > > > > > > >>> wrote: > > > > >>> > > > > >>>> Hi Lorenzo, > > > > >>>> > > > > >>>> Thanks a lot for your comments. Please find my answers below: > > > > >>>> > > > > >>>> > > > > >>>> For the interface `SupportsPartitioning`, why returning > > `Optional`? > > > > >>>>> If one decides to implement that, partitions must exist (at > > > maximum, > > > > >>>>> return and empty list). Returning `Optional` seem just to > > > complicate > > > > >> the > > > > >>>>> logic of the code using that interface. > > > > >>>> > > > > >>>> > > > > >>>> - The reasoning behind the use of Optional is that sometimes > > (e.g., > > > in > > > > >>>> HiveTableSource) the partitioning info is in catalog. 
> > > > >>>> Therefore, we return Optional.empty(), so that the list of > > > partitions > > > > >>> is > > > > >>>> queried from the catalog. > > > > >>>> > > > > >>>> > > > > >>>> I foresee the using code doing something like: "if the source > > > supports > > > > >>>>> partitioning, get the partitions, but if they don't exist, > raise > > a > > > > >>> runtime > > > > >>>>> exception". Let's simply make that safe at compile time and > > > guarantee > > > > >>> the > > > > >>>>> code that partitions exist. > > > > >>>> > > > > >>>> > > > > >>>> - Yes, once partitions cannot be found, neither from catalog nor > > > from > > > > >> the > > > > >>>> interface implementation, then we raise an exception during > query > > > > >> compile > > > > >>>> time. > > > > >>>> > > > > >>>> > > > > >>>> Another thing is that you show Hive-like partitioning in your FS > > > > >>>>> structure, do you think it makes sense to add a note about > > > > >>> auto-discovery > > > > >>>>> of partitions? > > > > >>>> > > > > >>>> > > > > >>>> - Yes, the FLIP contains just an example partitioning for > > filesystem > > > > >>>> connector. Each connector already "knows" about autodiscovery of > > its > > > > >>>> partitions. And we rely on this fact. > > > > >>>> For example, partition discovery is different between kafka and > > > > >>>> filesystem sources. So, we do not handle the manual discovery of > > > > >>>> partitions. Please correct me if I misunderstood your question. > > > > >>>> > > > > >>>> > > > > >>>> In other terms, it looks a bit counterintuitive that the user > > > > >>> implementing > > > > >>>>> the source has to specify which partitions exist statically > (and > > > they > > > > >>> can > > > > >>>>> change at runtime), while the source itself knows the data > > provider > > > > >> and > > > > >>> can > > > > >>>>> directly implement a method `discoverPartitions`. Then Flink > > would > > > > >> take > > > > >>>>> care of invoking that method when needed. 
> > > > >>>> > > > > >>>> > > > > >>>> We utilize table option SOURCE_MONITOR_INTERVAL to check whether > > > > >>>> partitions are static or not. So, a user still should give > Flink a > > > > hint > > > > >>>> about partitions being static or not. With static partitions > Flink > > > can > > > > >> do > > > > >>>> more optimizations. > > > > >>>> > > > > >>>> Please let me know if my replies answer your questions or if you > > > have > > > > >>> more > > > > >>>> comments. > > > > >>>> > > > > >>>> Regards, > > > > >>>> Jeyhun > > > > >>>> > > > > >>>> > > > > >>>> > > > > >>>> On Thu, Mar 21, 2024 at 10:03 AM <lorenzo.affe...@ververica.com > > > > > > >> wrote: > > > > >>>> > > > > >>>>> Hello Jeyhun, > > > > >>>>> I really like the proposal and definitely makes sense to me. > > > > >>>>> > > > > >>>>> I have a couple of nits here and there: > > > > >>>>> > > > > >>>>> For the interface `SupportsPartitioning`, why returning > > `Optional`? > > > > >>>>> If one decides to implement that, partitions must exist (at > > > maximum, > > > > >>>>> return and empty list). Returning `Optional` seem just to > > > complicate > > > > >> the > > > > >>>>> logic of the code using that interface. > > > > >>>>> > > > > >>>>> I foresee the using code doing something like: "if the source > > > > supports > > > > >>>>> partitioning, get the partitions, but if they don't exist, > raise > > a > > > > >>> runtime > > > > >>>>> exception". Let's simply make that safe at compile time and > > > guarantee > > > > >>> the > > > > >>>>> code that partitions exist. > > > > >>>>> > > > > >>>>> Another thing is that you show Hive-like partitioning in your > FS > > > > >>>>> structure, do you think it makes sense to add a note about > > > > >>> auto-discovery > > > > >>>>> of partitions? 
> > > > >>>>> > > > > >>>>> In other terms, it looks a bit counterintuitive that the user > > > > >>>>> implementing the source has to specify which partitions exist > > > > >> statically > > > > >>>>> (and they can change at runtime), while the source itself knows > > the > > > > >> data > > > > >>>>> provider and can directly implement a method > > `discoverPartitions`. > > > > >> Then > > > > >>>>> Flink would take care of invoking that method when needed. > > > > >>>>> On Mar 15, 2024 at 22:09 +0100, Jeyhun Karimov < > > > je.kari...@gmail.com > > > > >>> , > > > > >>>>> wrote: > > > > >>>>> > > > > >>>>> Hi Benchao, > > > > >>>>> > > > > >>>>> Thanks for your comments. > > > > >>>>> > > > > >>>>> 1. What the parallelism would you take? E.g., 128 + 256 => 128? > > > What > > > > >>>>> > > > > >>>>> if we cannot have a good greatest common divisor, like 127 + > 128, > > > > >>>>> could we just utilize one side's pre-partitioned attribute, and > > let > > > > >>>>> another side just do the shuffle? > > > > >>>>> > > > > >>>>> > > > > >>>>> > > > > >>>>> There are two cases we need to consider: > > > > >>>>> > > > > >>>>> 1. Static Partition (no partitions are added during the query > > > > >> execution) > > > > >>>>> is > > > > >>>>> enabled AND both sources implement "SupportsPartitionPushdown" > > > > >>>>> > > > > >>>>> In this case, we are sure that no new partitions will be added > at > > > > >>> runtime. > > > > >>>>> So, we have a chance equalize both sources' partitions and > > > > >> parallelism, > > > > >>>>> IFF > > > > >>>>> both sources implement "SupportsPartitionPushdown" interface. > > > > >>>>> To achieve so, first we will fetch the existing partitions from > > > > >> source1 > > > > >>>>> (say p_s1) and from source2 (say p_s2). 
> > > > >>>>> Then, we find the intersection of these two partition sets (say > > > > >>>>> p_intersect) and pushdown these partitions: > > > > >>>>> > > > > >>>>> SupportsPartitionPushDown::applyPartitions(p_intersect) // make > > > sure > > > > >>> that > > > > >>>>> only specific partitions are read > > > > >>>>> SupportsPartitioning::applyPartitionedRead(p_intersect) // > > > > partitioned > > > > >>>>> read > > > > >>>>> with filtered partitions > > > > >>>>> > > > > >>>>> Lastly, we need to change the parallelism of 1) source1, 2) > > > source2, > > > > >> and > > > > >>>>> 3) > > > > >>>>> all of their downstream operators until (and including) their > > first > > > > >>> common > > > > >>>>> ancestor (e.g., join) to be equal to the number of partitions > > (size > > > > of > > > > >>>>> p_intersect). > > > > >>>>> > > > > >>>>> 2. All other cases > > > > >>>>> > > > > >>>>> In all other cases, the parallelism of both sources and their > > > > >> downstream > > > > >>>>> operators until their common ancestor would be equal to the > > > MIN(p_s1, > > > > >>>>> p_s2). > > > > >>>>> That is, minimum of the partition size of source1 and partition > > > size > > > > >> of > > > > >>>>> source2 will be selected as the parallelism. > > > > >>>>> Coming back to your example, if source1 parallelism is 127 and > > > > source2 > > > > >>>>> parallelism is 128, then we will first check the partition size > > of > > > > >>> source1 > > > > >>>>> and source2. > > > > >>>>> Say partition size of source1 is 100 and partition size of > > source2 > > > is > > > > >>> 90. > > > > >>>>> Then, we would set the parallelism for source1, source2, and > all > > of > > > > >>> their > > > > >>>>> downstream operators until (and including) the join operator > > > > >>>>> to 90 (min(100, 90)). > > > > >>>>> We also plan to implement a cost based decision instead of the > > > > >>> rule-based > > > > >>>>> one (the ones explained above - MIN rule). 
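The two parallelism cases described in this reply can be sketched as follows. This is a minimal illustration under stated assumptions, not planner code: the class and method names are hypothetical, partitions are modelled as plain strings, and the case where the intersection is empty is not handled.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of the rule-based parallelism choice for a two-source join. */
public class ParallelismRuleSketch {

    /** p_intersect: partitions present in both sources. */
    public static Set<String> intersect(Set<String> pS1, Set<String> pS2) {
        Set<String> result = new TreeSet<>(pS1);
        result.retainAll(pS2);
        return result;
    }

    /**
     * Case 1 (static partitions and both sources support partition pushdown):
     * parallelism is the size of the partition intersection.
     * Case 2 (everything else): parallelism is MIN of the two partition counts.
     */
    public static int chooseParallelism(
            Set<String> pS1,
            Set<String> pS2,
            boolean staticPartitions,
            boolean bothSupportPartitionPushDown) {
        if (staticPartitions && bothSupportPartitionPushDown) {
            // Not handled here: an empty intersection would yield 0.
            return intersect(pS1, pS2).size();
        }
        return Math.min(pS1.size(), pS2.size());
    }
}
```

With 100 partitions on source1 and 90 on source2, the second branch returns 90, matching the example in the message. The chosen value would then apply to both sources and their downstream operators up to and including the join.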
> > > > >>>>> One possible result of the cost based estimation is to keep the > > > > >>> partitions > > > > >>>>> on one side and perform the shuffling on another source. > > > > >>>>> > > > > >>>>> > > > > >>>>> > > > > >>>>> 2. In our current shuffle remove design > > > (FlinkExpandConversionRule), > > > > >>>>> > > > > >>>>> we don't consider parallelism, we just remove unnecessary > > shuffles > > > > >>>>> according to the distribution columns. After this FLIP, the > > > > >>>>> parallelism may be bundled with source's partitions, then how > > will > > > > >>>>> this optimization accommodate with FlinkExpandConversionRule, > > will > > > > you > > > > >>>>> also change downstream operator's parallelisms if we want to > also > > > > >>>>> remove subsequent shuffles? > > > > >>>>> > > > > >>>>> > > > > >>>>> > > > > >>>>> > > > > >>>>> - From my understanding of FlinkExpandConversionRule, its > removal > > > > >> logic > > > > >>> is > > > > >>>>> agnostic to operator parallelism. > > > > >>>>> So, if FlinkExpandConversionRule decides to remove a shuffle > > > > >> operation, > > > > >>>>> then this FLIP will search another possible shuffle (the one > > > closest > > > > >> to > > > > >>>>> the > > > > >>>>> source) to remove. > > > > >>>>> If there is such an opportunity, this FLIP will remove the > > shuffle. > > > > >> So, > > > > >>>>> from my understanding FlinkExpandConversionRule and this > > > optimization > > > > >>> rule > > > > >>>>> can work together safely. > > > > >>>>> Please correct me if I misunderstood your question. > > > > >>>>> > > > > >>>>> > > > > >>>>> > > > > >>>>> Regarding the new optimization rule, have you also considered > to > > > > allow > > > > >>>>> > > > > >>>>> some non-strict mode like FlinkRelDistribution#requireStrict? > For > > > > >>>>> example, source is pre-partitioned by a, b columns, if we are > > > > >>>>> consuming this source, and do a aggregate on a, b, c, can we > > > utilize > > > > >>>>> this optimization? 
> > > > >>>>> > > > > >>>>> > > > > >>>>> > > > > >>>>> - Good point. Yes, there are some cases that non-strict mode > will > > > > >> apply. > > > > >>>>> For example: > > > > >>>>> > > > > >>>>> - pre-partitioned columns and aggregate columns are the same > but > > > have > > > > >>>>> different order (e.g., source pre-partitioned w.r.t. a,b and > > > > aggregate > > > > >>> has > > > > >>>>> a GROUP BY b,a) > > > > >>>>> - columns in the Exchange operator is a list-prefix of > > > pre-partitoned > > > > >>>>> columns of source (e.g., source is pre-partitioned w.r.t. a,b,c > > and > > > > >>>>> Exchange's partition columns are a,b) > > > > >>>>> > > > > >>>>> Please let me know if the above answers your questions or if > you > > > have > > > > >>> any > > > > >>>>> other comments. > > > > >>>>> > > > > >>>>> Regards, > > > > >>>>> Jeyhun > > > > >>>>> > > > > >>>>> On Thu, Mar 14, 2024 at 12:48 PM Benchao Li < > > libenc...@apache.org> > > > > >>> wrote: > > > > >>>>> > > > > >>>>> Thanks Jeyhun for bringing up this discussion, it is really > > > exiting, > > > > >>>>> +1 for the general idea. > > > > >>>>> > > > > >>>>> We also introduced a similar concept in Flink Batch internally > to > > > > cope > > > > >>>>> with bucketed tables in Hive, it is a very important > improvement. > > > > >>>>> > > > > >>>>>> One thing to note is that for join queries, the parallelism of > > > each > > > > >>> join > > > > >>>>>> source might be different. This might result in > > > > >>>>>> inconsistencies while using the pre-partitioned/pre-divided > data > > > > >>> (e.g., > > > > >>>>>> different mappings of partitions to source operators). > > > > >>>>>> Therefore, it is the job of planner to detect this and adjust > > the > > > > >>>>>> parallelism. With that having in mind, > > > > >>>>>> the rest (how the split assigners perform) is consistent among > > > many > > > > >>>>>> sources. > > > > >>>>> > > > > >>>>> > > > > >>>>> Could you elaborate a little more on this. 
I added my two cents > > > here > > > > >>>>> about this part: > > > > >>>>> 1. What the parallelism would you take? E.g., 128 + 256 => 128? > > > What > > > > >>>>> if we cannot have a good greatest common divisor, like 127 + > 128, > > > > >>>>> could we just utilize one side's pre-partitioned attribute, and > > let > > > > >>>>> another side just do the shuffle? > > > > >>>>> 2. In our current shuffle remove design > > > (FlinkExpandConversionRule), > > > > >>>>> we don't consider parallelism, we just remove unnecessary > > shuffles > > > > >>>>> according to the distribution columns. After this FLIP, the > > > > >>>>> parallelism may be bundled with source's partitions, then how > > will > > > > >>>>> this optimization accommodate with FlinkExpandConversionRule, > > will > > > > you > > > > >>>>> also change downstream operator's parallelisms if we want to > also > > > > >>>>> remove subsequent shuffles? > > > > >>>>> > > > > >>>>> > > > > >>>>> Regarding the new optimization rule, have you also considered > to > > > > allow > > > > >>>>> some non-strict mode like FlinkRelDistribution#requireStrict? > For > > > > >>>>> example, source is pre-partitioned by a, b columns, if we are > > > > >>>>> consuming this source, and do a aggregate on a, b, c, can we > > > utilize > > > > >>>>> this optimization? > > > > >>>>> > > > > >>>>> Jane Chan <qingyue....@gmail.com> 于2024年3月14日周四 15:24写道: > > > > >>>>> > > > > >>>>>> > > > > >>>>>> Hi Jeyhun, > > > > >>>>>> > > > > >>>>>> Thanks for your clarification. > > > > >>>>>> > > > > >>>>> > > > > >>>>>>> Once a new partition is detected, we add it to our existing > > > > >> mapping. > > > > >>>>> > > > > >>>>> Our > > > > >>>>> > > > > >>>>>> mapping looks like Map<Integer, Set<Integer>> > > > > >>>>> > > > > >>>>> subtaskToPartitionAssignment, > > > > >>>>> > > > > >>>>>> where it maps each source subtaskID to zero or more > partitions. > > > > >>>>>> > > > > >>>>>> I understand your point. 
**It would be better if you could > sync > > > the > > > > >>>>>> content to the FLIP**. > > > > >>>>>> > > > > >>>>>> Another thing is I'm curious about what the physical plan > looks > > > > >> like. > > > > >>> Is > > > > >>>>>> there any specific info that will be added to the table source > > > (like > > > > >>>>>> filter/project pushdown)? It would be great if you could > attach > > an > > > > >>>>> > > > > >>>>> example > > > > >>>>> > > > > >>>>>> to the FLIP. > > > > >>>>>> > > > > >>>>>> Bests, > > > > >>>>>> Jane > > > > >>>>>> > > > > >>>>>> On Wed, Mar 13, 2024 at 9:11 PM Jeyhun Karimov < > > > > >> je.kari...@gmail.com> > > > > >>>>> > > > > >>>>> wrote: > > > > >>>>> > > > > >>>>>> > > > > >>>>> > > > > >>>>>>> Hi Jane, > > > > >>>>>>> > > > > >>>>>>> Thanks for your comments. > > > > >>>>>>> > > > > >>>>>>> > > > > >>>>>>> 1. Concerning the `sourcePartitions()` method, the partition > > > > >>>>> > > > > >>>>> information > > > > >>>>> > > > > >>>>>>>> returned during the optimization phase may not be the same > as > > > > >> the > > > > >>>>> > > > > >>>>>>> partition > > > > >>>>> > > > > >>>>>>>> information during runtime execution. For long-running jobs, > > > > >>>>> > > > > >>>>> partitions > > > > >>>>> > > > > >>>>>>> may > > > > >>>>> > > > > >>>>>>>> be continuously created. Is this FLIP equipped to handle > > > > >>> scenarios? > > > > >>>>> > > > > >>>>>>> > > > > >>>>>>> > > > > >>>>>>> - Good point. This scenario is definitely supported. > > > > >>>>>>> Once a new partition is added, or in general, new splits are > > > > >>>>>>> discovered, > > > > >>>>>>> > > > PartitionAwareSplitAssigner::addSplits(Collection<FileSourceSplit> > > > > >>>>>>> newSplits) > > > > >>>>>>> method will be called. Inside that method, we are able to > > detect > > > > >> if > > > > >>> a > > > > >>>>> > > > > >>>>> split > > > > >>>>> > > > > >>>>>>> belongs to existing partitions or there is a new partition. 
> > > > >>>>>>> Once a new partition is detected, we add it to our existing > > > > >> mapping. > > > > >>>>> > > > > >>>>> Our > > > > >>>>> > > > > >>>>>>> mapping looks like Map<Integer, Set<Integer>> > > > > >>>>> > > > > >>>>> subtaskToPartitionAssignment, > > > > >>>>> > > > > >>>>>>> where > > > > >>>>>>> it maps each source subtaskID to zero or more partitions. > > > > >>>>>>> > > > > >>>>>>> 2. Regarding the `RemoveRedundantShuffleRule` optimization > > rule, > > > I > > > > >>>>> > > > > >>>>>>>> understand that it is also necessary to verify whether the > > hash > > > > >>> key > > > > >>>>> > > > > >>>>>>> within > > > > >>>>> > > > > >>>>>>>> the Exchange node is consistent with the partition key > defined > > > > >> in > > > > >>>>> the > > > > >>>>> > > > > >>>>>>> table > > > > >>>>> > > > > >>>>>>>> source that implements `SupportsPartitioning`. > > > > >>>>> > > > > >>>>>>> > > > > >>>>>>> > > > > >>>>>>> - Yes, I overlooked that point, fixed. Actually, the rule is > > much > > > > >>>>>>> complicated. I tried to simplify it in the FLIP. Good point. > > > > >>>>>>> > > > > >>>>>>> > > > > >>>>>>> 3. Could you elaborate on the desired physical plan and > > > > >> integration > > > > >>>>> > > > > >>>>> with > > > > >>>>> > > > > >>>>>>>> `CompiledPlan` to enhance the overall functionality? > > > > >>>>> > > > > >>>>>>> > > > > >>>>>>> > > > > >>>>>>> - For compiled plan, PartitioningSpec will be used, with a > json > > > > >> tag > > > > >>>>>>> "Partitioning". As a result, in the compiled plan, the source > > > > >>> operator > > > > >>>>> > > > > >>>>> will > > > > >>>>> > > > > >>>>>>> have > > > > >>>>>>> "abilities" : [ { "type" : "Partitioning" } ] as part of the > > > > >>> compiled > > > > >>>>> > > > > >>>>> plan. 
> > > > >>>>>>>
> > > > >>>>>>> More about the implementation details below:
> > > > >>>>>>>
> > > > >>>>>>> --------------------------------
> > > > >>>>>>> PartitioningSpec class
> > > > >>>>>>> --------------------------------
> > > > >>>>>>> @JsonTypeName("Partitioning")
> > > > >>>>>>> public final class PartitioningSpec extends SourceAbilitySpecBase {
> > > > >>>>>>>     // some code here
> > > > >>>>>>>     @Override
> > > > >>>>>>>     public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
> > > > >>>>>>>         if (tableSource instanceof SupportsPartitioning) {
> > > > >>>>>>>             ((SupportsPartitioning<?>) tableSource).applyPartitionedRead();
> > > > >>>>>>>         } else {
> > > > >>>>>>>             throw new TableException(
> > > > >>>>>>>                     String.format(
> > > > >>>>>>>                             "%s does not support SupportsPartitioning.",
> > > > >>>>>>>                             tableSource.getClass().getName()));
> > > > >>>>>>>         }
> > > > >>>>>>>     }
> > > > >>>>>>>     // some code here
> > > > >>>>>>> }
> > > > >>>>>>>
> > > > >>>>>>> --------------------------------
> > > > >>>>>>> SourceAbilitySpec class
> > > > >>>>>>> --------------------------------
> > > > >>>>>>> @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include =
> > > > >>>>>>>         JsonTypeInfo.As.PROPERTY, property = "type")
> > > > >>>>>>> @JsonSubTypes({
> > > > >>>>>>>     @JsonSubTypes.Type(value = FilterPushDownSpec.class),
> > > > >>>>>>>     @JsonSubTypes.Type(value = LimitPushDownSpec.class),
> > > > >>>>>>>     @JsonSubTypes.Type(value = PartitionPushDownSpec.class),
> > > > >>>>>>>     @JsonSubTypes.Type(value = ProjectPushDownSpec.class),
> > > > >>>>>>>     @JsonSubTypes.Type(value = ReadingMetadataSpec.class),
> > > > >>>>>>>     @JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
> > > > >>>>>>>     @JsonSubTypes.Type(value = SourceWatermarkSpec.class),
> > > > >>>>>>>     @JsonSubTypes.Type(value = AggregatePushDownSpec.class),
> > > > >>>>>>> +   @JsonSubTypes.Type(value = PartitioningSpec.class) // newly added
> > > > >>>>>>> })
> > > > >>>>>>>
> > > > >>>>>>> Please let me know if that answers your questions or if you have other
> > > > >>>>>>> comments.
> > > > >>>>>>>
> > > > >>>>>>> Regards,
> > > > >>>>>>> Jeyhun
> > > > >>>>>>>
> > > > >>>>>>> On Tue, Mar 12, 2024 at 8:56 AM Jane Chan <qingyue....@gmail.com> wrote:
> > > > >>>>>>>
> > > > >>>>>>>> Hi Jeyhun,
> > > > >>>>>>>>
> > > > >>>>>>>> Thank you for leading the discussion. I'm generally +1 with this proposal,
> > > > >>>>>>>> along with some questions. Please see my comments below.
> > > > >>>>>>>>
> > > > >>>>>>>> 1. Concerning the `sourcePartitions()` method, the partition information
> > > > >>>>>>>> returned during the optimization phase may not be the same as the partition
> > > > >>>>>>>> information during runtime execution. For long-running jobs, partitions may
> > > > >>>>>>>> be continuously created. Is this FLIP equipped to handle such scenarios?
> > > > >>>>>>>>
> > > > >>>>>>>> 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> > > > >>>>>>>> understand that it is also necessary to verify whether the hash key within
> > > > >>>>>>>> the Exchange node is consistent with the partition key defined in the table
> > > > >>>>>>>> source that implements `SupportsPartitioning`.
> > > > >>>>>>>>
> > > > >>>>>>>> 3. Could you elaborate on the desired physical plan and integration with
> > > > >>>>>>>> `CompiledPlan` to enhance the overall functionality?
> > > > >>>>>>>>
> > > > >>>>>>>> Best,
> > > > >>>>>>>> Jane
> > > > >>>>>>>>
> > > > >>>>>>>> On Tue, Mar 12, 2024 at 11:11 AM Jim Hughes <jhug...@confluent.io.invalid> wrote:
> > > > >>>>>>>>
> > > > >>>>>>>>> Hi Jeyhun,
> > > > >>>>>>>>>
> > > > >>>>>>>>> I like the idea! Given FLIP-376[1], I wonder if it'd make sense to
> > > > >>>>>>>>> generalize FLIP-434 to be about "pre-divided" data to cover "buckets" and
> > > > >>>>>>>>> "partitions" (and maybe even situations where a data source is partitioned
> > > > >>>>>>>>> and bucketed).
> > > > >>>>>>>>>
> > > > >>>>>>>>> Separate from that, the page mentions TPC-H Q1 as an example. For a join,
> > > > >>>>>>>>> any two tables joined on the same bucket key should provide a concrete
> > > > >>>>>>>>> example of a join. Systems like Kafka Streams/ksqlDB call this
> > > > >>>>>>>>> "co-partitioning"; for those systems, it is a requirement placed on the
> > > > >>>>>>>>> input sources. For Flink, with FLIP-434, the proposed planner rule
> > > > >>>>>>>>> could remove the shuffle.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Definitely a fun idea; I look forward to hearing more!
> > > > >>>>>>>>>
> > > > >>>>>>>>> Cheers,
> > > > >>>>>>>>>
> > > > >>>>>>>>> Jim
> > > > >>>>>>>>>
> > > > >>>>>>>>> 1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
> > > > >>>>>>>>> 2. https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/#co-partitioning-requirements
> > > > >>>>>>>>>
> > > > >>>>>>>>> On Sun, Mar 10, 2024 at 3:38 PM Jeyhun Karimov <je.kari...@gmail.com> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>>> Hi devs,
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I’d like to start a discussion on FLIP-434: Support optimizations for
> > > > >>>>>>>>>> pre-partitioned data sources [1].
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> The FLIP introduces taking advantage of pre-partitioned data sources for
> > > > >>>>>>>>>> SQL/Table API (it is already supported as an experimental feature in
> > > > >>>>>>>>>> DataStream API [2]).
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Please find more details in the FLIP wiki document [1].
> > > > >>>>>>>>>> Looking forward to your feedback.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Regards,
> > > > >>>>>>>>>> Jeyhun
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources
> > > > >>>>>>>>>> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/
> > > > >>>>>
> > > > >>>>> --
> > > > >>>>>
> > > > >>>>> Best,
> > > > >>>>> Benchao Li
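
For readers following the thread, the subtask-to-partition bookkeeping described in the quoted reply above (a Map<Integer, Set<Integer>> subtaskToPartitionAssignment that grows when a new partition shows up at runtime) could look roughly like the sketch below. This is only an illustration under stated assumptions, not the FLIP's PartitionAwareSplitAssigner: the class name SubtaskPartitionMapping, the method names, the use of plain integer partition IDs, and the "least loaded subtask" placement policy are all hypothetical and do not come from the FLIP or from Flink.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical sketch of the mapping discussed in the thread.
 * Not Flink code; names and the placement policy are assumptions.
 */
final class SubtaskPartitionMapping {
    private final int parallelism;
    // subtask ID -> partitions owned by that subtask (zero or more)
    private final Map<Integer, Set<Integer>> subtaskToPartitionAssignment = new HashMap<>();
    // reverse index so a known partition always resolves to the same subtask
    private final Map<Integer, Integer> partitionToSubtask = new HashMap<>();

    SubtaskPartitionMapping(int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        this.parallelism = parallelism;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            subtaskToPartitionAssignment.put(subtask, new TreeSet<>());
        }
    }

    /** Returns the owning subtask, registering the partition first if it is new. */
    int subtaskFor(int partitionId) {
        Integer owner = partitionToSubtask.get(partitionId);
        if (owner != null) {
            return owner; // split belongs to an already known partition
        }
        // New partition detected: assumed policy is "subtask with the fewest partitions".
        int best = 0;
        for (int subtask = 1; subtask < parallelism; subtask++) {
            if (subtaskToPartitionAssignment.get(subtask).size()
                    < subtaskToPartitionAssignment.get(best).size()) {
                best = subtask;
            }
        }
        subtaskToPartitionAssignment.get(best).add(partitionId);
        partitionToSubtask.put(partitionId, best);
        return best;
    }

    /** Read-only view of the partitions currently owned by a subtask. */
    Set<Integer> partitionsOf(int subtask) {
        return Collections.unmodifiableSet(subtaskToPartitionAssignment.get(subtask));
    }
}
```

The point of the reverse index is that every split of one partition lands on the same subtask, which is the property a downstream operator needs before a shuffle can be skipped. A real assigner would also have to handle restarts and parallelism changes, which this sketch ignores.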