Hi Lincoln,

Thanks for your reply. My idea was to utilize MapBundleFunction, since it is
already used in a similar context by MiniBatchLocalGroupAggFunction.
I can also extend my PoC to streaming sources and get back to you to continue
our discussion.
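
To illustrate, here is a rough sketch of the kind of bundle function I have
in mind (illustrative only, not actual FLIP code; the Tuple2-based running
count is just an example):

import java.util.Map;

import javax.annotation.Nullable;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
import org.apache.flink.util.Collector;

// Sketch: pre-aggregate a per-key count within a bundle, in the same
// spirit as MiniBatchLocalGroupAggFunction.
public class CountBundleFunction
        extends MapBundleFunction<String, Long, Tuple2<String, Long>, Tuple2<String, Long>> {

    @Override
    public Long addInput(@Nullable Long value, Tuple2<String, Long> input) {
        // value is null for the first record of a key in the current bundle
        return value == null ? input.f1 : value + input.f1;
    }

    @Override
    public void finishBundle(Map<String, Long> buffer, Collector<Tuple2<String, Long>> out) {
        // emit one pre-aggregated record per key when the bundle is flushed
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            out.collect(Tuple2.of(e.getKey(), e.getValue()));
        }
    }
}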

Regards,
Jeyhun

On Wed, Apr 3, 2024 at 4:33 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:

> Hi Jeyhun,
>
> Thanks for your quick response!
>
> In streaming scenarios, a shuffle commonly occurs before the stateful
> operator, and there's a sanity check[1] when the stateful operator
> accesses the state. This implies a consistency requirement between the
> partitioner used for data shuffling and the key selector used for state
> access (see KeyGroupStreamPartitioner for more details); otherwise,
> there may be state access errors. That is to say, in the streaming
> scenario, there is not only the strict requirement described in
> FlinkRelDistribution#requireStrict, but also the implied consistency of
> the hash calculation.
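>
> As a minimal illustration of that consistency requirement (values are
> illustrative; Flink's actual routing lives in KeyGroupRangeAssignment):
>
> import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
>
> public class KeyGroupConsistencyDemo {
>     public static void main(String[] args) {
>         int maxParallelism = 128;
>         int parallelism = 4;
>         String key = "user-42";
>
>         // subtask whose state backend Flink expects to own this key
>         int flinkSubtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
>                 key, maxParallelism, parallelism);
>
>         // stand-in for an external partitioner (e.g., Kafka's murmur2 hash
>         // modulo the partition count); in general it picks a different
>         // index, so preserving the external partitioning alone is not safe
>         // for stateful operators
>         int externalPartition = Math.abs(key.hashCode() % parallelism);
>
>         System.out.println(flinkSubtask + " vs " + externalPartition);
>     }
> }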
>
> Also, if this FLIP targets both streaming and batch scenarios, it is
> recommended to do PoC validation for streaming as well.
>
> [1] https://issues.apache.org/jira/browse/FLINK-29430
>
>
> Best,
> Lincoln Lee
>
>
> Leonard Xu <xbjt...@gmail.com> wrote on Wed, Apr 3, 2024 at 14:25:
>
> > Hey, Jeyhun
> >
> > Thanks for kicking off this discussion. I have two questions about
> > streaming sources:
> >
> > (1) The FLIP motivation section says the Kafka broker is already
> > partitioned w.r.t. some key(s). Is this the main use case in the Kafka
> > world? Partitioning by key fields is not the default behavior of Kafka's
> > default partitioner[1], IIUC.
> >
> > (2) Considering that the FLIP's optimization scope covers both batch and
> > streaming pre-partitioned sources, could you add a streaming source
> > example to help me understand the FLIP better? I think Kafka Source is a
> > good candidate for a streaming source example; the file source is a good
> > one for the batch case and it really helped me follow the FLIP.
> >
> > Best,
> > Leonard
> > [1]
> > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L31
> >
> >
> >
> > > > On Apr 3, 2024, at 5:53 AM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
> > >
> > > Hi Lincoln,
> > >
> > > Thanks a lot for your comments. Please find my answers below.
> > >
> > >
> > >> 1. Is this flip targeted only at batch scenarios or does it include
> > >> streaming?
> > >> (The flip and the discussion did not explicitly mention this, but in
> > >> the draft pr, I only saw the implementation for batch scenarios
> > >> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8
> > >> )
> > >> If we expect this to also apply to streaming, then we need to consider
> > >> the stricter shuffle restrictions of streaming compared to batch (if
> > >> support is considered, more discussion is needed here, let's not expand
> > >> for now). If it only applies to batch, it is recommended to clarify
> > >> this in the flip.
> > >
> > >
> > > - The FLIP targets both streaming and batch scenarios.
> > > Could you please elaborate more on what you mean by additional
> > > restrictions?
> > >
> > >
> > >> 2. In the current implementation, the optimized plan seems to have
> > >> some problems. As described in the class comments:
> > >> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60
> > >>
> > >> BatchPhysicalHashAggregate (local)
> > >>   +- BatchPhysicalLocalHashAggregate (local)
> > >>      +- BatchPhysicalTableSourceScan
> > >>
> > >> The `BatchPhysicalLocalHashAggregate` here is redundant (in the case
> > >> of one-phase hashAgg, localAgg is not necessary, which is the scenario
> > >> currently handled by `RemoveRedundantLocalHashAggRule` and other rules)
> > >
> > >
> > > - Yes, you are completely right. Note that the PR you referenced is
> > > just a quick PoC.
> > > The redundant operators you mentioned exist because
> > > `RemoveRedundantShuffleRule` just removes the Exchange operator,
> > > without modifying the upstream/downstream operators.
> > > As I mentioned, the implementation is just a PoC; the final
> > > implementation will make sure that the existing redundancy-elimination
> > > rules remove the redundant operators.
> > >
> > >
> > >> Also, in the draft pr, the optimization of
> > >> `testShouldEliminatePartitioning1` &
> > >> `testShouldEliminatePartitioning2` seems to not have taken effect?
> > >> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/BatchFileSystemTableSourceTest.xml#L38
> > >
> > >
> > > - Note that in this example, the Exchange operator has a property
> > > KEEP_INPUT_AS_IS that indicates that its data distribution is the same
> > > as its input's.
> > > Since we have redundant operators (as shown above, two aggregate
> > > operators), one of the rules (not in this FLIP) adds this Exchange
> > > operator with KEEP_INPUT_AS_IS in between.
> > > Similar to my comment above, the final implementation will be free of
> > > redundant operators.
> > >
> > >> In conjunction with question 2, I am wondering if we have a better
> > >> choice (of course, not simply adding the current `PHYSICAL_OPT_RULES`'s
> > >> `RemoveRedundantLocalXXRule`s to the `PHYSICAL_REWRITE`).
> > >> For example, let the source actively provide some traits (including
> > >> `FlinkRelDistribution` and `RelCollation`) to the planner. The
> > >> advantage of doing this is to directly reuse the current shuffle
> > >> removal optimization (as `FlinkExpandConversionRule` implements),
> > >> and according to the data distribution characteristics provided by the
> > >> source, the planner may choose a physical operator with a cheaper cost
> > >> (for example, according to `RelCollation`, the planner can use sortAgg,
> > >> with no need for a separate local sort operation).
> > >> WDYT?
> > >
> > >
> > > - Good point. Makes sense to me. I will look into utilizing
> > > FlinkExpandConversionRule in the implementation.
> > >
> > >
> > > Regards,
> > > Jeyhun
> > >
> > >
> > >
> > > On Tue, Apr 2, 2024 at 6:01 PM Lincoln Lee <lincoln.8...@gmail.com>
> > > wrote:
> > >
> > >> Hi Jeyhun,
> > >>
> > >> Thank you for driving this; it would be a very useful optimization!
> > >>
> > >> Sorry for joining the discussion only now (I originally planned to
> > >> reply earlier, but it happened to be during my vacation). I have two
> > >> questions:
> > >>
> > >> 1. Is this flip targeted only at batch scenarios or does it include
> > >> streaming?
> > >> (The flip and the discussion did not explicitly mention this, but in
> > >> the draft pr, I only saw the implementation for batch scenarios
> > >> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8
> > >> )
> > >> If we expect this to also apply to streaming, then we need to consider
> > >> the stricter shuffle restrictions of streaming compared to batch (if
> > >> support is considered, more discussion is needed here, let's not expand
> > >> for now). If it only applies to batch, it is recommended to clarify
> > >> this in the flip.
> > >>
> > >> 2. In the current implementation, the optimized plan seems to have
> > >> some problems. As described in the class comments:
> > >> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60
> > >>
> > >> BatchPhysicalHashAggregate (local)
> > >>   +- BatchPhysicalLocalHashAggregate (local)
> > >>      +- BatchPhysicalTableSourceScan
> > >>
> > >> The `BatchPhysicalLocalHashAggregate` here is redundant (in the case
> > >> of one-phase hashAgg, localAgg is not necessary, which is the scenario
> > >> currently handled by `RemoveRedundantLocalHashAggRule` and other
> > >> rules). Also, in the draft pr, the optimization of
> > >> `testShouldEliminatePartitioning1` & `testShouldEliminatePartitioning2`
> > >> seems to not have taken effect?
> > >> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/BatchFileSystemTableSourceTest.xml#L38
> > >>
> > >> In conjunction with question 2, I am wondering if we have a better
> > >> choice (of course, not simply adding the current `PHYSICAL_OPT_RULES`'s
> > >> `RemoveRedundantLocalXXRule`s to the `PHYSICAL_REWRITE`).
> > >> For example, let the source actively provide some traits (including
> > >> `FlinkRelDistribution` and `RelCollation`) to the planner. The
> > >> advantage of doing this is to directly reuse the current shuffle
> > >> removal optimization (as `FlinkExpandConversionRule` implements),
> > >> and according to the data distribution characteristics provided by the
> > >> source, the planner may choose a physical operator with a cheaper cost
> > >> (for example, according to `RelCollation`, the planner can use sortAgg,
> > >> with no need for a separate local sort operation).
> > >> WDYT?
> > >>
> > >>
> > >> Best,
> > >> Lincoln Lee
> > >>
> > >>
> > >> Jeyhun Karimov <je.kari...@gmail.com> wrote on Mon, Apr 1, 2024 at 18:00:
> > >>
> > >>> Hi everyone,
> > >>>
> > >>> Thanks for your valuable feedback!
> > >>>
> > >>> The discussion on this FLIP has been going on for a while.
> > >>> I would like to start a vote after 48 hours.
> > >>>
> > >>> Please let me know if you have any concerns or any further
> > >>> questions/comments.
> > >>>
> > >>> Regards,
> > >>> Jeyhun
> > >>>
> > >>>
> > >>> On Thu, Mar 21, 2024 at 6:01 PM Jeyhun Karimov <je.kari...@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Hi Lorenzo,
> > >>>>
> > >>>> Thanks a lot for your comments. Please find my answers below:
> > >>>>
> > >>>>
> > >>>>> For the interface `SupportsPartitioning`, why return `Optional`?
> > >>>>> If one decides to implement that, partitions must exist (at
> > >>>>> maximum, return an empty list). Returning `Optional` seems just to
> > >>>>> complicate the logic of the code using that interface.
> > >>>>
> > >>>>
> > >>>> - The reasoning behind the use of Optional is that sometimes (e.g.,
> > >>>> in HiveTableSource) the partitioning info is in the catalog.
> > >>>> Therefore, we return Optional.empty(), so that the list of
> > >>>> partitions is queried from the catalog.
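> > >>>>
> > >>>> Roughly, the consuming code would do the following (a sketch only;
> > >>>> sourcePartitions() is the FLIP's proposed method, and catalog /
> > >>>> tablePath are assumed to be in scope):
> > >>>>
> > >>>> Optional<List<Map<String, String>>> declared = source.sourcePartitions();
> > >>>> List<Map<String, String>> partitions = declared.orElseGet(() -> {
> > >>>>     try {
> > >>>>         // fall back to the catalog's partition metadata
> > >>>>         return catalog.listPartitions(tablePath).stream()
> > >>>>                 .map(CatalogPartitionSpec::getPartitionSpec)
> > >>>>                 .collect(Collectors.toList());
> > >>>>     } catch (Exception e) {
> > >>>>         throw new TableException(
> > >>>>                 "Partitions are available neither from the source nor from the catalog", e);
> > >>>>     }
> > >>>> });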
> > >>>>
> > >>>>
> > >>>>> I foresee the consuming code doing something like: "if the source
> > >>>>> supports partitioning, get the partitions, but if they don't exist,
> > >>>>> raise a runtime exception". Let's simply make that safe at compile
> > >>>>> time and guarantee to the code that partitions exist.
> > >>>>
> > >>>>
> > >>>> - Yes, if partitions cannot be found, neither from the catalog nor
> > >>>> from the interface implementation, then we raise an exception during
> > >>>> query compile time.
> > >>>>
> > >>>>
> > >>>>> Another thing is that you show Hive-like partitioning in your FS
> > >>>>> structure; do you think it makes sense to add a note about
> > >>>>> auto-discovery of partitions?
> > >>>>
> > >>>>
> > >>>> - Yes, the FLIP contains just an example partitioning for the
> > >>>> filesystem connector. Each connector already "knows" about
> > >>>> auto-discovery of its partitions, and we rely on this fact.
> > >>>> For example, partition discovery is different between the Kafka and
> > >>>> filesystem sources. So, we do not handle the manual discovery of
> > >>>> partitions. Please correct me if I misunderstood your question.
> > >>>>
> > >>>>
> > >>>>> In other terms, it looks a bit counterintuitive that the user
> > >>>>> implementing the source has to specify which partitions exist
> > >>>>> statically (and they can change at runtime), while the source itself
> > >>>>> knows the data provider and can directly implement a method
> > >>>>> `discoverPartitions`. Then Flink would take care of invoking that
> > >>>>> method when needed.
> > >>>>
> > >>>>
> > >>>> We utilize the table option SOURCE_MONITOR_INTERVAL to check whether
> > >>>> partitions are static or not. So, a user still should give Flink a
> > >>>> hint about whether the partitions are static. With static partitions,
> > >>>> Flink can do more optimizations.
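> > >>>>
> > >>>> For example (a sketch; "source.monitor-interval" is the existing
> > >>>> filesystem connector option behind SOURCE_MONITOR_INTERVAL, and the
> > >>>> static-vs-dynamic semantics are as proposed in the FLIP):
> > >>>>
> > >>>> TableDescriptor prePartitioned = TableDescriptor.forConnector("filesystem")
> > >>>>         .schema(Schema.newBuilder()
> > >>>>                 .column("a", DataTypes.STRING())
> > >>>>                 .column("b", DataTypes.BIGINT())
> > >>>>                 .build())
> > >>>>         .option("path", "/data/t")
> > >>>>         .option("format", "csv")
> > >>>>         // leaving "source.monitor-interval" unset hints that the
> > >>>>         // partition set is static, enabling more optimizations;
> > >>>>         // setting it (e.g., to "10 s") tells Flink that new
> > >>>>         // partitions may appear at runtime
> > >>>>         .build();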
> > >>>>
> > >>>> Please let me know if my replies answer your questions or if you
> > >>>> have more comments.
> > >>>>
> > >>>> Regards,
> > >>>> Jeyhun
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Thu, Mar 21, 2024 at 10:03 AM <lorenzo.affe...@ververica.com>
> > >>>> wrote:
> > >>>>
> > >>>>> Hello Jeyhun,
> > >>>>> I really like the proposal and definitely makes sense to me.
> > >>>>>
> > >>>>> I have a couple of nits here and there:
> > >>>>>
> > >>>>> For the interface `SupportsPartitioning`, why return `Optional`?
> > >>>>> If one decides to implement that, partitions must exist (at
> > >>>>> maximum, return an empty list). Returning `Optional` seems just to
> > >>>>> complicate the logic of the code using that interface.
> > >>>>>
> > >>>>> I foresee the consuming code doing something like: "if the source
> > >>>>> supports partitioning, get the partitions, but if they don't exist,
> > >>>>> raise a runtime exception". Let's simply make that safe at compile
> > >>>>> time and guarantee to the code that partitions exist.
> > >>>>>
> > >>>>> Another thing is that you show Hive-like partitioning in your FS
> > >>>>> structure; do you think it makes sense to add a note about
> > >>>>> auto-discovery of partitions?
> > >>>>>
> > >>>>> In other terms, it looks a bit counterintuitive that the user
> > >>>>> implementing the source has to specify which partitions exist
> > >>>>> statically (and they can change at runtime), while the source itself
> > >>>>> knows the data provider and can directly implement a method
> > >>>>> `discoverPartitions`. Then Flink would take care of invoking that
> > >>>>> method when needed.
> > >>>>> On Mar 15, 2024 at 22:09 +0100, Jeyhun Karimov
> > >>>>> <je.kari...@gmail.com> wrote:
> > >>>>>
> > >>>>> Hi Benchao,
> > >>>>>
> > >>>>> Thanks for your comments.
> > >>>>>
> > >>>>> 1. What parallelism would you take? E.g., 128 + 256 => 128? What
> > >>>>> if we cannot have a good greatest common divisor, like 127 + 128,
> > >>>>> could we just utilize one side's pre-partitioned attribute, and let
> > >>>>> the other side just do the shuffle?
> > >>>>> another side just do the shuffle?
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> There are two cases we need to consider:
> > >>>>>
> > >>>>> 1. Static Partitioning (no partitions are added during query
> > >>>>> execution) is enabled AND both sources implement
> > >>>>> "SupportsPartitionPushDown"
> > >>>>>
> > >>>>> In this case, we are sure that no new partitions will be added at
> > >>>>> runtime. So, we have a chance to equalize both sources' partitions
> > >>>>> and parallelism, IFF both sources implement the
> > >>>>> "SupportsPartitionPushDown" interface.
> > >>>>> To achieve this, we first fetch the existing partitions from source1
> > >>>>> (say p_s1) and from source2 (say p_s2).
> > >>>>> Then, we find the intersection of these two partition sets (say
> > >>>>> p_intersect) and push down these partitions:
> > >>>>>
> > >>>>> SupportsPartitionPushDown::applyPartitions(p_intersect) // make sure
> > >>>>> that only the specified partitions are read
> > >>>>> SupportsPartitioning::applyPartitionedRead(p_intersect) //
> > >>>>> partitioned read with the filtered partitions
> > >>>>>
> > >>>>> Lastly, we need to change the parallelism of 1) source1, 2) source2,
> > >>>>> and 3) all of their downstream operators until (and including) their
> > >>>>> first common ancestor (e.g., join) to be equal to the number of
> > >>>>> partitions (the size of p_intersect).
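> > >>>>>
> > >>>>> Sketched in code (illustrative only; source1/source2 are assumed to
> > >>>>> implement both the existing SupportsPartitionPushDown and the FLIP's
> > >>>>> proposed SupportsPartitioning):
> > >>>>>
> > >>>>> List<Map<String, String>> pS1 =
> > >>>>>         source1.listPartitions().orElse(Collections.emptyList());
> > >>>>> List<Map<String, String>> pS2 =
> > >>>>>         source2.listPartitions().orElse(Collections.emptyList());
> > >>>>>
> > >>>>> // p_intersect: partitions present on both sides
> > >>>>> List<Map<String, String>> pIntersect = new ArrayList<>(pS1);
> > >>>>> pIntersect.retainAll(pS2);
> > >>>>>
> > >>>>> source1.applyPartitions(pIntersect); // read only common partitions
> > >>>>> source2.applyPartitions(pIntersect);
> > >>>>> source1.applyPartitionedRead(pIntersect); // partitioned read
> > >>>>> source2.applyPartitionedRead(pIntersect);
> > >>>>>
> > >>>>> // parallelism for both sources and all downstream operators up to
> > >>>>> // (and including) the first common ancestor
> > >>>>> int newParallelism = pIntersect.size();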
> > >>>>>
> > >>>>> 2. All other cases
> > >>>>>
> > >>>>> In all other cases, the parallelism of both sources and their
> > >>>>> downstream operators until their common ancestor would be equal to
> > >>>>> MIN(p_s1, p_s2).
> > >>>>> That is, the minimum of the partition sizes of source1 and source2
> > >>>>> will be selected as the parallelism.
> > >>>>> Coming back to your example: if source1's parallelism is 127 and
> > >>>>> source2's parallelism is 128, then we first check the partition
> > >>>>> sizes of source1 and source2.
> > >>>>> Say the partition size of source1 is 100 and the partition size of
> > >>>>> source2 is 90. Then we would set the parallelism of source1,
> > >>>>> source2, and all of their downstream operators until (and including)
> > >>>>> the join operator to 90 (min(100, 90)).
> > >>>>> We also plan to implement a cost-based decision instead of the
> > >>>>> rule-based one (the MIN rule explained above).
> > >>>>> One possible result of the cost-based estimation is to keep the
> > >>>>> partitions on one side and perform the shuffling on the other
> > >>>>> source.
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> 2. In our current shuffle-removal design
> > >>>>> (FlinkExpandConversionRule), we don't consider parallelism; we just
> > >>>>> remove unnecessary shuffles according to the distribution columns.
> > >>>>> After this FLIP, the parallelism may be bundled with the source's
> > >>>>> partitions, so how will this optimization accommodate
> > >>>>> FlinkExpandConversionRule? Will you also change downstream
> > >>>>> operators' parallelisms if we want to also remove subsequent
> > >>>>> shuffles?
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> - From my understanding of FlinkExpandConversionRule, its removal
> > >>>>> logic is agnostic to operator parallelism.
> > >>>>> So, if FlinkExpandConversionRule decides to remove a shuffle
> > >>>>> operation, then this FLIP will search for another possible shuffle
> > >>>>> (the one closest to the source) to remove.
> > >>>>> If there is such an opportunity, this FLIP will remove the shuffle.
> > >>>>> So, from my understanding, FlinkExpandConversionRule and this
> > >>>>> optimization rule can work together safely.
> > >>>>> Please correct me if I misunderstood your question.
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> Regarding the new optimization rule, have you also considered
> > >>>>> allowing some non-strict mode like
> > >>>>> FlinkRelDistribution#requireStrict? For example, the source is
> > >>>>> pre-partitioned by the a, b columns; if we are consuming this source
> > >>>>> and do an aggregate on a, b, c, can we utilize this optimization?
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> - Good point. Yes, there are some cases where non-strict mode will
> > >>>>> apply. For example:
> > >>>>>
> > >>>>> - the pre-partitioned columns and the aggregate columns are the same
> > >>>>> but have a different order (e.g., the source is pre-partitioned
> > >>>>> w.r.t. a,b and the aggregate has a GROUP BY b,a)
> > >>>>> - the columns in the Exchange operator are a list-prefix of the
> > >>>>> pre-partitioned columns of the source (e.g., the source is
> > >>>>> pre-partitioned w.r.t. a,b,c and the Exchange's partition columns
> > >>>>> are a,b)
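> > >>>>>
> > >>>>> The corresponding checks could look roughly like this (hypothetical
> > >>>>> helpers, not from the FLIP):
> > >>>>>
> > >>>>> // case 1: same columns, order-insensitive match
> > >>>>> static boolean sameColumnsAnyOrder(List<String> partitionCols, List<String> groupByCols) {
> > >>>>>     return new HashSet<>(partitionCols).equals(new HashSet<>(groupByCols));
> > >>>>> }
> > >>>>>
> > >>>>> // case 2: exchange keys are a list-prefix of the partition columns
> > >>>>> static boolean isListPrefix(List<String> exchangeCols, List<String> partitionCols) {
> > >>>>>     return exchangeCols.size() <= partitionCols.size()
> > >>>>>             && partitionCols.subList(0, exchangeCols.size()).equals(exchangeCols);
> > >>>>> }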
> > >>>>>
> > >>>>> Please let me know if the above answers your questions or if you
> have
> > >>> any
> > >>>>> other comments.
> > >>>>>
> > >>>>> Regards,
> > >>>>> Jeyhun
> > >>>>>
> > >>>>> On Thu, Mar 14, 2024 at 12:48 PM Benchao Li <libenc...@apache.org>
> > >>>>> wrote:
> > >>>>>
> > >>>>> Thanks Jeyhun for bringing up this discussion, it is really
> > >>>>> exciting! +1 for the general idea.
> > >>>>>
> > >>>>> We also introduced a similar concept in Flink Batch internally to
> > >>>>> cope with bucketed tables in Hive; it is a very important
> > >>>>> improvement.
> > >>>>>
> > >>>>>> One thing to note is that for join queries, the parallelism of
> > >>>>>> each join source might be different. This might result in
> > >>>>>> inconsistencies while using the pre-partitioned/pre-divided data
> > >>>>>> (e.g., different mappings of partitions to source operators).
> > >>>>>> Therefore, it is the job of the planner to detect this and adjust
> > >>>>>> the parallelism. With that in mind, the rest (how the split
> > >>>>>> assigners perform) is consistent among many sources.
> > >>>>>
> > >>>>>
> > >>>>> Could you elaborate a little more on this? I added my two cents
> > >>>>> here about this part:
> > >>>>> 1. What parallelism would you take? E.g., 128 + 256 => 128? What
> > >>>>> if we cannot have a good greatest common divisor, like 127 + 128,
> > >>>>> could we just utilize one side's pre-partitioned attribute, and let
> > >>>>> the other side just do the shuffle?
> > >>>>> 2. In our current shuffle-removal design
> > >>>>> (FlinkExpandConversionRule), we don't consider parallelism; we just
> > >>>>> remove unnecessary shuffles according to the distribution columns.
> > >>>>> After this FLIP, the parallelism may be bundled with the source's
> > >>>>> partitions, so how will this optimization accommodate
> > >>>>> FlinkExpandConversionRule? Will you also change downstream
> > >>>>> operators' parallelisms if we want to also remove subsequent
> > >>>>> shuffles?
> > >>>>>
> > >>>>>
> > >>>>> Regarding the new optimization rule, have you also considered
> > >>>>> allowing some non-strict mode like
> > >>>>> FlinkRelDistribution#requireStrict? For example, the source is
> > >>>>> pre-partitioned by the a, b columns; if we are consuming this source
> > >>>>> and do an aggregate on a, b, c, can we utilize this optimization?
> > >>>>>
> > >>>>> Jane Chan <qingyue....@gmail.com> wrote on Thu, Mar 14, 2024 at 15:24:
> > >>>>>
> > >>>>>>
> > >>>>>> Hi Jeyhun,
> > >>>>>>
> > >>>>>> Thanks for your clarification.
> > >>>>>>
> > >>>>>
> > >>>>>>> Once a new partition is detected, we add it to our existing
> > >>>>>>> mapping. Our mapping looks like Map<Integer, Set<Integer>>
> > >>>>>>> subtaskToPartitionAssignment, where it maps each source subtask ID
> > >>>>>>> to zero or more partitions.
> > >>>>>>
> > >>>>>> I understand your point. **It would be better if you could sync
> > >>>>>> the content to the FLIP.**
> > >>>>>>
> > >>>>>> Another thing is I'm curious about what the physical plan looks
> > >>>>>> like. Is there any specific info that will be added to the table
> > >>>>>> source (like filter/project pushdown)? It would be great if you
> > >>>>>> could attach an example to the FLIP.
> > >>>>>>
> > >>>>>> Bests,
> > >>>>>> Jane
> > >>>>>>
> > >>>>>> On Wed, Mar 13, 2024 at 9:11 PM Jeyhun Karimov
> > >>>>>> <je.kari...@gmail.com> wrote:
> > >>>>>
> > >>>>>>
> > >>>>>
> > >>>>>>> Hi Jane,
> > >>>>>>>
> > >>>>>>> Thanks for your comments.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>> 1. Concerning the `sourcePartitions()` method, the partition
> > >>>>>>>> information returned during the optimization phase may not be
> > >>>>>>>> the same as the partition information during runtime execution.
> > >>>>>>>> For long-running jobs, partitions may be continuously created.
> > >>>>>>>> Is this FLIP equipped to handle such scenarios?
> > >>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> - Good point. This scenario is definitely supported.
> > >>>>>>> Once a new partition is added or, in general, new splits are
> > >>>>>>> discovered, the
> > >>>>>>> PartitionAwareSplitAssigner::addSplits(Collection<FileSourceSplit>
> > >>>>>>> newSplits) method will be called. Inside that method, we are able
> > >>>>>>> to detect whether a split belongs to an existing partition or to a
> > >>>>>>> new partition.
> > >>>>>>> Once a new partition is detected, we add it to our existing
> > >>>>>>> mapping. Our mapping looks like Map<Integer, Set<Integer>>
> > >>>>>>> subtaskToPartitionAssignment, where it maps each source subtask ID
> > >>>>>>> to zero or more partitions.
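> > >>>>>>>
> > >>>>>>> In code, the bookkeeping looks roughly like this (a sketch;
> > >>>>>>> PartitionAwareSplitAssigner is the FLIP's proposed class, and
> > >>>>>>> partitionIdOf / knownPartitions / numSubtasks are hypothetical):
> > >>>>>>>
> > >>>>>>> private final Map<Integer, Set<Integer>> subtaskToPartitionAssignment =
> > >>>>>>>         new HashMap<>();
> > >>>>>>> private final Set<Integer> knownPartitions = new HashSet<>();
> > >>>>>>>
> > >>>>>>> public void addSplits(Collection<FileSourceSplit> newSplits) {
> > >>>>>>>     for (FileSourceSplit split : newSplits) {
> > >>>>>>>         int partitionId = partitionIdOf(split); // e.g., from the split path
> > >>>>>>>         if (knownPartitions.add(partitionId)) {
> > >>>>>>>             // new partition: extend the existing mapping, keeping
> > >>>>>>>             // previously assigned partitions where they are
> > >>>>>>>             int subtask = partitionId % numSubtasks;
> > >>>>>>>             subtaskToPartitionAssignment
> > >>>>>>>                     .computeIfAbsent(subtask, k -> new HashSet<>())
> > >>>>>>>                     .add(partitionId);
> > >>>>>>>         }
> > >>>>>>>     }
> > >>>>>>> }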
> > >>>>>>>
> > >>>>>>>> 2. Regarding the `RemoveRedundantShuffleRule` optimization rule,
> > >>>>>>>> I understand that it is also necessary to verify whether the hash
> > >>>>>>>> key within the Exchange node is consistent with the partition key
> > >>>>>>>> defined in the table source that implements
> > >>>>>>>> `SupportsPartitioning`.
> > >>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> - Yes, I overlooked that point; fixed. Actually, the rule is much
> > >>>>>>> more complicated. I tried to simplify it in the FLIP. Good point.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>> 3. Could you elaborate on the desired physical plan and
> > >>>>>>>> integration with `CompiledPlan` to enhance the overall
> > >>>>>>>> functionality?
> > >>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> - For the compiled plan, PartitioningSpec will be used, with a
> > >>>>>>> JSON tag "Partitioning". As a result, in the compiled plan the
> > >>>>>>> source operator will have
> > >>>>>>> "abilities" : [ { "type" : "Partitioning" } ]
> > >>>>>>> as part of the compiled plan.
> > >>>>>>> More about the implementation details below:
> > >>>>>>>
> > >>>>>>> --------------------------------
> > >>>>>>> PartitioningSpec class
> > >>>>>>> --------------------------------
> > >>>>>>> @JsonTypeName("Partitioning")
> > >>>>>>> public final class PartitioningSpec extends SourceAbilitySpecBase {
> > >>>>>>>     // some code here
> > >>>>>>>     @Override
> > >>>>>>>     public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
> > >>>>>>>         if (tableSource instanceof SupportsPartitioning) {
> > >>>>>>>             ((SupportsPartitioning<?>) tableSource).applyPartitionedRead();
> > >>>>>>>         } else {
> > >>>>>>>             throw new TableException(
> > >>>>>>>                     String.format(
> > >>>>>>>                             "%s does not support SupportsPartitioning.",
> > >>>>>>>                             tableSource.getClass().getName()));
> > >>>>>>>         }
> > >>>>>>>     }
> > >>>>>>>     // some code here
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>> --------------------------------
> > >>>>>>> SourceAbilitySpec class
> > >>>>>>> --------------------------------
> > >>>>>>> @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include =
> > >>>>>>>         JsonTypeInfo.As.PROPERTY, property = "type")
> > >>>>>>> @JsonSubTypes({
> > >>>>>>>     @JsonSubTypes.Type(value = FilterPushDownSpec.class),
> > >>>>>>>     @JsonSubTypes.Type(value = LimitPushDownSpec.class),
> > >>>>>>>     @JsonSubTypes.Type(value = PartitionPushDownSpec.class),
> > >>>>>>>     @JsonSubTypes.Type(value = ProjectPushDownSpec.class),
> > >>>>>>>     @JsonSubTypes.Type(value = ReadingMetadataSpec.class),
> > >>>>>>>     @JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
> > >>>>>>>     @JsonSubTypes.Type(value = SourceWatermarkSpec.class),
> > >>>>>>>     @JsonSubTypes.Type(value = AggregatePushDownSpec.class),
> > >>>>>>> +   @JsonSubTypes.Type(value = PartitioningSpec.class) // newly added
> > >>>>>>> })
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Please let me know if that answers your questions or if you have
> > >>>>>>> other comments.
> > >>>>>>>
> > >>>>>>> Regards,
> > >>>>>>> Jeyhun
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Tue, Mar 12, 2024 at 8:56 AM Jane Chan
> > >>>>>>> <qingyue....@gmail.com> wrote:
> > >>>>>>>
> > >>>>>
> > >>>>>>>> Hi Jeyhun,
> > >>>>>>>>
> > >>>>>>>> Thank you for leading the discussion. I'm generally +1 with this
> > >>>>>>>> proposal, along with some questions. Please see my comments
> > >>>>>>>> below.
> > >>>>>>>>
> > >>>>>>>> 1. Concerning the `sourcePartitions()` method, the partition
> > >>>>>>>> information returned during the optimization phase may not be
> > >>>>>>>> the same as the partition information during runtime execution.
> > >>>>>>>> For long-running jobs, partitions may be continuously created.
> > >>>>>>>> Is this FLIP equipped to handle such scenarios?
> > >>>>>>>>
> > >>>>>>>> 2. Regarding the `RemoveRedundantShuffleRule` optimization rule,
> > >>>>>>>> I understand that it is also necessary to verify whether the hash
> > >>>>>>>> key within the Exchange node is consistent with the partition key
> > >>>>>>>> defined in the table source that implements
> > >>>>>>>> `SupportsPartitioning`.
> > >>>>>>>>
> > >>>>>>>> 3. Could you elaborate on the desired physical plan and
> > >>>>>>>> integration with `CompiledPlan` to enhance the overall
> > >>>>>>>> functionality?
> > >>>>>>>>
> > >>>>>>>> Best,
> > >>>>>>>> Jane
> > >>>>>>>>
> > >>>>>>>> On Tue, Mar 12, 2024 at 11:11 AM Jim Hughes
> > >>>>>>>> <jhug...@confluent.io.invalid> wrote:
> > >>>>>>>>
> > >>>>>
> > >>>>>>>>> Hi Jeyhun,
> > >>>>>>>>>
> > >>>>>>>>> I like the idea! Given FLIP-376[1], I wonder if it'd make sense
> > >>>>>>>>> to generalize FLIP-434 to be about "pre-divided" data, to cover
> > >>>>>>>>> "buckets" and "partitions" (and maybe even situations where a
> > >>>>>>>>> data source is both partitioned and bucketed).
> > >>>>>>>>>
> > >>>>>>>>> Separate from that, the page mentions TPC-H Q1 as an example.
> > >>>>>>>>> For a join, any two tables joined on the same bucket key should
> > >>>>>>>>> provide a concrete example of a join. Systems like Kafka
> > >>>>>>>>> Streams/ksqlDB call this "co-partitioning"; for those systems,
> > >>>>>>>>> it is a requirement placed on the input sources. For Flink, with
> > >>>>>>>>> FLIP-434, the proposed planner rule could remove the shuffle.
> > >>>>>>>>>
> > >>>>>>>>> Definitely a fun idea; I look forward to hearing more!
> > >>>>>>>>>
> > >>>>>>>>> Cheers,
> > >>>>>>>>>
> > >>>>>>>>> Jim
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> 1.
> > >>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
> > >>>>>
> > >>>>>>>>> 2.
> > >>>>>>>>> https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/#co-partitioning-requirements
> > >>>>>
> > >>>>>>>>>
> > >>>>>>>>>> On Sun, Mar 10, 2024 at 3:38 PM Jeyhun Karimov
> > >>>>>>>>>> <je.kari...@gmail.com> wrote:
> > >>>>>>>>>
> > >>>>>
> > >>>>>>>>>> Hi devs,
> > >>>>>>>>>>
> > >>>>>>>>>> I’d like to start a discussion on FLIP-434: Support
> > >>>>>>>>>> optimizations for pre-partitioned data sources [1].
> > >>>>>>>>>>
> > >>>>>>>>>> The FLIP introduces taking advantage of pre-partitioned data
> > >>>>>>>>>> sources for the SQL/Table API (it is already supported as an
> > >>>>>>>>>> experimental feature in the DataStream API [2]).
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> Please find more details in the FLIP wiki document [1].
> > >>>>>>>>>> Looking forward to your feedback.
> > >>>>>>>>>>
> > >>>>>>>>>> Regards,
> > >>>>>>>>>> Jeyhun
> > >>>>>>>>>>
> > >>>>>>>>>> [1]
> > >>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources
> > >>>>>
> > >>>>>>>>>> [2]
> > >>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/
> > >>>>>
> > >>>>> --
> > >>>>>
> > >>>>> Best,
> > >>>>> Benchao Li
> > >>>>>
