Hi Jane,

Thanks for your comments.


> 1. Concerning the `sourcePartitions()` method, the partition information
> returned during the optimization phase may not be the same as the partition
> information during runtime execution. For long-running jobs, partitions may
> be continuously created. Is this FLIP equipped to handle scenarios?


- Good point. This scenario is definitely supported.
Once a new partition is added, or more generally, once new splits are
discovered, the
PartitionAwareSplitAssigner::addSplits(Collection<FileSourceSplit> newSplits)
method is called. Inside that method, we are able to detect whether a split
belongs to an existing partition or to a newly created one.
Once a new partition is detected, we add it to our existing mapping. The
mapping is a Map<Integer, Set<Integer>> subtaskToPartitionAssignment, which
maps each source subtask ID to zero or more partitions.
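
To make this concrete, here is a rough sketch of that detection logic (just
an illustration, not the final design; partitionOf() and numberOfSubtasks are
placeholders, and the round-robin choice is purely for the example):

--------------------------------
PartitionAwareSplitAssigner::addSplits (sketch)
--------------------------------
public void addSplits(Collection<FileSourceSplit> newSplits) {
    for (FileSourceSplit split : newSplits) {
        // Placeholder: derive the partition from the split (e.g. from its
        // partition path).
        int partition = partitionOf(split);

        boolean knownPartition =
                subtaskToPartitionAssignment.values().stream()
                        .anyMatch(assigned -> assigned.contains(partition));

        if (!knownPartition) {
            // Newly created partition: extend the existing assignment.
            // Round-robin by partition id, purely for illustration.
            int subtask = partition % numberOfSubtasks;
            subtaskToPartitionAssignment
                    .computeIfAbsent(subtask, k -> new HashSet<>())
                    .add(partition);
        }
        // ... the split is then queued for the subtask owning its partition ...
    }
}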

> 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> understand that it is also necessary to verify whether the hash key within
> the Exchange node is consistent with the partition key defined in the table
> source that implements `SupportsPartitioning`.


- Yes, I overlooked that point, fixed. Actually, the rule is much more
complicated; I tried to simplify it in the FLIP. Good point.
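
Just to make the intended check concrete, a very simplified version of the
condition could look like the following (the surrounding names are
approximations rather than the exact rule from the FLIP, and key
ordering/duplicate handling is glossed over):

--------------------------------
Hash key vs. partition key check (sketch)
--------------------------------
// The shuffle can only be removed when the Exchange hashes exactly on the
// columns the source (implementing SupportsPartitioning) is partitioned by.
private boolean hashKeysMatchPartitionKeys(
        Exchange exchange, List<String> partitionKeys, RowType sourceRowType) {
    // Field indices the Exchange hashes on (Calcite RelDistribution keys)
    List<Integer> hashKeys = exchange.getDistribution().getKeys();
    // Field indices of the partition keys declared by the table source
    List<Integer> partitionKeyIndices = new ArrayList<>();
    for (String key : partitionKeys) {
        partitionKeyIndices.add(sourceRowType.getFieldNames().indexOf(key));
    }
    return hashKeys.equals(partitionKeyIndices);
}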


> 3. Could you elaborate on the desired physical plan and integration with
> `CompiledPlan` to enhance the overall functionality?


- For the compiled plan, PartitioningSpec will be used, with the JSON tag
"Partitioning". As a result, the source operator in the compiled plan will
contain "abilities" : [ { "type" : "Partitioning" } ].
More about the implementation details below:

--------------------------------
PartitioningSpec class
--------------------------------
@JsonTypeName("Partitioning")
public final class PartitioningSpec extends SourceAbilitySpecBase {
 // some code here
    @Override
    public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
        if (tableSource instanceof SupportsPartitioning) {
            ((SupportsPartitioning<?>) tableSource).applyPartitionedRead();
        } else {
            throw new TableException(
                    String.format(
                            "%s does not support SupportsPartitioning.",
                            tableSource.getClass().getName()));
        }
    }
  // some code here
}

--------------------------------
SourceAbilitySpec class
--------------------------------
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes({
    @JsonSubTypes.Type(value = FilterPushDownSpec.class),
    @JsonSubTypes.Type(value = LimitPushDownSpec.class),
    @JsonSubTypes.Type(value = PartitionPushDownSpec.class),
    @JsonSubTypes.Type(value = ProjectPushDownSpec.class),
    @JsonSubTypes.Type(value = ReadingMetadataSpec.class),
    @JsonSubTypes.Type(value = WatermarkPushDownSpec.class),
    @JsonSubTypes.Type(value = SourceWatermarkSpec.class),
    @JsonSubTypes.Type(value = AggregatePushDownSpec.class),
+   @JsonSubTypes.Type(value = PartitioningSpec.class)    // newly added
})



Please let me know if that answers your questions or if you have other
comments.

Regards,
Jeyhun


On Tue, Mar 12, 2024 at 8:56 AM Jane Chan <qingyue....@gmail.com> wrote:

> Hi Jeyhun,
>
> Thank you for leading the discussion. I'm generally +1 with this proposal,
> along with some questions. Please see my comments below.
>
> 1. Concerning the `sourcePartitions()` method, the partition information
> returned during the optimization phase may not be the same as the partition
> information during runtime execution. For long-running jobs, partitions may
> be continuously created. Is this FLIP equipped to handle scenarios?
>
> 2. Regarding the `RemoveRedundantShuffleRule` optimization rule, I
> understand that it is also necessary to verify whether the hash key within
> the Exchange node is consistent with the partition key defined in the table
> source that implements `SupportsPartitioning`.
>
> 3. Could you elaborate on the desired physical plan and integration with
> `CompiledPlan` to enhance the overall functionality?
>
> Best,
> Jane
>
> On Tue, Mar 12, 2024 at 11:11 AM Jim Hughes <jhug...@confluent.io.invalid>
> wrote:
>
> > Hi Jeyhun,
> >
> > I like the idea!  Given FLIP-376[1], I wonder if it'd make sense to
> > generalize FLIP-434 to be about "pre-divided" data to cover "buckets" and
> > "partitions" (and maybe even situations where a data source is
> partitioned
> > and bucketed).
> >
> > Separate from that, the page mentions TPC-H Q1 as an example.  For a
> join,
> > any two tables joined on the same bucket key should provide a concrete
> > example of a join.  Systems like Kafka Streams/ksqlDB call this
> > "co-partitioning"; for those systems, it is a requirement placed on the
> > input sources.  For Flink, with FLIP-434, the proposed planner rule
> > could remove the shuffle.
> >
> > Definitely a fun idea; I look forward to hearing more!
> >
> > Cheers,
> >
> > Jim
> >
> >
> > 1.
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
> > 2.
> >
> >
> https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/#co-partitioning-requirements
> >
> > On Sun, Mar 10, 2024 at 3:38 PM Jeyhun Karimov <je.kari...@gmail.com>
> > wrote:
> >
> > > Hi devs,
> > >
> > > I’d like to start a discussion on FLIP-434: Support optimizations for
> > > pre-partitioned data sources [1].
> > >
> > > The FLIP introduces taking advantage of pre-partitioned data sources
> for
> > > SQL/Table API (it is already supported as experimental feature in
> > > DataStream API [2]).
> > >
> > >
> > > Please find more details in the FLIP wiki document [1].
> > > Looking forward to your feedback.
> > >
> > > Regards,
> > > Jeyhun
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-434%3A+Support+optimizations+for+pre-partitioned+data+sources
> > > [2]
> > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/experimental/
> > >
> >
>
