Hi Leonard,

Thanks a lot for your comments. Please find my answers below:

> (1) The FLIP motivation section says Kafka broker is already partitioned
> w.r.t. some key[s] , Is this the main use case in Kafka world? Partitioning
> by key fields is not the default partitioner of Kafka default
> partitioner[1] IIUC.


- I see your point. On the Kafka side, the main use case is ksqlDB users.
Consider the following ksqlDB query [1]:

CREATE STREAM products_rekeyed
  WITH (PARTITIONS=6) AS
  SELECT *
   FROM products
   PARTITION BY product_id;

In this query, the output is re-partitioned so that it is keyed by product_id.
In fact, as the documentation [1] states (and as Jim pointed out earlier in this
thread), this repartitioning is a requirement for join queries.
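
To illustrate the property this relies on, below is a minimal Java sketch of
key-based partitioning. It is not ksqlDB or Kafka code: the class RekeySketch,
the methods partitionFor and rekey, and the use of String.hashCode() are
assumptions made only for illustration. It shows that after re-keying, all
records with the same product_id land in the same partition, which is the
property a pre-partitioned source could expose to the planner.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RekeySketch {

    // Hypothetical stand-in for a key-based partitioner (assumption, not Kafka's
    // actual hash function): the same key always maps to the same partition.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Groups product ids by the partition they would be written to after a
    // "PARTITION BY product_id" style re-keying.
    static Map<Integer, List<String>> rekey(List<String> productIds, int numPartitions) {
        Map<Integer, List<String>> byPartition = new HashMap<>();
        for (String id : productIds) {
            byPartition
                    .computeIfAbsent(partitionFor(id, numPartitions), p -> new ArrayList<>())
                    .add(id);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        List<String> ids = List.of("p1", "p2", "p1", "p3", "p2", "p1");
        // Every occurrence of a given product id ends up in one partition only.
        rekey(ids, 6).forEach((partition, keys) ->
                System.out.println("partition " + partition + " -> " + keys));
    }
}
```

If each source subtask reads whole partitions, a downstream operator keyed on
product_id already sees all records for a key in one place, so a hash shuffle
on product_id would add nothing.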


> (2) Considering the FLIP’s optimization scope aims to both Batch and
> Streaming pre-partitioned source, could you add a Streaming Source example
> to help me understand the  FLIP better? I think Kafka Source is a good
> candidates for streaming source example, file source is a good one for
> batch source and it really helped me to follow-up the FLIP.


- Currently, I do not have a sample Kafka Source integration for this FLIP.
My idea was to integrate first with the Flink main repo (e.g., file source in
streaming & batch mode) and then with external connectors.
But I can try it with Kafka Source and get back to you.

Regards,
Jeyhun

[1] https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/

On Wed, Apr 3, 2024 at 8:25 AM Leonard Xu <xbjt...@gmail.com> wrote:

> Hey, Jeyhun
>
> Thanks for kicking off this discussion. I have two questions about
> streaming sources:
>
> (1)The FLIP  motivation section says Kafka broker is already partitioned
> w.r.t. some key[s] , Is this the main use case in Kafka world? Partitioning
> by key fields is not the default partitioner of Kafka default
> partitioner[1] IIUC.
>
> (2) Considering the FLIP’s optimization scope aims to both Batch and
> Streaming pre-partitioned source, could you add a Streaming Source example
> to help me understand the  FLIP better? I think Kafka Source is a good
> candidates for streaming source example, file source is a good one for
> batch source and it really helped me to follow-up the FLIP.
>
> Best,
> Leonard
> [1]
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L31
>
>
>
> > On Apr 3, 2024, at 5:53 AM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
> >
> > Hi Lincoln,
> >
> > Thanks a lot for your comments. Please find my answers below.
> >
> >
> > 1. Is this flip targeted only at batch scenarios or does it include
> >> streaming?
> >> (The flip and the discussion did not explicitly mention this, but in the
> >> draft pr, I only
> >> saw the implementation for batch scenarios
> >>
> >> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8
> >> )
> >> If we expect this also apply to streaming, then we need to consider the
> >> stricter
> >> shuffle restrictions of streaming compared to batch (if support is
> >> considered,
> >> more discussion is needed here, let’s not expand for now). If it only
> >> applies to batch,
> >> it is recommended to clarify in the flip.
> >
> >
> > - The FLIP targets both streaming and batch scenarios.
> > Could you please elaborate more on what you mean by additional
> > restrictions?
> >
> >
> > 2. In the current implementation, the optimized plan seems to have some
> >> problems.
> >> As described in the class comments:
> >>
> >> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60
> >>
> >> BatchPhysicalHashAggregate (local)
> >>   +- BatchPhysicalLocalHashAggregate (local)
> >>      +- BatchPhysicalTableSourceScan
> >> The `BatchPhysicalLocalHashAggregate` here is redundant (in the case of
> >> one-phase hashAgg, localAgg is not necessary, which is the scenario currently
> >> handled by `RemoveRedundantLocalHashAggRule` and other rules)
> >
> >
> > - Yes, you are completely right. Note that the PR you referenced is just a
> > quick PoC.
> > The redundant operators you mentioned exist because
> > `RemoveRedundantShuffleRule` just removes the Exchange operator,
> > without modifying upstream/downstream operators.
> > As I mentioned, the implementation is just a PoC, and the final
> > implementation will make sure that the existing redundancy elimination rules
> > remove the redundant operators.
> >
> >
> >> Also, in the draft pr, the optimization of `testShouldEliminatePartitioning1` &
> >> `testShouldEliminatePartitioning2` does not seem to take effect?
> >>
> >> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/BatchFileSystemTableSourceTest.xml#L38
> >
> >
> > - Note that in this example, the Exchange operator has a
> > property KEEP_INPUT_AS_IS, which indicates that its data distribution is the
> > same as that of its input.
> > Since we have redundant operators (as shown above, two aggregate operators),
> > one of the rules (not in this FLIP)
> > adds this Exchange operator with KEEP_INPUT_AS_IS in between.
> > Similar to my comment above, the final implementation will be free of
> > redundant operators.
> >
> >> In conjunction with question 2, I am wondering if we have a better choice
> >> (of course, not simply adding the current `PHYSICAL_OPT_RULES`'s
> >> `RemoveRedundantLocalXXRule`s to the `PHYSICAL_REWRITE`).
> >> For example, let the source actively provide some traits (including
> >> `FlinkRelDistribution` and `RelCollation`) to the planner. The advantage of
> >> doing this is to directly reuse the current shuffle removal optimization (as
> >> implemented in `FlinkExpandConversionRule`), and according to the data
> >> distribution characteristics provided by the source, the planner may choose
> >> a physical operator with a cheaper cost (for example, according to
> >> `RelCollation`, the planner can use sortAgg, with no need for a separate
> >> local sort operation).
> >> WDYT?
> >
> >
> > - Good point. Makes sense to me. I will check whether
> > FlinkExpandConversionRule can be utilized in the implementation.
> >
> >
> > Regards,
> > Jeyhun
> >
> >
> >
> > On Tue, Apr 2, 2024 at 6:01 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:
> >
> >> Hi Jeyhun,
> >>
> >> Thank you for driving this, it would be a very useful optimization!
> >>
> >> Sorry for joining the discussion only now (I originally planned to reply
> >> earlier, but it happened to be during my vacation). I have two questions:
> >>
> >> 1. Is this flip targeted only at batch scenarios or does it include
> >> streaming?
> >> (The flip and the discussion did not explicitly mention this, but in the
> >> draft pr, I only
> >> saw the implementation for batch scenarios
> >>
> >> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8
> >> )
> >> If we expect this also apply to streaming, then we need to consider the
> >> stricter
> >> shuffle restrictions of streaming compared to batch (if support is
> >> considered,
> >> more discussion is needed here, let’s not expand for now). If it only
> >> applies to batch,
> >> it is recommended to clarify in the flip.
> >>
> >> 2. In the current implementation, the optimized plan seems to have some
> >> problems.
> >> As described in the class comments:
> >>
> >> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60
> >>
> >> BatchPhysicalHashAggregate (local)
> >>   +- BatchPhysicalLocalHashAggregate (local)
> >>      +- BatchPhysicalTableSourceScan
> >> The `BatchPhysicalLocalHashAggregate` here is redundant (in the case of
> >> one-phase hashAgg, localAgg is not necessary, which is the scenario currently
> >> handled by `RemoveRedundantLocalHashAggRule` and other rules). Also, in the
> >> draft pr, the optimization of `testShouldEliminatePartitioning1` &
> >> `testShouldEliminatePartitioning2` does not seem to take effect?
> >>
> >> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/BatchFileSystemTableSourceTest.xml#L38
> >>
> >> In conjunction with question 2, I am wondering if we have a better choice
> >> (of course, not simply adding the current `PHYSICAL_OPT_RULES`'s
> >> `RemoveRedundantLocalXXRule`s to the `PHYSICAL_REWRITE`).
> >> For example, let the source actively provide some traits (including
> >> `FlinkRelDistribution` and `RelCollation`) to the planner. The advantage of
> >> doing this is to directly reuse the current shuffle removal optimization (as
> >> implemented in `FlinkExpandConversionRule`), and according to the data
> >> distribution characteristics provided by the source, the planner may choose
> >> a physical operator with a cheaper cost (for example, according to
> >> `RelCollation`, the planner can use sortAgg, with no need for a separate
> >> local sort operation).
> >> WDYT?
> >>
> >>
> >> Best,
> >> Lincoln Lee
> >>
> >>
> >> On Mon, Apr 1, 2024 at 6:00 PM Jeyhun Karimov <je.kari...@gmail.com> wrote:
> >>
> >>> Hi everyone,
> >>>
> >>> Thanks for your valuable feedback!
> >>>
> >>> The discussion on this FLIP has been going on for a while.
> >>> I would like to start a vote after 48 hours.
> >>>
> >>> Please let me know if you have any concerns or any further
> >>> questions/comments.
> >>>
> >>> Regards,
> >>> Jeyhun
> >>>
> >>>
> >>> On Thu, Mar 21, 2024 at 6:01 PM Jeyhun Karimov <je.kari...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Lorenzo,
> >>>>
> >>>> Thanks a lot for your comments. Please find my answers below:
> >>>>
> >>>>
> >>>>> For the interface `SupportsPartitioning`, why returning `Optional`?
> >>>>> If one decides to implement that, partitions must exist (at maximum,
> >>>>> return an empty list). Returning `Optional` seems just to complicate the
> >>>>> logic of the code using that interface.
> >>>>
> >>>>
> >>>> - The reasoning behind the use of Optional is that sometimes (e.g., in
> >>>> HiveTableSource) the partitioning info is in the catalog.
> >>>> Therefore, we return Optional.empty(), so that the list of partitions is
> >>>> queried from the catalog.
> >>>>
> >>>>
> >>>>> I foresee the using code doing something like: "if the source supports
> >>>>> partitioning, get the partitions, but if they don't exist, raise a runtime
> >>>>> exception". Let's simply make that safe at compile time and guarantee the
> >>>>> code that partitions exist.
> >>>>
> >>>>
> >>>> - Yes, if partitions cannot be found, neither from the catalog nor from the
> >>>> interface implementation, then we raise an exception at query compile
> >>>> time.
> >>>>
> >>>>
> >>>> Another thing is that you show Hive-like partitioning in your FS
> >>>>> structure, do you think it makes sense to add a note about
> >>> auto-discovery
> >>>>> of partitions?
> >>>>
> >>>>
> >>>> - Yes, the FLIP contains just an example partitioning for filesystem
> >>>> connector. Each connector already "knows" about autodiscovery of its
> >>>> partitions. And we rely on this fact.
> >>>>  For example, partition discovery is different between kafka and
> >>>> filesystem sources. So, we do not handle the manual discovery of
> >>>> partitions. Please correct me if I misunderstood your question.
> >>>>
> >>>>
> >>>> In other terms, it looks a bit counterintuitive that the user
> >>> implementing
> >>>>> the source has to specify which partitions exist statically (and they
> >>> can
> >>>>> change at runtime), while the source itself knows the data provider
> >> and
> >>> can
> >>>>> directly implement a method `discoverPartitions`. Then Flink would
> >> take
> >>>>> care of invoking that method when needed.
> >>>>
> >>>>
> >>>> We utilize table option SOURCE_MONITOR_INTERVAL to check whether
> >>>> partitions are static or not. So, a user still should give Flink a
> hint
> >>>> about partitions being static or not. With static partitions Flink can
> >> do
> >>>> more optimizations.
> >>>>
> >>>> Please let me know if my replies answer your questions or if you have
> >>> more
> >>>> comments.
> >>>>
> >>>> Regards,
> >>>> Jeyhun
> >>>>
> >>>>
> >>>>
> >>>> On Thu, Mar 21, 2024 at 10:03 AM <lorenzo.affe...@ververica.com>
> >> wrote:
> >>>>
> >>>>> Hello Jeyhun,
> >>>>> I really like the proposal and definitely makes sense to me.
> >>>>>
> >>>>> I have a couple of nits here and there:
> >>>>>
> >>>>> For the interface `SupportsPartitioning`, why returning `Optional`?
> >>>>> If one decides to implement that, partitions must exist (at maximum,
> >>>>> return an empty list). Returning `Optional` seems just to complicate the
> >>>>> logic of the code using that interface.
> >>>>>
> >>>>> I foresee the using code doing something like: "if the source
> supports
> >>>>> partitioning, get the partitions, but if they don't exist, raise a
> >>> runtime
> >>>>> exception". Let's simply make that safe at compile time and guarantee
> >>> the
> >>>>> code that partitions exist.
> >>>>>
> >>>>> Another thing is that you show Hive-like partitioning in your FS
> >>>>> structure, do you think it makes sense to add a note about
> >>> auto-discovery
> >>>>> of partitions?
> >>>>>
> >>>>> In other terms, it looks a bit counterintuitive that the user
> >>>>> implementing the source has to specify which partitions exist
> >> statically
> >>>>> (and they can change at runtime), while the source itself knows the
> >> data
> >>>>> provider and can directly implement a method `discoverPartitions`.
> >> Then
> >>>>> Flink would take care of invoking that method when needed.
> >>>>> On Mar 15, 2024 at 22:09 +0100, Jeyhun Karimov <je.kari...@gmail.com
> >>> ,
> >>>>> wrote:
> >>>>>
> >>>>> Hi Benchao,
> >>>>>
> >>>>> Thanks for your comments.
> >>>>>
> >>>>> 1. What the parallelism would you take? E.g., 128 + 256 => 128? What
> >>>>>
> >>>>> if we cannot have a good greatest common divisor, like 127 + 128,
> >>>>> could we just utilize one side's pre-partitioned attribute, and let
> >>>>> another side just do the shuffle?
> >>>>>
> >>>>>
> >>>>>
> >>>>> There are two cases we need to consider:
> >>>>>
> >>>>> 1. Static Partition (no partitions are added during the query execution)
> >>>>> is enabled AND both sources implement "SupportsPartitionPushDown"
> >>>>>
> >>>>> In this case, we are sure that no new partitions will be added at runtime.
> >>>>> So, we have a chance to equalize both sources' partitions and parallelism,
> >>>>> IFF both sources implement the "SupportsPartitionPushDown" interface.
> >>>>> To achieve this, first we will fetch the existing partitions from source1
> >>>>> (say p_s1) and from source2 (say p_s2).
> >>>>> Then, we find the intersection of these two partition sets (say
> >>>>> p_intersect) and push down these partitions:
> >>>>>
> >>>>> SupportsPartitionPushDown::applyPartitions(p_intersect) // make sure that
> >>>>> only specific partitions are read
> >>>>> SupportsPartitioning::applyPartitionedRead(p_intersect) // partitioned read
> >>>>> with filtered partitions
> >>>>>
> >>>>> Lastly, we need to change the parallelism of 1) source1, 2) source2, and 3)
> >>>>> all of their downstream operators until (and including) their first common
> >>>>> ancestor (e.g., join) to be equal to the number of partitions (size of
> >>>>> p_intersect).
> >>>>>
> >>>>> 2. All other cases
> >>>>>
> >>>>> In all other cases, the parallelism of both sources and their downstream
> >>>>> operators until their common ancestor would be equal to MIN(p_s1, p_s2).
> >>>>> That is, the minimum of the partition size (number of partitions) of
> >>>>> source1 and the partition size of source2 will be selected as the
> >>>>> parallelism.
> >>>>> Coming back to your example, if source1 parallelism is 127 and source2
> >>>>> parallelism is 128, then we will first check the partition size of source1
> >>>>> and source2.
> >>>>> Say the partition size of source1 is 100 and the partition size of source2
> >>>>> is 90.
> >>>>> Then, we would set the parallelism for source1, source2, and all of their
> >>>>> downstream operators until (and including) the join operator
> >>>>> to 90 (min(100, 90)).
> >>>>> We also plan to implement a cost-based decision instead of the rule-based
> >>>>> one (the one explained above, i.e., the MIN rule).
> >>>>> One possible result of the cost-based estimation is to keep the partitions
> >>>>> on one side and perform the shuffling on the other source.
> >>>>>
> >>>>>
> >>>>>
> >>>>> 2. In our current shuffle remove design (FlinkExpandConversionRule),
> >>>>>
> >>>>> we don't consider parallelism, we just remove unnecessary shuffles
> >>>>> according to the distribution columns. After this FLIP, the
> >>>>> parallelism may be bundled with source's partitions, then how will
> >>>>> this optimization accommodate with FlinkExpandConversionRule, will
> you
> >>>>> also change downstream operator's parallelisms if we want to also
> >>>>> remove subsequent shuffles?
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> - From my understanding of FlinkExpandConversionRule, its removal logic is
> >>>>> agnostic to operator parallelism.
> >>>>> So, if FlinkExpandConversionRule decides to remove a shuffle operation,
> >>>>> then this FLIP will search for another possible shuffle (the one closest to
> >>>>> the source) to remove.
> >>>>> If there is such an opportunity, this FLIP will remove the shuffle. So,
> >>>>> from my understanding, FlinkExpandConversionRule and this optimization rule
> >>>>> can work together safely.
> >>>>> Please correct me if I misunderstood your question.
> >>>>>
> >>>>>
> >>>>>
> >>>>> Regarding the new optimization rule, have you also considered to
> allow
> >>>>>
> >>>>> some non-strict mode like FlinkRelDistribution#requireStrict? For
> >>>>> example, source is pre-partitioned by a, b columns, if we are
> >>>>> consuming this source, and do a aggregate on a, b, c, can we utilize
> >>>>> this optimization?
> >>>>>
> >>>>>
> >>>>>
> >>>>> - Good point. Yes, there are some cases where non-strict mode will apply.
> >>>>> For example:
> >>>>>
> >>>>> - the pre-partitioned columns and the aggregate columns are the same but in
> >>>>> a different order (e.g., the source is pre-partitioned w.r.t. a,b and the
> >>>>> aggregate has a GROUP BY b,a)
> >>>>> - the columns in the Exchange operator are a list-prefix of the
> >>>>> pre-partitioned columns of the source (e.g., the source is pre-partitioned
> >>>>> w.r.t. a,b,c and the Exchange's partition columns are a,b)
> >>>>>
> >>>>> Please let me know if the above answers your questions or if you have
> >>> any
> >>>>> other comments.
> >>>>>
> >>>>> Regards,
> >>>>> Jeyhun
> >>>>>
> >>>>> On Thu, Mar 14, 2024 at 12:48 PM Benchao Li <libenc...@apache.org> wrote:
> >>>>>
> >>>>> Thanks Jeyhun for bringing up this discussion, it is really exciting,
> >>>>> +1 for the general idea.
> >>>>>
> >>>>> We also introduced a similar concept in Flink Batch internally to cope
> >>>>> with bucketed tables in Hive, it is a very important improvement.
> >>>>>
> >>>>>> One thing to note is that for join queries, the parallelism of each
> >>> join
> >>>>>> source might be different. This might result in
> >>>>>> inconsistencies while using the pre-partitioned/pre-divided data
> >>> (e.g.,
> >>>>>> different mappings of partitions to source operators).
> >>>>>> Therefore, it is the job of planner to detect this and adjust the
> >>>>>> parallelism. With that having in mind,
> >>>>>> the rest (how the split assigners perform) is consistent among many
> >>>>>> sources.
> >>>>>
> >>>>>
> >>>>> Could you elaborate a little more on this. I added my two cents here
> >>>>> about this part:
> >>>>> 1. What the parallelism would you take? E.g., 128 + 256 => 128? What
> >>>>> if we cannot have a good greatest common divisor, like 127 + 128,
> >>>>> could we just utilize one side's pre-partitioned attribute, and let
> >>>>> another side just do the shuffle?
> >>>>> 2. In our current shuffle remove design (FlinkExpandConversionRule),
> >>>>> we don't consider parallelism, we just remove unnecessary shuffles
> >>>>> according to the distribution columns. After this FLIP, the
> >>>>> parallelism may be bundled with source's partitions, then how will
> >>>>> this optimization accommodate with FlinkExpandConversionRule, will
> you
> >>>>> also change downstream operator's parallelisms if we want to also
> >>>>> remove subsequent shuffles?
> >>>>>
> >>>>>
> >>>>> Regarding the new optimization rule, have you also considered to
> allow
> >>>>> some non-strict mode like FlinkRelDistribution#requireStrict? For
> >>>>> example, source is pre-partitioned by a, b columns, if we are
> >>>>> consuming this source, and do a aggregate on a, b, c, can we utilize
> >>>>> this optimization?
> >>>>>
> >>>>> On Thu, Mar 14, 2024 at 3:24 PM Jane Chan <qingyue....@gmail.com> wrote:
> >>>>>
> >>>>>>
> >>>>>> Hi Jeyhun,
> >>>>>>
> >>>>>> Thanks for your clarification.
> >>>>>>
> >>>>>
> >>>>>>> Once a new partition is detected, we add it to our existing
> >> mapping.
> >>>>>
> >>>>> Our
> >>>>>
> >>>>>> mapping looks like Map<Integer, Set<Integer>>
> >>>>>
> >>>>> subtaskToPartitionAssignment,
> >>>>>
> >>>>>> where it maps each source subtaskID to zero or more partitions.
> >>>>>>
> >>>>>> I understand your point. **It would be better if you could sync the
> >>>>>> content to the FLIP**.
> >>>>>>
> >>>>>> Another thing is I'm curious about what the physical plan looks
> >> like.
> >>> Is
> >>>>>> there any specific info that will be added to the table source (like
> >>>>>> filter/project pushdown)? It would be great if you could attach an
> >>>>>
> >>>>> example
> >>>>>
> >>>>>> to the FLIP.
> >>>>>>
> >>>>>> Bests,
> >>>>>> Jane
> >>>>>>
> >>>>>> On Wed, Mar 13, 2024 at 9:11 PM Jeyhun Karimov <
> >> je.kari...@gmail.com>
> >>>>>
> >>>>> wrote:
> >>>>>
> >>>>>>
> >>>>>
> >>>>>>> Hi Jane,
> >>>>>>>
> >>>>>>> Thanks for your comments.
> >>>>>>>
> >>>>>>>
> >>>>>>> 1. Concerning the `sourcePartitions()` method, the partition
> >>>>>
> >>>>> information
> >>>>>
> >>>>>>>> returned during the optimization phase may not be the same as
> >> the
> >>>>>
> >>>>>>> partition
> >>>>>
> >>>>>>>> information during runtime execution. For long-running jobs,
> >>>>>
> >>>>> partitions
> >>>>>
> >>>>>>> may
> >>>>>
> >>>>>>>> be continuously created. Is this FLIP equipped to handle
> >>> scenarios?
> >>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> - Good point. This scenario is definitely supported.
> >>>>>>> Once a new partition is added, or in general, new splits are
> >>>>>>> discovered, the
> >>>>>>> PartitionAwareSplitAssigner::addSplits(Collection<FileSourceSplit> newSplits)
> >>>>>>> method will be called. Inside that method, we are able to detect whether a
> >>>>>>> split belongs to an existing partition or to a new partition.
> >>>>>>> Once a new partition is detected, we add it to our existing mapping. Our
> >>>>>>> mapping looks like Map<Integer, Set<Integer>> subtaskToPartitionAssignment,
> >>>>>>> where it maps each source subtaskID to zero or more partitions.
> >>>>>>>
> >>>>>>> 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> >>>>>
> >>>>>>>> understand that it is also necessary to verify whether the hash
> >>> key
> >>>>>
> >>>>>>> within
> >>>>>
> >>>>>>>> the Exchange node is consistent with the partition key defined
> >> in
> >>>>> the
> >>>>>
> >>>>>>> table
> >>>>>
> >>>>>>>> source that implements `SupportsPartitioning`.
> >>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> - Yes, I overlooked that point, fixed. Actually, the rule is much more
> >>>>>>> complicated. I tried to simplify it in the FLIP. Good point.
> >>>>>>>
> >>>>>>>
> >>>>>>> 3. Could you elaborate on the desired physical plan and
> >> integration
> >>>>>
> >>>>> with
> >>>>>
> >>>>>>>> `CompiledPlan` to enhance the overall functionality?
> >>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> - For the compiled plan, PartitioningSpec will be used, with a json tag
> >>>>>>> "Partitioning". As a result, in the compiled plan, the source operator will
> >>>>>>> have "abilities" : [ { "type" : "Partitioning" } ] as part of the compiled
> >>>>>>> plan.
> >>>>>>> More about the implementation details below:
> >>>>>>>
> >>>>>>> --------------------------------
> >>>>>>> PartitioningSpec class
> >>>>>>> --------------------------------
> >>>>>>> @JsonTypeName("Partitioning")
> >>>>>>> public final class PartitioningSpec extends SourceAbilitySpecBase {
> >>>>>>>     // some code here
> >>>>>>>     @Override
> >>>>>>>     public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
> >>>>>>>         if (tableSource instanceof SupportsPartitioning) {
> >>>>>>>             ((SupportsPartitioning<?>) tableSource).applyPartitionedRead();
> >>>>>>>         } else {
> >>>>>>>             throw new TableException(
> >>>>>>>                     String.format(
> >>>>>>>                             "%s does not support SupportsPartitioning.",
> >>>>>>>                             tableSource.getClass().getName()));
> >>>>>>>         }
> >>>>>>>     }
> >>>>>>>     // some code here
> >>>>>>> }
> >>>>>>>
> >>>>>>> --------------------------------
> >>>>>>> SourceAbilitySpec class
> >>>>>>> --------------------------------
> >>>>>>> @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
> >>>>>>> @JsonSubTypes({
> >>>>>>>     @JsonSubTypes.Type(value = FilterPushDownSpec.class),
> >>>>>>>     @JsonSubTypes.Type(value = LimitPushDownSpec.class),
> >>>>>>>     @JsonSubTypes.Type(value = PartitionPushDownSpec.class),
> >>>>>>>     @JsonSubTypes.Type(value = ProjectPushDownSpec.class),
> >>>>>>>     @JsonSubTypes.Type(value = ReadingMetadataSpec.class),
> >>>>>>>     @JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
> >>>>>>>     @JsonSubTypes.Type(value = SourceWatermarkSpec.class),
> >>>>>>>     @JsonSubTypes.Type(value = AggregatePushDownSpec.class),
> >>>>>>> +   @JsonSubTypes.Type(value = PartitioningSpec.class) // newly added
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> Please let me know if that answers your questions or if you have
> >>> other
> >>>>>>> comments.
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Jeyhun
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Mar 12, 2024 at 8:56 AM Jane Chan <qingyue....@gmail.com>
> >>>>>
> >>>>> wrote:
> >>>>>
> >>>>>>>
> >>>>>
> >>>>>>>> Hi Jeyhun,
> >>>>>>>>
> >>>>>>>> Thank you for leading the discussion. I'm generally +1 with this
> >>>>>
> >>>>>>> proposal,
> >>>>>
> >>>>>>>> along with some questions. Please see my comments below.
> >>>>>>>>
> >>>>>>>> 1. Concerning the `sourcePartitions()` method, the partition
> >>>>>
> >>>>> information
> >>>>>
> >>>>>>>> returned during the optimization phase may not be the same as
> >> the
> >>>>>
> >>>>>>> partition
> >>>>>
> >>>>>>>> information during runtime execution. For long-running jobs,
> >>>>>
> >>>>> partitions
> >>>>>
> >>>>>>> may
> >>>>>
> >>>>>>>> be continuously created. Is this FLIP equipped to handle
> >>> scenarios?
> >>>>>>>>
> >>>>>>>> 2. Regarding the `RemoveRedundantShuffleRule` optimization
> >> rule, I
> >>>>>>>> understand that it is also necessary to verify whether the hash
> >>> key
> >>>>>
> >>>>>>> within
> >>>>>
> >>>>>>>> the Exchange node is consistent with the partition key defined
> >> in
> >>>>> the
> >>>>>
> >>>>>>> table
> >>>>>
> >>>>>>>> source that implements `SupportsPartitioning`.
> >>>>>>>>
> >>>>>>>> 3. Could you elaborate on the desired physical plan and
> >>> integration
> >>>>>
> >>>>> with
> >>>>>
> >>>>>>>> `CompiledPlan` to enhance the overall functionality?
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Jane
> >>>>>>>>
> >>>>>>>> On Tue, Mar 12, 2024 at 11:11 AM Jim Hughes
> >>>>>
> >>>>> <jhug...@confluent.io.invalid
> >>>>>
> >>>>>>>>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>
> >>>>>>>>> Hi Jeyhun,
> >>>>>>>>>
> >>>>>>>>> I like the idea! Given FLIP-376[1], I wonder if it'd make sense to
> >>>>>>>>> generalize FLIP-434 to be about "pre-divided" data to cover "buckets" and
> >>>>>>>>> "partitions" (and maybe even situations where a data source is partitioned
> >>>>>>>>> and bucketed).
> >>>>>>>>>
> >>>>>>>>> Separate from that, the page mentions TPC-H Q1 as an example. For a join,
> >>>>>>>>> any two tables joined on the same bucket key should provide a concrete
> >>>>>>>>> example of a join. Systems like Kafka Streams/ksqlDB call this
> >>>>>>>>> "co-partitioning"; for those systems, it is a requirement placed on the
> >>>>>>>>> input sources. For Flink, with FLIP-434, the proposed planner rule
> >>>>>>>>> could remove the shuffle.
> >>>>>>>>>
> >>>>>>>>> Definitely a fun idea; I look forward to hearing more!
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>>
> >>>>>>>>> Jim
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 1.
> >>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
> >>>>>>>>> 2.
> >>>>>>>>> https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/#co-partitioning-requirements
> >>>>>
> >>>>>>>>>
> >>>>>>>>> On Sun, Mar 10, 2024 at 3:38 PM Jeyhun Karimov <
> >>>>>
> >>>>> je.kari...@gmail.com>
> >>>>>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>
> >>>>>>>>>> Hi devs,
> >>>>>>>>>>
> >>>>>>>>>> I’d like to start a discussion on FLIP-434: Support optimizations for
> >>>>>>>>>> pre-partitioned data sources [1].
> >>>>>>>>>>
> >>>>>>>>>> The FLIP introduces taking advantage of pre-partitioned data sources for
> >>>>>>>>>> SQL/Table API (it is already supported as an experimental feature in the
> >>>>>>>>>> DataStream API [2]).
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Please find more details in the FLIP wiki document [1].
> >>>>>>>>>> Looking forward to your feedback.
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Jeyhun
> >>>>>>>>>>
> >>>>>>>>>> [1]
> >>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources
> >>>>>>>>>> [2]
> >>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/
> >>>>>
> >>>>> --
> >>>>>
> >>>>> Best,
> >>>>> Benchao Li
> >>>>>
> >>>>>
> >>>
> >>
>
>
