Hi Hang,

Thanks for the comments.

I have a question about the part `Additional option to disable this
> optimization`. Is this option a source configuration or a table
> configuration?


- It is a source configuration.

Besides that, there is a little mistake if I do not understand wrongly.
> Should `Check if upstream_any is pre-partitioned data source AND contains
> the same partition keys as the source` be changed as `Check if upstream_any
> is pre-partitioned data source AND contains the same partition keys as
> downstream_any` ?


- Yes, 'source' should be 'exchange' here.

I materialized both points to the FLIP.

Please let me know if that answers your questions or if you have any other
comments.

Regards,
Jeyhun


On Thu, Mar 14, 2024 at 8:10 AM Hang Ruan <ruanhang1...@gmail.com> wrote:

> Hi, Jeyhun.
>
> Thanks for the FLIP. Totally +1 for it.
>
> I have a question about the part `Additional option to disable this
> optimization`. Is this option a source configuration or a table
> configuration?
>
> Besides that, there is a little mistake if I do not understand wrongly.
> Should `Check if upstream_any is pre-partitioned data source AND contains
> the same partition keys as the source` be changed as `Check if upstream_any
> is pre-partitioned data source AND contains the same partition keys as
> downstream_any` ?
>
> Best,
> Hang
>
> Jeyhun Karimov <je.kari...@gmail.com> 于2024年3月13日周三 21:11写道:
>
> > Hi Jane,
> >
> > Thanks for your comments.
> >
> >
> > 1. Concerning the `sourcePartitions()` method, the partition information
> > > returned during the optimization phase may not be the same as the
> > partition
> > > information during runtime execution. For long-running jobs, partitions
> > may
> > > be continuously created. Is this FLIP equipped to handle scenarios?
> >
> >
> > - Good point. This scenario is definitely supported.
> > Once a new partition is added, or in general, new splits are
> > discovered,
> > PartitionAwareSplitAssigner::addSplits(Collection<FileSourceSplit>
> > newSplits)
> > method will be called. Inside that method, we are able to detect if a
> split
> > belongs to existing partitions or there is a new partition.
> > Once a new partition is detected, we add it to our existing mapping. Our
> > mapping looks like Map<Integer, Set<Integer>>
> subtaskToPartitionAssignment,
> > where
> > it maps each source subtaskID to zero or more partitions.
> >
> > 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> > > understand that it is also necessary to verify whether the hash key
> > within
> > > the Exchange node is consistent with the partition key defined in the
> > table
> > > source that implements `SupportsPartitioning`.
> >
> >
> > - Yes, I overlooked that point, fixed. Actually, the rule is much
> > complicated. I tried to simplify it in the FLIP. Good point.
> >
> >
> > 3. Could you elaborate on the desired physical plan and integration with
> > > `CompiledPlan` to enhance the overall functionality?
> >
> >
> > - For compiled plan, PartitioningSpec will be used, with a json tag
> > "Partitioning". As a result, in the compiled plan, the source operator
> will
> > have
> > "abilities" : [ { "type" : "Partitioning" } ] as part of the compiled
> plan.
> > More about the implementation details below:
> >
> > --------------------------------
> > PartitioningSpec class
> > --------------------------------
> > @JsonTypeName("Partitioning")
> > public final class PartitioningSpec extends SourceAbilitySpecBase {
> >  // some code here
> >     @Override
> >     public void apply(DynamicTableSource tableSource,
> SourceAbilityContext
> > context) {
> >         if (tableSource instanceof SupportsPartitioning) {
> >             ((SupportsPartitioning<?>)
> tableSource).applyPartitionedRead();
> >         } else {
> >             throw new TableException(
> >                     String.format(
> >                             "%s does not support SupportsPartitioning.",
> >                             tableSource.getClass().getName()));
> >         }
> >     }
> >   // some code here
> > }
> >
> > --------------------------------
> > SourceAbilitySpec class
> > --------------------------------
> > @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include =
> > JsonTypeInfo.As.PROPERTY, property = "type")
> > @JsonSubTypes({
> >     @JsonSubTypes.Type(value = FilterPushDownSpec.class),
> >     @JsonSubTypes.Type(value = LimitPushDownSpec.class),
> >     @JsonSubTypes.Type(value = PartitionPushDownSpec.class),
> >     @JsonSubTypes.Type(value = ProjectPushDownSpec.class),
> >     @JsonSubTypes.Type(value = ReadingMetadataSpec.class),
> >     @JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
> >     @JsonSubTypes.Type(value = SourceWatermarkSpec.class),
> >     @JsonSubTypes.Type(value = AggregatePushDownSpec.class),
> > +  @JsonSubTypes.Type(value = PartitioningSpec.class)
>  //
> > new added
> >
> >
> >
> > Please let me know if that answers your questions or if you have other
> > comments.
> >
> > Regards,
> > Jeyhun
> >
> >
> > On Tue, Mar 12, 2024 at 8:56 AM Jane Chan <qingyue....@gmail.com> wrote:
> >
> > > Hi Jeyhun,
> > >
> > > Thank you for leading the discussion. I'm generally +1 with this
> > proposal,
> > > along with some questions. Please see my comments below.
> > >
> > > 1. Concerning the `sourcePartitions()` method, the partition
> information
> > > returned during the optimization phase may not be the same as the
> > partition
> > > information during runtime execution. For long-running jobs, partitions
> > may
> > > be continuously created. Is this FLIP equipped to handle scenarios?
> > >
> > > 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> > > understand that it is also necessary to verify whether the hash key
> > within
> > > the Exchange node is consistent with the partition key defined in the
> > table
> > > source that implements `SupportsPartitioning`.
> > >
> > > 3. Could you elaborate on the desired physical plan and integration
> with
> > > `CompiledPlan` to enhance the overall functionality?
> > >
> > > Best,
> > > Jane
> > >
> > > On Tue, Mar 12, 2024 at 11:11 AM Jim Hughes
> <jhug...@confluent.io.invalid
> > >
> > > wrote:
> > >
> > > > Hi Jeyhun,
> > > >
> > > > I like the idea!  Given FLIP-376[1], I wonder if it'd make sense to
> > > > generalize FLIP-434 to be about "pre-divided" data to cover "buckets"
> > and
> > > > "partitions" (and maybe even situations where a data source is
> > > partitioned
> > > > and bucketed).
> > > >
> > > > Separate from that, the page mentions TPC-H Q1 as an example.  For a
> > > join,
> > > > any two tables joined on the same bucket key should provide a
> concrete
> > > > example of a join.  Systems like Kafka Streams/ksqlDB call this
> > > > "co-partitioning"; for those systems, it is a requirement placed on
> the
> > > > input sources.  For Flink, with FLIP-434, the proposed planner rule
> > > > could remove the shuffle.
> > > >
> > > > Definitely a fun idea; I look forward to hearing more!
> > > >
> > > > Cheers,
> > > >
> > > > Jim
> > > >
> > > >
> > > > 1.
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
> > > > 2.
> > > >
> > > >
> > >
> >
> https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/#co-partitioning-requirements
> > > >
> > > > On Sun, Mar 10, 2024 at 3:38 PM Jeyhun Karimov <je.kari...@gmail.com
> >
> > > > wrote:
> > > >
> > > > > Hi devs,
> > > > >
> > > > > I’d like to start a discussion on FLIP-434: Support optimizations
> for
> > > > > pre-partitioned data sources [1].
> > > > >
> > > > > The FLIP introduces taking advantage of pre-partitioned data
> sources
> > > for
> > > > > SQL/Table API (it is already supported as experimental feature in
> > > > > DataStream API [2]).
> > > > >
> > > > >
> > > > > Please find more details in the FLIP wiki document [1].
> > > > > Looking forward to your feedback.
> > > > >
> > > > > Regards,
> > > > > Jeyhun
> > > > >
> > > > > [1]
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources
> > > > > [2]
> > > > >
> > > > >
> > > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/
> > > > >
> > > >
> > >
> >
>

Reply via email to