Hey, Jeyhun 

Thanks for kicking off this discussion. I have two questions about streaming 
sources:

(1)The FLIP  motivation section says Kafka broker is already partitioned w.r.t. 
some key[s] , Is this the main use case in Kafka world? Partitioning by key 
fields is not the default partitioner of Kafka default partitioner[1] IIUC.

(2) Considering the FLIP’s optimization scope aims to both Batch and Streaming 
pre-partitioned source, could you add a Streaming Source example to help me 
understand the  FLIP better? I think Kafka Source is a good candidates for 
streaming source example, file source is a good one for batch source and it 
really helped me to follow-up the FLIP.

Best,
Leonard
[1]https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L31



> 2024年4月3日 上午5:53,Jeyhun Karimov <je.kari...@gmail.com> 写道:
> 
> Hi Lincoln,
> 
> Thanks a lot for your comments. Please find my answers below.
> 
> 
> 1. Is this flip targeted only at batch scenarios or does it include
>> streaming?
>> (The flip and the discussion did not explicitly mention this, but in the
>> draft pr, I only
>> saw the implementation for batch scenarios
>> 
>> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8
>> <
>> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8%EF%BC%89
>>> 
>> )
>> If we expect this also apply to streaming, then we need to consider the
>> stricter
>> shuffle restrictions of streaming compared to batch (if support is
>> considered,
>> more discussion is needed here, let’s not expand for now). If it only
>> applies to batch,
>> it is recommended to clarify in the flip.
> 
> 
> - The FLIP targets both streaming and batch scenarios.
> Could you please elaborate more on what you mean by additional
> restrictions?
> 
> 
> 2. In the current implementation, the optimized plan seems to have some
>> problems.
>> As described in the class comments:
>> 
>> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60
> 
> BatchPhysicalHashAggregate (local)
> 
>   +- BatchPhysicalLocalHashAggregate (local)
>>      +- BatchPhysicalTableSourceScan
>> The `BatchPhysicalLocalHashAggregate` here is redundant (in the case of
>> one-phase
>> hashAgg, localAgg is not necessary, which is the scenario currently handled
>> by
>> `RemoveRedundantLocalHashAggRule` and other rules)
> 
> 
> - Yes, you are completely right. Note that the PR you referenced is just a
> quick PoC.
> Redundant operators you mentioned exist because
> `RemoveRedundantShuffleRule` just removes the Exchange operator,
> without modifying upstream/downstream operators.
> As I mentioned, the implementation is just a PoC and the end implementation
> will make sure that existing redundancy elimination rules remove redundant
> operators.
> 
> 
> Also, in the draft pr,
>> the optimization of `testShouldEliminatePartitioning1` &
>> `testShouldEliminatePartitioning2`
>> seems didn't take effect?
>> 
>> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/BatchFileSystemTableSourceTest.xml#L38
> 
> 
> -  Note that in this example, Exchange operator have a
> property KEEP_INPUT_AS_IS that indicates that data distribution is the same
> as its input.
> Since we have redundant operators (as shown above, two aggregate operators)
> one of the rules (not in this FLIP)
> adds this Exchange operator with KEEP_INPUT_AS_IS in between.
> Similar to my comment above, the end implementation will be except from
> redundant operators.
> 
> In conjunction with question 2, I am wondering if we have a better choice
>> (of course, not simply adding the current `PHYSICAL_OPT_RULES`'s
>> `RemoveRedundantLocalXXRule`s
>> to the `PHYSICAL_REWRITE`).
>> For example, let the source actively provide some traits (including
>> `FlinkRelDistribution`
>> and `RelCollation`) to the planner. The advantage of doing this is to
>> directly reuse the
>> current shuffle remove optimization (as `FlinkExpandConversionRule`
>> implemented),
>> and according to the data distribution characteristics provided by the
>> source, the planner
>> may choose a physical operator with a cheaper costs (for example, according
>> to `RelCollation`,
>> the planner can use sortAgg, no need for a separate local sort operation).
>> WDYT?
> 
> 
> - Good point. Makes sense to me. I will check FlinkExpandConversionRule to
> be utilized in the implementation.
> 
> 
> Regards,
> Jeyhun
> 
> 
> 
> On Tue, Apr 2, 2024 at 6:01 PM Lincoln Lee <lincoln.8...@gmail.com> wrote:
> 
>> Hi Jeyhun,
>> 
>> Thank you for driving this, it would be very useful optimization!
>> 
>> Sorry for joining the discussion now(I originally planned to reply earlier,
>> but
>> happened to be during my vacation). I have two questions:
>> 
>> 1. Is this flip targeted only at batch scenarios or does it include
>> streaming?
>> (The flip and the discussion did not explicitly mention this, but in the
>> draft pr, I only
>> saw the implementation for batch scenarios
>> 
>> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8
>> <
>> https://github.com/apache/flink/pull/24437/files#diff-a6d71dd7d9bf0e7776404f54473b504e1de1240e93f820214fa5d1f082fb30c8%EF%BC%89
>>> 
>> )
>> If we expect this also apply to streaming, then we need to consider the
>> stricter
>> shuffle restrictions of streaming compared to batch (if support is
>> considered,
>> more discussion is needed here, let’s not expand for now). If it only
>> applies to batch,
>> it is recommended to clarify in the flip.
>> 
>> 2. In the current implementation, the optimized plan seems to have some
>> problems.
>> As described in the class comments:
>> 
>> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantShuffleRule.java#L60
>> 
>> BatchPhysicalHashAggregate (local)
>> 
>>   +- BatchPhysicalLocalHashAggregate (local)
>> 
>>      +- BatchPhysicalTableSourceScan
>> The `BatchPhysicalLocalHashAggregate` here is redundant (in the case of
>> one-phase
>> hashAgg, localAgg is not necessary, which is the scenario currently handled
>> by
>> `RemoveRedundantLocalHashAggRule` and other rules).  Also, in the draft pr,
>> the
>> optimization of `testShouldEliminatePartitioning1` &
>> `testShouldEliminatePartitioning2`
>> seems didn't take effect?
>> 
>> https://github.com/apache/flink/blob/d6e3b51fdb9a2e565709e8d7bc619234b3768ed1/flink-table/flink-table-planner/src/test/resources/org/apache/flink/connector/file/table/BatchFileSystemTableSourceTest.xml#L38
>> 
>> In conjunction with question 2, I am wondering if we have a better choice
>> (of course,
>> not simply adding the current `PHYSICAL_OPT_RULES`'s
>> `RemoveRedundantLocalXXRule`s
>> to the `PHYSICAL_REWRITE`).
>> For example, let the source actively provide some traits (including
>> `FlinkRelDistribution`
>> and `RelCollation`) to the planner. The advantage of doing this is to
>> directly reuse the
>> current shuffle remove optimization (as `FlinkExpandConversionRule`
>> implemented),
>> and according to the data distribution characteristics provided by the
>> source, the planner
>> may choose a physical operator with a cheaper costs (for example, according
>> to `RelCollation`,
>> the planner can use sortAgg, no need for a separate local sort operation).
>> WDYT?
>> 
>> 
>> Best,
>> Lincoln Lee
>> 
>> 
>> Jeyhun Karimov <je.kari...@gmail.com> 于2024年4月1日周一 18:00写道:
>> 
>>> Hi everyone,
>>> 
>>> Thanks for your valuable feedback!
>>> 
>>> The discussion on this FLIP has been going on for a while.
>>> I would like to start a vote after 48 hours.
>>> 
>>> Please let me know if you have any concerns or any further
>>> questions/comments.
>>> 
>>> Regards,
>>> Jeyhun
>>> 
>>> 
>>> On Thu, Mar 21, 2024 at 6:01 PM Jeyhun Karimov <je.kari...@gmail.com>
>>> wrote:
>>> 
>>>> Hi Lorenzo,
>>>> 
>>>> Thanks a lot for your comments. Please find my answers below:
>>>> 
>>>> 
>>>> For the interface `SupportsPartitioning`, why returning `Optional`?
>>>>> If one decides to implement that, partitions must exist (at maximum,
>>>>> return and empty list). Returning `Optional` seem just to complicate
>> the
>>>>> logic of the code using that interface.
>>>> 
>>>> 
>>>> - The reasoning behind the use of Optional is that sometimes (e.g., in
>>>> HiveTableSource) the partitioning info is in catalog.
>>>>  Therefore, we return Optional.empty(), so that the list of partitions
>>> is
>>>> queried from the catalog.
>>>> 
>>>> 
>>>> I foresee the using code doing something like: "if the source supports
>>>>> partitioning, get the partitions, but if they don't exist, raise a
>>> runtime
>>>>> exception". Let's simply make that safe at compile time and guarantee
>>> the
>>>>> code that partitions exist.
>>>> 
>>>> 
>>>> - Yes, once partitions cannot be found, neither from catalog nor from
>> the
>>>> interface implementation, then we raise an exception during query
>> compile
>>>> time.
>>>> 
>>>> 
>>>> Another thing is that you show Hive-like partitioning in your FS
>>>>> structure, do you think it makes sense to add a note about
>>> auto-discovery
>>>>> of partitions?
>>>> 
>>>> 
>>>> - Yes, the FLIP contains just an example partitioning for filesystem
>>>> connector. Each connector already "knows" about autodiscovery of its
>>>> partitions. And we rely on this fact.
>>>>  For example, partition discovery is different between kafka and
>>>> filesystem sources. So, we do not handle the manual discovery of
>>>> partitions. Please correct me if I misunderstood your question.
>>>> 
>>>> 
>>>> In other terms, it looks a bit counterintuitive that the user
>>> implementing
>>>>> the source has to specify which partitions exist statically (and they
>>> can
>>>>> change at runtime), while the source itself knows the data provider
>> and
>>> can
>>>>> directly implement a method `discoverPartitions`. Then Flink would
>> take
>>>>> care of invoking that method when needed.
>>>> 
>>>> 
>>>> We utilize table option SOURCE_MONITOR_INTERVAL to check whether
>>>> partitions are static or not. So, a user still should give Flink a hint
>>>> about partitions being static or not. With static partitions Flink can
>> do
>>>> more optimizations.
>>>> 
>>>> Please let me know if my replies answer your questions or if you have
>>> more
>>>> comments.
>>>> 
>>>> Regards,
>>>> Jeyhun
>>>> 
>>>> 
>>>> 
>>>> On Thu, Mar 21, 2024 at 10:03 AM <lorenzo.affe...@ververica.com>
>> wrote:
>>>> 
>>>>> Hello Jeyhun,
>>>>> I really like the proposal and definitely makes sense to me.
>>>>> 
>>>>> I have a couple of nits here and there:
>>>>> 
>>>>> For the interface `SupportsPartitioning`, why returning `Optional`?
>>>>> If one decides to implement that, partitions must exist (at maximum,
>>>>> return and empty list). Returning `Optional` seem just to complicate
>> the
>>>>> logic of the code using that interface.
>>>>> 
>>>>> I foresee the using code doing something like: "if the source supports
>>>>> partitioning, get the partitions, but if they don't exist, raise a
>>> runtime
>>>>> exception". Let's simply make that safe at compile time and guarantee
>>> the
>>>>> code that partitions exist.
>>>>> 
>>>>> Another thing is that you show Hive-like partitioning in your FS
>>>>> structure, do you think it makes sense to add a note about
>>> auto-discovery
>>>>> of partitions?
>>>>> 
>>>>> In other terms, it looks a bit counterintuitive that the user
>>>>> implementing the source has to specify which partitions exist
>> statically
>>>>> (and they can change at runtime), while the source itself knows the
>> data
>>>>> provider and can directly implement a method `discoverPartitions`.
>> Then
>>>>> Flink would take care of invoking that method when needed.
>>>>> On Mar 15, 2024 at 22:09 +0100, Jeyhun Karimov <je.kari...@gmail.com
>>> ,
>>>>> wrote:
>>>>> 
>>>>> Hi Benchao,
>>>>> 
>>>>> Thanks for your comments.
>>>>> 
>>>>> 1. What the parallelism would you take? E.g., 128 + 256 => 128? What
>>>>> 
>>>>> if we cannot have a good greatest common divisor, like 127 + 128,
>>>>> could we just utilize one side's pre-partitioned attribute, and let
>>>>> another side just do the shuffle?
>>>>> 
>>>>> 
>>>>> 
>>>>> There are two cases we need to consider:
>>>>> 
>>>>> 1. Static Partition (no partitions are added during the query
>> execution)
>>>>> is
>>>>> enabled AND both sources implement "SupportsPartitionPushdown"
>>>>> 
>>>>> In this case, we are sure that no new partitions will be added at
>>> runtime.
>>>>> So, we have a chance equalize both sources' partitions and
>> parallelism,
>>>>> IFF
>>>>> both sources implement "SupportsPartitionPushdown" interface.
>>>>> To achieve so, first we will fetch the existing partitions from
>> source1
>>>>> (say p_s1) and from source2 (say p_s2).
>>>>> Then, we find the intersection of these two partition sets (say
>>>>> p_intersect) and pushdown these partitions:
>>>>> 
>>>>> SupportsPartitionPushDown::applyPartitions(p_intersect) // make sure
>>> that
>>>>> only specific partitions are read
>>>>> SupportsPartitioning::applyPartitionedRead(p_intersect) // partitioned
>>>>> read
>>>>> with filtered partitions
>>>>> 
>>>>> Lastly, we need to change the parallelism of 1) source1, 2) source2,
>> and
>>>>> 3)
>>>>> all of their downstream operators until (and including) their first
>>> common
>>>>> ancestor (e.g., join) to be equal to the number of partitions (size of
>>>>> p_intersect).
>>>>> 
>>>>> 2. All other cases
>>>>> 
>>>>> In all other cases, the parallelism of both sources and their
>> downstream
>>>>> operators until their common ancestor would be equal to the MIN(p_s1,
>>>>> p_s2).
>>>>> That is, minimum of the partition size of source1 and partition size
>> of
>>>>> source2 will be selected as the parallelism.
>>>>> Coming back to your example, if source1 parallelism is 127 and source2
>>>>> parallelism is 128, then we will first check the partition size of
>>> source1
>>>>> and source2.
>>>>> Say partition size of source1 is 100 and partition size of source2 is
>>> 90.
>>>>> Then, we would set the parallelism for source1, source2, and all of
>>> their
>>>>> downstream operators until (and including) the join operator
>>>>> to 90 (min(100, 90)).
>>>>> We also plan to implement a cost based decision instead of the
>>> rule-based
>>>>> one (the ones explained above - MIN rule).
>>>>> One possible result of the cost based estimation is to keep the
>>> partitions
>>>>> on one side and perform the shuffling on another source.
>>>>> 
>>>>> 
>>>>> 
>>>>> 2. In our current shuffle remove design (FlinkExpandConversionRule),
>>>>> 
>>>>> we don't consider parallelism, we just remove unnecessary shuffles
>>>>> according to the distribution columns. After this FLIP, the
>>>>> parallelism may be bundled with source's partitions, then how will
>>>>> this optimization accommodate with FlinkExpandConversionRule, will you
>>>>> also change downstream operator's parallelisms if we want to also
>>>>> remove subsequent shuffles?
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> - From my understanding of FlinkExpandConversionRule, its removal
>> logic
>>> is
>>>>> agnostic to operator parallelism.
>>>>> So, if FlinkExpandConversionRule decides to remove a shuffle
>> operation,
>>>>> then this FLIP will search another possible shuffle (the one closest
>> to
>>>>> the
>>>>> source) to remove.
>>>>> If there is such an opportunity, this FLIP will remove the shuffle.
>> So,
>>>>> from my understanding FlinkExpandConversionRule and this optimization
>>> rule
>>>>> can work together safely.
>>>>> Please correct me if I misunderstood your question.
>>>>> 
>>>>> 
>>>>> 
>>>>> Regarding the new optimization rule, have you also considered to allow
>>>>> 
>>>>> some non-strict mode like FlinkRelDistribution#requireStrict? For
>>>>> example, source is pre-partitioned by a, b columns, if we are
>>>>> consuming this source, and do a aggregate on a, b, c, can we utilize
>>>>> this optimization?
>>>>> 
>>>>> 
>>>>> 
>>>>> - Good point. Yes, there are some cases that non-strict mode will
>> apply.
>>>>> For example:
>>>>> 
>>>>> - pre-partitioned columns and aggregate columns are the same but have
>>>>> different order (e.g., source pre-partitioned w.r.t. a,b and aggregate
>>> has
>>>>> a GROUP BY b,a)
>>>>> - columns in the Exchange operator is a list-prefix of pre-partitoned
>>>>> columns of source (e.g., source is pre-partitioned w.r.t. a,b,c and
>>>>> Exchange's partition columns are a,b)
>>>>> 
>>>>> Please let me know if the above answers your questions or if you have
>>> any
>>>>> other comments.
>>>>> 
>>>>> Regards,
>>>>> Jeyhun
>>>>> 
>>>>> On Thu, Mar 14, 2024 at 12:48 PM Benchao Li <libenc...@apache.org>
>>> wrote:
>>>>> 
>>>>> Thanks Jeyhun for bringing up this discussion, it is really exiting,
>>>>> +1 for the general idea.
>>>>> 
>>>>> We also introduced a similar concept in Flink Batch internally to cope
>>>>> with bucketed tables in Hive, it is a very important improvement.
>>>>> 
>>>>>> One thing to note is that for join queries, the parallelism of each
>>> join
>>>>>> source might be different. This might result in
>>>>>> inconsistencies while using the pre-partitioned/pre-divided data
>>> (e.g.,
>>>>>> different mappings of partitions to source operators).
>>>>>> Therefore, it is the job of planner to detect this and adjust the
>>>>>> parallelism. With that having in mind,
>>>>>> the rest (how the split assigners perform) is consistent among many
>>>>>> sources.
>>>>> 
>>>>> 
>>>>> Could you elaborate a little more on this. I added my two cents here
>>>>> about this part:
>>>>> 1. What the parallelism would you take? E.g., 128 + 256 => 128? What
>>>>> if we cannot have a good greatest common divisor, like 127 + 128,
>>>>> could we just utilize one side's pre-partitioned attribute, and let
>>>>> another side just do the shuffle?
>>>>> 2. In our current shuffle remove design (FlinkExpandConversionRule),
>>>>> we don't consider parallelism, we just remove unnecessary shuffles
>>>>> according to the distribution columns. After this FLIP, the
>>>>> parallelism may be bundled with source's partitions, then how will
>>>>> this optimization accommodate with FlinkExpandConversionRule, will you
>>>>> also change downstream operator's parallelisms if we want to also
>>>>> remove subsequent shuffles?
>>>>> 
>>>>> 
>>>>> Regarding the new optimization rule, have you also considered to allow
>>>>> some non-strict mode like FlinkRelDistribution#requireStrict? For
>>>>> example, source is pre-partitioned by a, b columns, if we are
>>>>> consuming this source, and do a aggregate on a, b, c, can we utilize
>>>>> this optimization?
>>>>> 
>>>>> Jane Chan <qingyue....@gmail.com> 于2024年3月14日周四 15:24写道:
>>>>> 
>>>>>> 
>>>>>> Hi Jeyhun,
>>>>>> 
>>>>>> Thanks for your clarification.
>>>>>> 
>>>>> 
>>>>>>> Once a new partition is detected, we add it to our existing
>> mapping.
>>>>> 
>>>>> Our
>>>>> 
>>>>>> mapping looks like Map<Integer, Set<Integer>>
>>>>> 
>>>>> subtaskToPartitionAssignment,
>>>>> 
>>>>>> where it maps each source subtaskID to zero or more partitions.
>>>>>> 
>>>>>> I understand your point. **It would be better if you could sync the
>>>>>> content to the FLIP**.
>>>>>> 
>>>>>> Another thing is I'm curious about what the physical plan looks
>> like.
>>> Is
>>>>>> there any specific info that will be added to the table source (like
>>>>>> filter/project pushdown)? It would be great if you could attach an
>>>>> 
>>>>> example
>>>>> 
>>>>>> to the FLIP.
>>>>>> 
>>>>>> Bests,
>>>>>> Jane
>>>>>> 
>>>>>> On Wed, Mar 13, 2024 at 9:11 PM Jeyhun Karimov <
>> je.kari...@gmail.com>
>>>>> 
>>>>> wrote:
>>>>> 
>>>>>> 
>>>>> 
>>>>>>> Hi Jane,
>>>>>>> 
>>>>>>> Thanks for your comments.
>>>>>>> 
>>>>>>> 
>>>>>>> 1. Concerning the `sourcePartitions()` method, the partition
>>>>> 
>>>>> information
>>>>> 
>>>>>>>> returned during the optimization phase may not be the same as
>> the
>>>>> 
>>>>>>> partition
>>>>> 
>>>>>>>> information during runtime execution. For long-running jobs,
>>>>> 
>>>>> partitions
>>>>> 
>>>>>>> may
>>>>> 
>>>>>>>> be continuously created. Is this FLIP equipped to handle
>>> scenarios?
>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> - Good point. This scenario is definitely supported.
>>>>>>> Once a new partition is added, or in general, new splits are
>>>>>>> discovered,
>>>>>>> PartitionAwareSplitAssigner::addSplits(Collection<FileSourceSplit>
>>>>>>> newSplits)
>>>>>>> method will be called. Inside that method, we are able to detect
>> if
>>> a
>>>>> 
>>>>> split
>>>>> 
>>>>>>> belongs to existing partitions or there is a new partition.
>>>>>>> Once a new partition is detected, we add it to our existing
>> mapping.
>>>>> 
>>>>> Our
>>>>> 
>>>>>>> mapping looks like Map<Integer, Set<Integer>>
>>>>> 
>>>>> subtaskToPartitionAssignment,
>>>>> 
>>>>>>> where
>>>>>>> it maps each source subtaskID to zero or more partitions.
>>>>>>> 
>>>>>>> 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
>>>>> 
>>>>>>>> understand that it is also necessary to verify whether the hash
>>> key
>>>>> 
>>>>>>> within
>>>>> 
>>>>>>>> the Exchange node is consistent with the partition key defined
>> in
>>>>> the
>>>>> 
>>>>>>> table
>>>>> 
>>>>>>>> source that implements `SupportsPartitioning`.
>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> - Yes, I overlooked that point, fixed. Actually, the rule is much
>>>>>>> complicated. I tried to simplify it in the FLIP. Good point.
>>>>>>> 
>>>>>>> 
>>>>>>> 3. Could you elaborate on the desired physical plan and
>> integration
>>>>> 
>>>>> with
>>>>> 
>>>>>>>> `CompiledPlan` to enhance the overall functionality?
>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> - For compiled plan, PartitioningSpec will be used, with a json
>> tag
>>>>>>> "Partitioning". As a result, in the compiled plan, the source
>>> operator
>>>>> 
>>>>> will
>>>>> 
>>>>>>> have
>>>>>>> "abilities" : [ { "type" : "Partitioning" } ] as part of the
>>> compiled
>>>>> 
>>>>> plan.
>>>>> 
>>>>>>> More about the implementation details below:
>>>>>>> 
>>>>>>> --------------------------------
>>>>>>> PartitioningSpec class
>>>>>>> --------------------------------
>>>>>>> @JsonTypeName("Partitioning")
>>>>>>> public final class PartitioningSpec extends SourceAbilitySpecBase
>> {
>>>>>>> // some code here
>>>>>>> @Override
>>>>>>> public void apply(DynamicTableSource tableSource,
>>>>> 
>>>>> SourceAbilityContext
>>>>> 
>>>>>>> context) {
>>>>>>> if (tableSource instanceof SupportsPartitioning) {
>>>>>>> ((SupportsPartitioning<?>)
>>>>> 
>>>>> tableSource).applyPartitionedRead();
>>>>> 
>>>>>>> } else {
>>>>>>> throw new TableException(
>>>>>>> String.format(
>>>>>>> "%s does not support
>>>>> 
>>>>> SupportsPartitioning.",
>>>>> 
>>>>>>> tableSource.getClass().getName()));
>>>>>>> }
>>>>>>> }
>>>>>>> // some code here
>>>>>>> }
>>>>>>> 
>>>>>>> --------------------------------
>>>>>>> SourceAbilitySpec class
>>>>>>> --------------------------------
>>>>>>> @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include =
>>>>>>> JsonTypeInfo.As.PROPERTY, property = "type")
>>>>>>> @JsonSubTypes({
>>>>>>> @JsonSubTypes.Type(value = FilterPushDownSpec.class),
>>>>>>> @JsonSubTypes.Type(value = LimitPushDownSpec.class),
>>>>>>> @JsonSubTypes.Type(value = PartitionPushDownSpec.class),
>>>>>>> @JsonSubTypes.Type(value = ProjectPushDownSpec.class),
>>>>>>> @JsonSubTypes.Type(value = ReadingMetadataSpec.class),
>>>>>>> @JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
>>>>>>> @JsonSubTypes.Type(value = SourceWatermarkSpec.class),
>>>>>>> @JsonSubTypes.Type(value = AggregatePushDownSpec.class),
>>>>>>> + @JsonSubTypes.Type(value = PartitioningSpec.class)
>>>>> 
>>>>> //
>>>>> 
>>>>>>> new added
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> Please let me know if that answers your questions or if you have
>>> other
>>>>>>> comments.
>>>>>>> 
>>>>>>> Regards,
>>>>>>> Jeyhun
>>>>>>> 
>>>>>>> 
>>>>>>> On Tue, Mar 12, 2024 at 8:56 AM Jane Chan <qingyue....@gmail.com>
>>>>> 
>>>>> wrote:
>>>>> 
>>>>>>> 
>>>>> 
>>>>>>>> Hi Jeyhun,
>>>>>>>> 
>>>>>>>> Thank you for leading the discussion. I'm generally +1 with this
>>>>> 
>>>>>>> proposal,
>>>>> 
>>>>>>>> along with some questions. Please see my comments below.
>>>>>>>> 
>>>>>>>> 1. Concerning the `sourcePartitions()` method, the partition
>>>>> 
>>>>> information
>>>>> 
>>>>>>>> returned during the optimization phase may not be the same as
>> the
>>>>> 
>>>>>>> partition
>>>>> 
>>>>>>>> information during runtime execution. For long-running jobs,
>>>>> 
>>>>> partitions
>>>>> 
>>>>>>> may
>>>>> 
>>>>>>>> be continuously created. Is this FLIP equipped to handle
>>> scenarios?
>>>>>>>> 
>>>>>>>> 2. Regarding the `RemoveRedundantShuffleRule` optimization
>> rule, I
>>>>>>>> understand that it is also necessary to verify whether the hash
>>> key
>>>>> 
>>>>>>> within
>>>>> 
>>>>>>>> the Exchange node is consistent with the partition key defined
>> in
>>>>> the
>>>>> 
>>>>>>> table
>>>>> 
>>>>>>>> source that implements `SupportsPartitioning`.
>>>>>>>> 
>>>>>>>> 3. Could you elaborate on the desired physical plan and
>>> integration
>>>>> 
>>>>> with
>>>>> 
>>>>>>>> `CompiledPlan` to enhance the overall functionality?
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> Jane
>>>>>>>> 
>>>>>>>> On Tue, Mar 12, 2024 at 11:11 AM Jim Hughes
>>>>> 
>>>>> <jhug...@confluent.io.invalid
>>>>> 
>>>>>>>> 
>>>>>>>> wrote:
>>>>>>>> 
>>>>> 
>>>>>>>>> Hi Jeyhun,
>>>>>>>>> 
>>>>>>>>> I like the idea! Given FLIP-376[1], I wonder if it'd make
>> sense
>>> to
>>>>>>>>> generalize FLIP-434 to be about "pre-divided" data to cover
>>>>> 
>>>>> "buckets"
>>>>> 
>>>>>>> and
>>>>> 
>>>>>>>>> "partitions" (and maybe even situations where a data source is
>>>>> 
>>>>>>>> partitioned
>>>>> 
>>>>>>>>> and bucketed).
>>>>>>>>> 
>>>>>>>>> Separate from that, the page mentions TPC-H Q1 as an example.
>>> For
>>>>> 
>>>>> a
>>>>> 
>>>>>>>> join,
>>>>> 
>>>>>>>>> any two tables joined on the same bucket key should provide a
>>>>> 
>>>>> concrete
>>>>> 
>>>>>>>>> example of a join. Systems like Kafka Streams/ksqlDB call this
>>>>>>>>> "co-partitioning"; for those systems, it is a requirement
>> placed
>>>>> 
>>>>> on the
>>>>> 
>>>>>>>>> input sources. For Flink, with FLIP-434, the proposed planner
>>> rule
>>>>>>>>> could remove the shuffle.
>>>>>>>>> 
>>>>>>>>> Definitely a fun idea; I look forward to hearing more!
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> 
>>>>>>>>> Jim
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 1.
>>>>>>>>> 
>>>>>>>>> 
>>>>> 
>>>>>>>> 
>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
>>>>> 
>>>>>>>>> 2.
>>>>>>>>> 
>>>>>>>>> 
>>>>> 
>>>>>>>> 
>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>> 
>> https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/#co-partitioning-requirements
>>>>> 
>>>>>>>>> 
>>>>>>>>> On Sun, Mar 10, 2024 at 3:38 PM Jeyhun Karimov <
>>>>> 
>>>>> je.kari...@gmail.com>
>>>>> 
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>> 
>>>>>>>>>> Hi devs,
>>>>>>>>>> 
>>>>>>>>>> I’d like to start a discussion on FLIP-434: Support
>>>>> 
>>>>> optimizations for
>>>>> 
>>>>>>>>>> pre-partitioned data sources [1].
>>>>>>>>>> 
>>>>>>>>>> The FLIP introduces taking advantage of pre-partitioned data
>>>>> 
>>>>> sources
>>>>> 
>>>>>>>> for
>>>>> 
>>>>>>>>>> SQL/Table API (it is already supported as experimental
>> feature
>>>>> in
>>>>>>>>>> DataStream API [2]).
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> Please find more details in the FLIP wiki document [1].
>>>>>>>>>> Looking forward to your feedback.
>>>>>>>>>> 
>>>>>>>>>> Regards,
>>>>>>>>>> Jeyhun
>>>>>>>>>> 
>>>>>>>>>> [1]
>>>>>>>>>> 
>>>>>>>>>> 
>>>>> 
>>>>>>>>> 
>>>>> 
>>>>>>>> 
>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources
>>>>> 
>>>>>>>>>> [2]
>>>>>>>>>> 
>>>>>>>>>> 
>>>>> 
>>>>>>>>> 
>>>>> 
>>>>>>>> 
>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>> 
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/
>>>>> 
>>>>>>>>>> 
>>>>> 
>>>>>>>>> 
>>>>> 
>>>>>>>> 
>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> 
>>>>> Best,
>>>>> Benchao Li
>>>>> 
>>>>> 
>>> 
>> 

Reply via email to