Hi Leonard,

Thanks a lot for your comments. Please find my answers below:

> (1) The FLIP motivation section says the Kafka broker is already partitioned w.r.t. some key[s]. Is this the main use case in the Kafka world? Partitioning by key fields is not the behavior of Kafka's default partitioner [1], IIUC.

- I see your point. The main use case for Kafka is ksqlDB users. Consider the following ksqlDB query [1]:

    CREATE STREAM products_rekeyed WITH (PARTITIONS=6) AS SELECT * FROM products PARTITION BY product_id;

In this query, the output is repartitioned so that it is keyed by product_id. In fact, as the documentation [1] states (and as Jim mentioned earlier in this thread), this repartitioning is a requirement for join queries.

> (2) Considering that the FLIP's optimization scope covers both batch and streaming pre-partitioned sources, could you add a streaming source example to help me understand the FLIP better? I think the Kafka source is a good candidate for a streaming source example; the file source is a good one for a batch source, and it really helped me follow the FLIP.

- Currently, I do not have a sample Kafka source integration with this FLIP. My plan was to integrate it first into the main Flink repo (e.g., the file source in streaming and batch mode) and then into the external connectors. But I can try it with the Kafka source and get back to you.

Regards,
Jeyhun

[1] https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/

On Wed, Apr 3, 2024 at 8:25 AM Leonard Xu <xbjt...@gmail.com> wrote:

Hey, Jeyhun

Thanks for kicking off this discussion. I have two questions about streaming sources:

(1) The FLIP motivation section says the Kafka broker is already partitioned w.r.t. some key[s]. Is this the main use case in the Kafka world? Partitioning by key fields is not the behavior of Kafka's default partitioner [1], IIUC.

(2) Considering that the FLIP's optimization scope covers both batch and streaming pre-partitioned sources, could you add a streaming source example to help me understand the FLIP better?
I think the Kafka source is a good candidate for a streaming source example; the file source is a good one for a batch source, and it really helped me follow the FLIP.

Best,
Leonard

[1] https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L31

On Wed, Apr 3, 2024 at 5:53 AM Jeyhun Karimov <je.kari...@gmail.com> wrote:

Hi Lincoln,

Thanks a lot for your comments. Please find my answers below.

> 1. Is this FLIP targeted only at batch scenarios, or does it include streaming? (The FLIP and the discussion did not explicitly mention this, but in the draft PR I only saw the implementation for batch scenarios: https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8 ) If we expect this to also apply to streaming, then we need to consider the stricter shuffle restrictions of streaming compared to batch (if support is considered, more discussion is needed here; let's not go into it for now). If it only applies to batch, I recommend clarifying this in the FLIP.

- The FLIP targets both streaming and batch scenarios. Could you please elaborate on what you mean by additional restrictions?

> 2. In the current implementation, the optimized plan seems to have some problems.
> As described in the class comments (https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60):
>
>     BatchPhysicalHashAggregate (local)
>     +- BatchPhysicalLocalHashAggregate (local)
>        +- BatchPhysicalTableSourceScan
>
> The `BatchPhysicalLocalHashAggregate` here is redundant (in the case of a one-phase hashAgg, the localAgg is not necessary; this scenario is currently handled by `RemoveRedundantLocalHashAggRule` and other rules).

- Yes, you are completely right. Note that the PR you referenced is just a quick PoC. The redundant operators you mentioned exist because `RemoveRedundantShuffleRule` only removes the Exchange operator, without modifying the upstream/downstream operators. As I mentioned, the implementation is just a PoC, and the final implementation will make sure that the existing redundancy elimination rules remove the redundant operators.

> Also, in the draft PR, the optimization in `testShouldEliminatePartitioning1` & `testShouldEliminatePartitioning2` does not seem to take effect?
> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/BatchFileSystemTableSourceTest.xml#L38

- Note that in this example, the Exchange operator has the property KEEP_INPUT_AS_IS, which indicates that its data distribution is the same as its input's. Since we have redundant operators (as shown above, two aggregate operators), one of the rules (not part of this FLIP) adds this Exchange operator with KEEP_INPUT_AS_IS in between. Similar to my comment above, the final implementation will be free of redundant operators.
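As a rough illustration of the two-step cleanup discussed here (first drop the Exchange whose input is already partitioned by the right keys, then let a redundancy rule replace the now-adjacent local/global aggregate pair with a one-phase aggregate), here is a toy Java sketch. The node types (`Scan`, `Exchange`, `LocalAgg`, `GlobalAgg`, `CompleteAgg`) and both rule methods are hypothetical simplifications; they are not Flink's planner classes, `RemoveRedundantShuffleRule`, or `RemoveRedundantLocalHashAggRule`, and real rules must also rewrite the aggregate functions themselves.

```java
import java.util.List;

// Hypothetical toy plan nodes; these are NOT Flink's RelNode/physical node classes.
interface PlanNode {}

// Scan over a source whose data is already hash-partitioned by partitionKeys.
record Scan(List<String> partitionKeys) implements PlanNode {}

// Hash shuffle that redistributes its input by keys.
record Exchange(List<String> keys, PlanNode input) implements PlanNode {}

// First phase of a two-phase aggregate: pre-aggregates within each subtask.
record LocalAgg(List<String> groupKeys, PlanNode input) implements PlanNode {}

// Second phase: merges the partial results produced by LocalAgg.
record GlobalAgg(List<String> groupKeys, PlanNode input) implements PlanNode {}

// One-phase aggregate that consumes raw rows directly.
record CompleteAgg(List<String> groupKeys, PlanNode input) implements PlanNode {}

final class RedundancySketch {

    private RedundancySketch() {}

    // Hash keys the output of a node is known to be partitioned by (empty list = unknown).
    static List<String> distribution(PlanNode node) {
        if (node instanceof Scan s) return s.partitionKeys();
        if (node instanceof Exchange e) return e.keys();
        // A local aggregate does not move rows between subtasks, so it keeps its input's distribution.
        if (node instanceof LocalAgg l) return distribution(l.input());
        return List.of();
    }

    // Hypothetical rule 1: drop an Exchange whose input is already partitioned by exactly its keys.
    static PlanNode removeRedundantExchange(PlanNode node) {
        if (node instanceof Exchange e) {
            PlanNode input = removeRedundantExchange(e.input());
            return distribution(input).equals(e.keys()) ? input : new Exchange(e.keys(), input);
        }
        if (node instanceof LocalAgg l) {
            return new LocalAgg(l.groupKeys(), removeRedundantExchange(l.input()));
        }
        if (node instanceof GlobalAgg g) {
            return new GlobalAgg(g.groupKeys(), removeRedundantExchange(g.input()));
        }
        return node;
    }

    // Hypothetical rule 2 (applied at the root only, for brevity): if GlobalAgg reads LocalAgg
    // directly and the input is already partitioned by the group keys, one phase is enough.
    static PlanNode removeRedundantLocalAgg(PlanNode node) {
        if (node instanceof GlobalAgg g
                && g.input() instanceof LocalAgg l
                && distribution(l.input()).equals(g.groupKeys())) {
            return new CompleteAgg(g.groupKeys(), l.input());
        }
        return node;
    }
}
```

Applied to GlobalAgg over Exchange(a) over LocalAgg over a source pre-partitioned by a, the first rule leaves GlobalAgg over LocalAgg over the scan (the shape pointed out in the class comment), and the second reduces it to a single one-phase aggregate over the scan. If the source were partitioned by a different column, the Exchange would be kept.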

> In conjunction with question 2, I am wondering if we have a better choice (of course, not simply adding the current `PHYSICAL_OPT_RULES`'s `RemoveRedundantLocalXXRule`s to the `PHYSICAL_REWRITE`). For example, let the source actively provide some traits (including `FlinkRelDistribution` and `RelCollation`) to the planner. The advantage of doing this is that we can directly reuse the current shuffle removal optimization (as implemented in `FlinkExpandConversionRule`), and, according to the data distribution characteristics provided by the source, the planner may choose a physical operator with a lower cost (for example, given the `RelCollation`, the planner can use sortAgg without a separate local sort operation). WDYT?

- Good point. Makes sense to me. I will check whether `FlinkExpandConversionRule` can be utilized in the implementation.

Regards,
Jeyhun

On Tue, Apr 2, 2024 at 6:01 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:

Hi Jeyhun,

Thank you for driving this; it would be a very useful optimization!

Sorry for joining the discussion late (I originally planned to reply earlier, but I happened to be on vacation). I have two questions:

1. Is this FLIP targeted only at batch scenarios, or does it include streaming?
(The FLIP and the discussion did not explicitly mention this, but in the draft PR I only saw the implementation for batch scenarios: https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8 ) If we expect this to also apply to streaming, then we need to consider the stricter shuffle restrictions of streaming compared to batch (if support is considered, more discussion is needed here; let's not go into it for now). If it only applies to batch, I recommend clarifying this in the FLIP.

2. In the current implementation, the optimized plan seems to have some problems. As described in the class comments (https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60):

    BatchPhysicalHashAggregate (local)
    +- BatchPhysicalLocalHashAggregate (local)
       +- BatchPhysicalTableSourceScan

The `BatchPhysicalLocalHashAggregate` here is redundant (in the case of a one-phase hashAgg, the localAgg is not necessary; this scenario is currently handled by `RemoveRedundantLocalHashAggRule` and other rules). Also, in the draft PR, the optimization in `testShouldEliminatePartitioning1` & `testShouldEliminatePartitioning2` does not seem to take effect?
https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/BatchFileSystemTableSourceTest.xml#L38

In conjunction with question 2, I am wondering if we have a better choice (of course, not simply adding the current `PHYSICAL_OPT_RULES`'s `RemoveRedundantLocalXXRule`s to the `PHYSICAL_REWRITE`). For example, let the source actively provide some traits (including `FlinkRelDistribution` and `RelCollation`) to the planner. The advantage of doing this is that we can directly reuse the current shuffle removal optimization (as implemented in `FlinkExpandConversionRule`), and, according to the data distribution characteristics provided by the source, the planner may choose a physical operator with a lower cost (for example, given the `RelCollation`, the planner can use sortAgg without a separate local sort operation). WDYT?

Best,
Lincoln Lee

On Mon, Apr 1, 2024 at 6:00 PM Jeyhun Karimov <je.kari...@gmail.com> wrote:

Hi everyone,

Thanks for your valuable feedback!

The discussion on this FLIP has been going on for a while. I would like to start a vote after 48 hours.

Please let me know if you have any concerns or any further questions/comments.

Regards,
Jeyhun

On Thu, Mar 21, 2024 at 6:01 PM Jeyhun Karimov <je.kari...@gmail.com> wrote:

Hi Lorenzo,

Thanks a lot for your comments. Please find my answers below:

> For the interface `SupportsPartitioning`, why return `Optional`? If one decides to implement it, partitions must exist (in the worst case, it returns an empty list). Returning `Optional` seems only to complicate the logic of the code using that interface.

- The reasoning behind the use of `Optional` is that sometimes (e.g., in HiveTableSource) the partitioning info is in the catalog. In that case, we return `Optional.empty()` so that the list of partitions is queried from the catalog.

> I foresee the using code doing something like: "if the source supports partitioning, get the partitions, but if they don't exist, raise a runtime exception". Let's simply make that safe at compile time and guarantee to the code that partitions exist.

- Yes, if the partitions can be found neither in the catalog nor via the interface implementation, we raise an exception at query compile time.

> Another thing is that you show Hive-like partitioning in your FS structure; do you think it makes sense to add a note about auto-discovery of partitions?

- Yes, the FLIP contains just an example partitioning for the filesystem connector. Each connector already "knows" how to auto-discover its partitions, and we rely on this fact. For example, partition discovery differs between the Kafka and filesystem sources. So we do not handle the discovery of partitions manually. Please correct me if I misunderstood your question.

> In other terms, it looks a bit counterintuitive that the user implementing the source has to specify which partitions exist statically (and they can change at runtime), while the source itself knows the data provider and can directly implement a method `discoverPartitions`. Then Flink would take care of invoking that method when needed.

- We use the table option SOURCE_MONITOR_INTERVAL to check whether partitions are static or not.
So a user still needs to give Flink a hint about whether partitions are static. With static partitions, Flink can do more optimizations.

Please let me know if my replies answer your questions or if you have more comments.

Regards,
Jeyhun

On Thu, Mar 21, 2024 at 10:03 AM <lorenzo.affe...@ververica.com> wrote:

Hello Jeyhun,

I really like the proposal, and it definitely makes sense to me.

I have a couple of nits here and there:

For the interface `SupportsPartitioning`, why return `Optional`? If one decides to implement it, partitions must exist (in the worst case, it returns an empty list). Returning `Optional` seems only to complicate the logic of the code using that interface.

I foresee the using code doing something like: "if the source supports partitioning, get the partitions, but if they don't exist, raise a runtime exception". Let's simply make that safe at compile time and guarantee to the code that partitions exist.

Another thing is that you show Hive-like partitioning in your FS structure; do you think it makes sense to add a note about auto-discovery of partitions?

In other terms, it looks a bit counterintuitive that the user implementing the source has to specify which partitions exist statically (and they can change at runtime), while the source itself knows the data provider and can directly implement a method `discoverPartitions`. Then Flink would take care of invoking that method when needed.

On Mar 15, 2024 at 22:09 +0100, Jeyhun Karimov <je.kari...@gmail.com> wrote:

Hi Benchao,

Thanks for your comments.

> 1. Which parallelism would you choose?
> E.g., 128 + 256 => 128? What if we cannot find a good greatest common divisor, like 127 + 128? Could we just utilize one side's pre-partitioned attribute and let the other side do the shuffle?

There are two cases we need to consider:

1. Static partitions (no partitions are added during query execution) are enabled AND both sources implement `SupportsPartitionPushDown`

In this case, we are sure that no new partitions will be added at runtime. So we have a chance to equalize both sources' partitions and parallelism, if and only if both sources implement the `SupportsPartitionPushDown` interface. To do so, we first fetch the existing partitions of source1 (say p_s1) and of source2 (say p_s2). Then we compute the intersection of these two partition sets (say p_intersect) and push these partitions down:

    SupportsPartitionPushDown::applyPartitions(p_intersect)  // make sure that only these partitions are read
    SupportsPartitioning::applyPartitionedRead(p_intersect)  // partitioned read over the filtered partitions

Lastly, we need to set the parallelism of 1) source1, 2) source2, and 3) all of their downstream operators up to (and including) their first common ancestor (e.g., the join) to the number of partitions (the size of p_intersect).

2. All other cases

In all other cases, the parallelism of both sources and of their downstream operators up to their common ancestor would be MIN(|p_s1|, |p_s2|). That is, the smaller of source1's partition count and source2's partition count is selected as the parallelism.
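The two rules above can be sketched as a small Java helper. This is a hypothetical illustration, not part of the FLIP's API: partitions are modeled as plain strings, the two boolean flags stand in for "static partitions enabled" and "both sources implement `SupportsPartitionPushDown`", and the clamp to a minimum of 1 is an assumption of the sketch.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/**
 * Hypothetical helper illustrating the two parallelism rules described above.
 * Not part of the FLIP's API; partition identifiers are modeled as plain strings.
 */
final class PartitionedJoinParallelism {

    private PartitionedJoinParallelism() {}

    /**
     * Returns the parallelism for both sources and for every operator up to (and including)
     * their first common ancestor, e.g. the join.
     */
    static int chooseParallelism(
            Set<String> partitionsOfSource1,
            Set<String> partitionsOfSource2,
            boolean staticPartitions,
            boolean bothSupportPartitionPushDown) {
        int parallelism;
        if (staticPartitions && bothSupportPartitionPushDown) {
            // Case 1: only partitions present in both sources are read (pushed down),
            // and each of them gets its own subtask.
            parallelism = intersect(partitionsOfSource1, partitionsOfSource2).size();
        } else {
            // Case 2: the "MIN rule"; the smaller partition count wins.
            parallelism = Math.min(partitionsOfSource1.size(), partitionsOfSource2.size());
        }
        // Assumption of this sketch: an operator needs a parallelism of at least 1.
        return Math.max(1, parallelism);
    }

    /** p_intersect: partitions present in both sources, in source1's iteration order. */
    static Set<String> intersect(Set<String> p1, Set<String> p2) {
        Set<String> result = new LinkedHashSet<>(p1);
        result.retainAll(p2);
        return result;
    }
}
```

With 100 and 90 partitions and no static-partition guarantee, `chooseParallelism` returns 90, matching the worked example that follows; with static partitions {2024-01, 2024-02, 2024-03} and {2024-02, 2024-03, 2024-04} on two sources that both support push-down, it returns 2.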

Coming back to your example, if source1's parallelism is 127 and source2's parallelism is 128, we first check the partition counts of source1 and source2. Say source1 has 100 partitions and source2 has 90. Then we would set the parallelism of source1, source2, and all of their downstream operators up to (and including) the join operator to 90 (min(100, 90)). We also plan to implement a cost-based decision instead of the rule-based one explained above (the MIN rule). One possible outcome of the cost-based estimation is to keep the partitions on one side and shuffle the other source.

> 2. In our current shuffle removal design (FlinkExpandConversionRule), we don't consider parallelism; we just remove unnecessary shuffles according to the distribution columns. After this FLIP, the parallelism may be bound to the source's partitions. How will this optimization work together with FlinkExpandConversionRule? Will you also change the downstream operators' parallelism if we want to remove subsequent shuffles as well?

- From my understanding of FlinkExpandConversionRule, its removal logic is agnostic to operator parallelism. So, if FlinkExpandConversionRule decides to remove a shuffle operation, this FLIP will search for another possible shuffle (the one closest to the source) to remove. If there is such an opportunity, this FLIP will remove that shuffle. So, as far as I understand, FlinkExpandConversionRule and this optimization rule can work together safely. Please correct me if I misunderstood your question.
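Since the shuffle-removal decision depends only on the distribution columns, it can be illustrated with a small check. The sketch below is hypothetical and only loosely follows the idea behind `FlinkRelDistribution#requireStrict` (raised in the next question); it is not Flink's implementation. In strict mode the key lists must be identical; in non-strict mode a source hash-partitioned by a non-empty subset of the required columns already co-locates every group.

```java
import java.util.List;

/**
 * Hypothetical check of whether a source's hash pre-partitioning can stand in for a hash
 * Exchange. Loosely modeled on the idea behind FlinkRelDistribution#requireStrict;
 * NOT Flink's actual implementation.
 */
final class DistributionCheck {

    private DistributionCheck() {}

    /**
     * @param provided      columns the source is pre-partitioned (hashed) by
     * @param required      columns the Exchange would hash by
     * @param requireStrict if true, only an identical key list is accepted
     */
    static boolean canSkipExchange(
            List<String> provided, List<String> required, boolean requireStrict) {
        if (requireStrict) {
            return provided.equals(required);
        }
        // Non-strict: rows that agree on all required columns also agree on every provided
        // column, so they already sit in the same partition when provided is a subset of required.
        return !provided.isEmpty() && required.containsAll(provided);
    }
}
```

In the non-strict branch, pre-partitioning by a, b satisfies an aggregate on a, b, c, whereas pre-partitioning by a, b, c does not satisfy a requirement on a, b: rows that share a and b may still differ in c and therefore land in different partitions.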

> Regarding the new optimization rule, have you also considered allowing some non-strict mode like FlinkRelDistribution#requireStrict? For example, if the source is pre-partitioned by columns a, b, and we consume this source and do an aggregate on a, b, c, can we utilize this optimization?

- Good point. Yes, there are some cases where the non-strict mode will apply. For example:

  - the pre-partitioned columns and the aggregate columns are the same but in a different order (e.g., the source is pre-partitioned w.r.t. a,b and the aggregate has GROUP BY b,a)
  - the source's pre-partitioned columns are a list-prefix of the Exchange operator's partition columns (e.g., the source is pre-partitioned w.r.t. a,b and the Exchange's partition columns are a,b,c)

Please let me know if the above answers your questions or if you have any other comments.

Regards,
Jeyhun

On Thu, Mar 14, 2024 at 12:48 PM Benchao Li <libenc...@apache.org> wrote:

Thanks Jeyhun for bringing up this discussion; it is really exciting. +1 for the general idea.

We also introduced a similar concept in Flink Batch internally to cope with bucketed tables in Hive; it is a very important improvement.

> One thing to note is that for join queries, the parallelism of each join source might be different. This might result in inconsistencies while using the pre-partitioned/pre-divided data (e.g., different mappings of partitions to source operators). Therefore, it is the job of the planner to detect this and adjust the parallelism. With that in mind, the rest (how the split assigners perform) is consistent among many sources.

Could you elaborate a little more on this? Here are my two cents on this part:

1. Which parallelism would you choose? E.g., 128 + 256 => 128? What if we cannot find a good greatest common divisor, like 127 + 128? Could we just utilize one side's pre-partitioned attribute and let the other side do the shuffle?

2. In our current shuffle removal design (FlinkExpandConversionRule), we don't consider parallelism; we just remove unnecessary shuffles according to the distribution columns. After this FLIP, the parallelism may be bound to the source's partitions. How will this optimization work together with FlinkExpandConversionRule? Will you also change the downstream operators' parallelism if we want to remove subsequent shuffles as well?

Regarding the new optimization rule, have you also considered allowing some non-strict mode like FlinkRelDistribution#requireStrict? For example, if the source is pre-partitioned by columns a, b, and we consume this source and do an aggregate on a, b, c, can we utilize this optimization?

On Thu, Mar 14, 2024 at 3:24 PM Jane Chan <qingyue....@gmail.com> wrote:

Hi Jeyhun,

Thanks for your clarification.

> Once a new partition is detected, we add it to our existing mapping. Our mapping looks like Map<Integer, Set<Integer>> subtaskToPartitionAssignment, where it maps each source subtaskID to zero or more partitions.

I understand your point. **It would be better if you could sync the content to the FLIP**.

Another thing is that I'm curious about what the physical plan looks like.
Is there any specific info that will be added to the table source (like filter/project pushdown)? It would be great if you could attach an example to the FLIP.

Bests,
Jane

On Wed, Mar 13, 2024 at 9:11 PM Jeyhun Karimov <je.kari...@gmail.com> wrote:

Hi Jane,

Thanks for your comments.

> 1. Concerning the `sourcePartitions()` method, the partition information returned during the optimization phase may not be the same as the partition information during runtime execution. For long-running jobs, partitions may be continuously created. Is this FLIP equipped to handle such scenarios?

- Good point. This scenario is definitely supported. Once a new partition is added, or, in general, new splits are discovered, the PartitionAwareSplitAssigner::addSplits(Collection<FileSourceSplit> newSplits) method will be called. Inside that method, we can detect whether a split belongs to an existing partition or to a new partition. Once a new partition is detected, we add it to our existing mapping.
Regarding the `RemoveRedundantShuffleRule` optimization rule, I > >>>>> > >>>>>>>> understand that it is also necessary to verify whether the hash > >>> key > >>>>> > >>>>>>> within > >>>>> > >>>>>>>> the Exchange node is consistent with the partition key defined > >> in > >>>>> the > >>>>> > >>>>>>> table > >>>>> > >>>>>>>> source that implements `SupportsPartitioning`. > >>>>> > >>>>>>> > >>>>>>> > >>>>>>> - Yes, I overlooked that point, fixed. Actually, the rule is much > >>>>>>> complicated. I tried to simplify it in the FLIP. Good point. > >>>>>>> > >>>>>>> > >>>>>>> 3. Could you elaborate on the desired physical plan and > >> integration > >>>>> > >>>>> with > >>>>> > >>>>>>>> `CompiledPlan` to enhance the overall functionality? > >>>>> > >>>>>>> > >>>>>>> > >>>>>>> - For compiled plan, PartitioningSpec will be used, with a json > >> tag > >>>>>>> "Partitioning". As a result, in the compiled plan, the source > >>> operator > >>>>> > >>>>> will > >>>>> > >>>>>>> have > >>>>>>> "abilities" : [ { "type" : "Partitioning" } ] as part of the > >>> compiled > >>>>> > >>>>> plan. 

More about the implementation details below:

--------------------------------
PartitioningSpec class
--------------------------------

    @JsonTypeName("Partitioning")
    public final class PartitioningSpec extends SourceAbilitySpecBase {
        // some code here
        @Override
        public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
            if (tableSource instanceof SupportsPartitioning) {
                ((SupportsPartitioning<?>) tableSource).applyPartitionedRead();
            } else {
                throw new TableException(
                        String.format(
                                "%s does not support SupportsPartitioning.",
                                tableSource.getClass().getName()));
            }
        }
        // some code here
    }

--------------------------------
SourceAbilitySpec interface
--------------------------------

    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
    @JsonSubTypes({
        @JsonSubTypes.Type(value = FilterPushDownSpec.class),
        @JsonSubTypes.Type(value = LimitPushDownSpec.class),
        @JsonSubTypes.Type(value = PartitionPushDownSpec.class),
        @JsonSubTypes.Type(value = ProjectPushDownSpec.class),
        @JsonSubTypes.Type(value = ReadingMetadataSpec.class),
        @JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
        @JsonSubTypes.Type(value = SourceWatermarkSpec.class),
        @JsonSubTypes.Type(value = AggregatePushDownSpec.class),
        @JsonSubTypes.Type(value = PartitioningSpec.class) // newly added
    })
    public interface SourceAbilitySpec {
        // some code here
    }

Please let me know if that answers your questions or if you have other comments.
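The partition-aware split assignment described earlier in this reply (a `subtaskToPartitionAssignment` mapping that is extended when `addSplits` sees a split of a previously unknown partition) can be sketched in plain Java. Everything here is hypothetical: `PartitionedSplit` stands in for `FileSourceSplit`, `PartitionAwareAssignerSketch` is not the FLIP's `PartitionAwareSplitAssigner`, and giving a new partition to the subtask that owns the fewest partitions is an assumption, since the thread does not say how new partitions are placed.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical split type for this sketch: only the partition it belongs to matters. */
record PartitionedSplit(String splitId, int partitionId) {}

/**
 * Hypothetical, simplified partition-aware assigner: all splits of one partition go to the
 * same source subtask, including partitions discovered after startup.
 */
final class PartitionAwareAssignerSketch {

    private final int parallelism;
    // source subtask id -> partitions owned by that subtask
    private final Map<Integer, Set<Integer>> subtaskToPartitionAssignment = new HashMap<>();
    // source subtask id -> splits queued for that subtask
    private final Map<Integer, List<PartitionedSplit>> pendingSplits = new HashMap<>();

    PartitionAwareAssignerSketch(int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        this.parallelism = parallelism;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            subtaskToPartitionAssignment.put(subtask, new HashSet<>());
            pendingSplits.put(subtask, new ArrayList<>());
        }
    }

    /** Called with newly discovered splits, which may belong to new partitions. */
    void addSplits(Collection<PartitionedSplit> newSplits) {
        for (PartitionedSplit split : newSplits) {
            int owner = ownerOf(split.partitionId());
            if (owner < 0) {
                // New partition. Assumption: give it to the subtask owning the fewest partitions.
                owner = leastLoadedSubtask();
                subtaskToPartitionAssignment.get(owner).add(split.partitionId());
            }
            pendingSplits.get(owner).add(split);
        }
    }

    Set<Integer> partitionsOf(int subtask) {
        return subtaskToPartitionAssignment.get(subtask);
    }

    List<PartitionedSplit> splitsFor(int subtask) {
        return pendingSplits.get(subtask);
    }

    // Returns the subtask that already owns the partition, or -1 if the partition is new.
    private int ownerOf(int partitionId) {
        for (int subtask = 0; subtask < parallelism; subtask++) {
            if (subtaskToPartitionAssignment.get(subtask).contains(partitionId)) {
                return subtask;
            }
        }
        return -1;
    }

    // Lowest-numbered subtask among those owning the fewest partitions.
    private int leastLoadedSubtask() {
        int best = 0;
        for (int subtask = 1; subtask < parallelism; subtask++) {
            if (subtaskToPartitionAssignment.get(subtask).size()
                    < subtaskToPartitionAssignment.get(best).size()) {
                best = subtask;
            }
        }
        return best;
    }
}
```

For example, with two subtasks, splits from partitions 7, 8, and again 7 end up as partition 7 on subtask 0 (both of its splits) and partition 8 on subtask 1; a later split from a new partition 9 is then added to subtask 0.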

Regards,
Jeyhun

On Tue, Mar 12, 2024 at 8:56 AM Jane Chan <qingyue....@gmail.com> wrote:

Hi Jeyhun,

Thank you for leading the discussion. I'm generally +1 on this proposal, with some questions. Please see my comments below.

1. Concerning the `sourcePartitions()` method, the partition information returned during the optimization phase may not be the same as the partition information during runtime execution. For long-running jobs, partitions may be continuously created. Is this FLIP equipped to handle such scenarios?

2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I understand that it is also necessary to verify whether the hash key within the Exchange node is consistent with the partition key defined in the table source that implements `SupportsPartitioning`.

3. Could you elaborate on the desired physical plan and integration with `CompiledPlan` to enhance the overall functionality?

Best,
Jane

On Tue, Mar 12, 2024 at 11:11 AM Jim Hughes <jhug...@confluent.io.invalid> wrote:

Hi Jeyhun,

I like the idea!
Given FLIP-376 [1], I wonder if it would make sense to generalize FLIP-434 to be about "pre-divided" data, covering "buckets" and "partitions" (and maybe even situations where a data source is both partitioned and bucketed).

Separate from that, the page mentions TPC-H Q1 as an example. For a join, any two tables joined on the same bucket key should provide a concrete example. Systems like Kafka Streams/ksqlDB call this "co-partitioning" [2]; for those systems, it is a requirement placed on the input sources. For Flink, with FLIP-434, the proposed planner rule could remove the shuffle.

Definitely a fun idea; I look forward to hearing more!

Cheers,

Jim

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
[2] https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/#co-partitioning-requirements

On Sun, Mar 10, 2024 at 3:38 PM Jeyhun Karimov <je.kari...@gmail.com> wrote:

Hi devs,

I'd like to start a discussion on FLIP-434: Support optimizations for pre-partitioned data sources [1].

The FLIP proposes taking advantage of pre-partitioned data sources in the SQL/Table API (this is already supported as an experimental feature in the DataStream API [2]).

Please find more details in the FLIP wiki document [1]. Looking forward to your feedback.

Regards,
Jeyhun

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/

--
Best,
Benchao Li