Hi Jark,

Seems that we have reached the agreement on the proposal. Could you please
help to assign the below jira ticket to me?
https://issues.apache.org/jira/browse/FLINK-20791

Jark Wu <imj...@gmail.com> 于2021年1月7日周四 上午10:25写道:

> Thanks for updating the design doc.
> It looks good to me.
>
> Best,
> Jark
>
> On Thu, 7 Jan 2021 at 10:16, Jingsong Li <jingsongl...@gmail.com> wrote:
>
>> Sounds good to me.
>>
>> We don't have to worry about future changes, because it has covered all
>> the capabilities of calcite aggregation.
>>
>> Best,
>> Jingsong
>>
>> On Thu, Jan 7, 2021 at 12:14 AM Sebastian Liu <liuyang0...@gmail.com>
>> wrote:
>>
>>> Hi Jark,
>>>
>>> Sounds good to me. For better scalability in the future, we could add
>>> the AggregateExpression.
>>> ```
>>>
>>> public class AggregateExpression implements ResolvedExpression {
>>>
>>>    private final FunctionDefinition functionDefinition;
>>>
>>>    private final List<FieldReferenceExpression> args;
>>>
>>>    private final @Nullable CallExpression filterExpression;
>>>
>>>    private final DataType resultType;
>>>
>>>    private final boolean distinct;
>>>
>>>    private final boolean approximate;
>>>
>>>
>>>
>>>    private final boolean ignoreNulls;
>>>
>>> }
>>> ```
>>>
>>> And we really only need one GroupingSets parameter for grouping. I have
>>> updated the related interface in the proposal.
>>> Appreciate the continued feedback and help.
>>>
>>> Jark Wu <imj...@gmail.com> 于2021年1月6日周三 下午9:34写道:
>>>
>>>> Hi Liu, Jingsong,
>>>>
>>>> Regarding the agg with filter, I think in theory we can support pushing
>>>> such a pattern into source.
>>>> We don't need to support it in the first version, but in the long term,
>>>> we can support it.
>>>> The designed interface should be future proof.
>>>>
>>>> Considering filter arg and distinct flag should be part of the
>>>> aggregate expression.
>>>> I'm wondering if CallExpression is a good representation for it.
>>>> What do you think about proposing the following `AggregateExpression`
>>>> to replace the `CallExpression`?
>>>>
>>>> class AggregateExpression implements ResolvedExpression {
>>>>     private final FunctionDefinition functionDefinition;
>>>>     private final List<FieldReferenceExpression> args;
>>>>     private final @Nullable CallExpression filterExpr;
>>>>     private final boolean distinct;
>>>> }
>>>>
>>>> Besides, we don't need both groupingFields and groupingSets.
>>>> `groupingSets` should be a superset of groupingFields.
>>>> Then the interface of SupportsAggregatePushDown can be:
>>>>
>>>> interface SupportsAggregatePushDown {
>>>>
>>>>   boolean applyAggregates(
>>>>     List<int[]> groupingSets,
>>>>     List<AggregateExpression> aggregates,
>>>>     DataType producedDataType);
>>>> }
>>>>
>>>> What do you think?
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Wed, 6 Jan 2021 at 19:56, Sebastian Liu <liuyang0...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Jingsong, Jark,
>>>>>
>>>>> Thx so much for our discussion, and the cases mentioned above are
>>>>> really worthy for further discussion.
>>>>>
>>>>> 1. For aggregate with filter expressions: eg: select COUNT(1)
>>>>> FILTER(WHERE cc_call_center_sk > 3) from call_center;
>>>>> For the current Blink Planner, the optimized plan will be:
>>>>> TableSourceScan -> Calc(IS TRUE(>(cc_call_center_sk, 3))) -> LocalAgg
>>>>> -> Exchange -> FinalAgg.
>>>>> As there is a Calc above the TableSource, this pattern can't match the
>>>>> LocalAggPushDownRule in the current design.
>>>>>
>>>>> 2. For the grouping set or rollup use case: eg: select COUNT(1) from
>>>>> call_center group by rollup(cc_class, cc_employees);
>>>>> For the current Blink Planner, the optimized plan will be:
>>>>> TableSourceScan -> Expand -> LocalAgg -> Exchange -> FinalAgg -> Calc.
>>>>> It's also not covered by the current LocalAggPushDownRule design.
>>>>>
>>>>> 3. I want to add a case which we haven't discussed yet. Aggregate with
>>>>> Having clause.
>>>>> eg: select COUNT(1) from call_center group by cc_class having
>>>>> max(cc_tax_percentage) > 0.2;
>>>>> For the current Blink Planner, the optimized plan will be:
>>>>> TableSourceScan -> LocalAgg -> Exchange -> FinalAgg ->
>>>>> Calc(where=[>($f2, 0.2:DECIMAL(2, 1))]).
>>>>>
>>>>> The core discussion points are summarized as follows:
>>>>> a) Aggregate is a more complex scenario than predicates or limits, and
>>>>> also depends on the different underlying storages.
>>>>> b) One rule seems can't completely cover all aggregate scenario, but
>>>>> whether SupportSAggregatePushDown interface can be a bit more general?
>>>>> c) Could the CallExpression express the semantics of CalCite
>>>>> AggregateCall?
>>>>>
>>>>> IMO: Completely push down aggregate is generally hard for distributed
>>>>> systems. Usually we need a GROUP BY and exactly
>>>>> matches the partition mode in downstream storage. At the same time,
>>>>> the benefit of remove the final aggregate is actually limited.
>>>>> The LocalAggPushDown generally yields more than 80% of the CPU and IO
>>>>> benefits. But I also agree that
>>>>> the SupportsAggregatePushDown interface should be as generic as
>>>>> possible for future extensions, and meanwhile keep confidence
>>>>> in the interface we design.
>>>>>
>>>>> For core points (a): As the complexity of aggregate, one
>>>>> LogicalAggregate node may extend to "Expand / Calc / LocalXXAgg / Exchange
>>>>> / FinalXXAgg"
>>>>> in physical phase. Seems that we can't solve all cases with only one
>>>>> rule. So I suggest PushLocalAggIntoTableSourceScanRule focus only
>>>>> on the pattern of TableSourceScan + LocalXXAggregate at present.
>>>>>
>>>>> For core points (b & c): I think we can change the interface to be:
>>>>> ```
>>>>>
>>>>> boolean applyAggregates(int[] groupingFields, List<CallExpression>
>>>>> aggregateExpressions, DataType producedDataType, List<int[]>
>>>>> groupingSets);
>>>>>
>>>>> ```
>>>>>
>>>>>
>>>>> Simple Group: groupingSets.size() == 1 &&
>>>>> groupingSets.get(0).equals(groupingFields)
>>>>>
>>>>> Cube Group: groupingSets.size() == IntMath.pow(2,
>>>>> groupingFields.cardinality())
>>>>> Rollup: Refernece org.apache.calcite.rel.core.Aggregate.Group#isRollup
>>>>>
>>>>> Then we can handle the complex grouping case. The Connector developer
>>>>> of the downstream storage should determine
>>>>> whether it supports the associated grouping type. For the filter and
>>>>> having clause, they will convert to be related Calc RelNode,
>>>>> and no longer in the LocalAggregate node, the CallExpression may be
>>>>> sufficient to express the semantics of AggregateCall.
>>>>>
>>>>> What do you think? Looking forward to our further discussion.
>>>>>
>>>>>
>>>>> Jingsong Li <jingsongl...@gmail.com> 于2021年1月6日周三 下午2:24写道:
>>>>>
>>>>>> > I think filter expressions and grouping sets are semantic arguments
>>>>>> instead of utilities. If we want to push them into sources, the connector
>>>>>> developers should be aware of them.Wrapping them in a context implicitly 
>>>>>> is
>>>>>> error-prone that the existing connector will produce wrong results when
>>>>>> upgrading to new Flink versions.
>>>>>>
>>>>>> We can have some mechanism to check the upgrading.
>>>>>>
>>>>>> > I think for these cases, providing a new default method to override
>>>>>> might be a better choice.
>>>>>>
>>>>>> Then we will have three or more methods. For the API level, I really
>>>>>> don't like it...
>>>>>>
>>>>>> Best,
>>>>>> Jingsong
>>>>>>
>>>>>> On Wed, Jan 6, 2021 at 2:10 PM Jark Wu <imj...@gmail.com> wrote:
>>>>>>
>>>>>>> I think filter expressions and grouping sets are semantic arguments
>>>>>>> instead of utilities.
>>>>>>> If we want to push them into sources, the connector developers
>>>>>>> should be aware of them.
>>>>>>> Wrapping them in a context implicitly is error-prone that the
>>>>>>> existing connector will produce wrong results
>>>>>>>  when upgrading to new Flink versions (as we are pushing
>>>>>>> grouping_sets/filter_args, but connector ignores it).
>>>>>>> I think for these cases, providing a new default method to override
>>>>>>> might be a better choice.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> On Wed, 6 Jan 2021 at 13:56, Jingsong Li <jingsongl...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm also curious about aggregate with filter (COUNT(1) FILTER(WHERE
>>>>>>>> d > 1)). Can we push it down? I'm not sure that a single call 
>>>>>>>> expression
>>>>>>>> can express it, and how we should embody it and convey it to users.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jingsong
>>>>>>>>
>>>>>>>> On Wed, Jan 6, 2021 at 1:36 PM Jingsong Li <jingsongl...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Jark,
>>>>>>>>>
>>>>>>>>> I don't want to limit this interface to LocalAgg Push down.
>>>>>>>>> Actually, sometimes, we can push whole aggregation to source too.
>>>>>>>>>
>>>>>>>>> So, this rule can do something more advanced. For example, we can
>>>>>>>>> push down group sets to source too, for the SQL: "GROUP BY GROUPING 
>>>>>>>>> SETS
>>>>>>>>> (f1, f2)". Then, we need to add more information to push down.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jingsong
>>>>>>>>>
>>>>>>>>> On Wed, Jan 6, 2021 at 11:02 AM Jark Wu <imj...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I think this may be over designed. We should have confidence in
>>>>>>>>>> the interface we design, the interface should be stable.
>>>>>>>>>> Wrapping things in a big context has a cost of losing user
>>>>>>>>>> convenience.
>>>>>>>>>> Foremost, we don't see any parameters to add in the future. Do
>>>>>>>>>> you know any potential parameters?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jark
>>>>>>>>>>
>>>>>>>>>> On Wed, 6 Jan 2021 at 10:28, Jingsong Li <jingsongl...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Sebastian,
>>>>>>>>>>>
>>>>>>>>>>> Well, I mean:
>>>>>>>>>>>
>>>>>>>>>>> `boolean applyAggregates(int[] groupingFields,
>>>>>>>>>>> List<CallExpression> aggregateExpressions, DataType 
>>>>>>>>>>> producedDataType);`
>>>>>>>>>>> VS
>>>>>>>>>>> ```
>>>>>>>>>>> boolean applyAggregates(Aggregation agg);
>>>>>>>>>>>
>>>>>>>>>>> interface Aggregation {
>>>>>>>>>>>   int[] groupingFields();
>>>>>>>>>>>   List<CallExpression> aggregateExpressions();
>>>>>>>>>>>   DataType producedDataType();
>>>>>>>>>>> }
>>>>>>>>>>> ```
>>>>>>>>>>>
>>>>>>>>>>> Maybe I've over considered it, but I think Aggregation is a
>>>>>>>>>>> complicated thing. Maybe we need to extend its parameters in the 
>>>>>>>>>>> future, so
>>>>>>>>>>> make the parameters interface, which is conducive to the future 
>>>>>>>>>>> expansion
>>>>>>>>>>> without destroying the compatibility of user implementation. If it 
>>>>>>>>>>> is the
>>>>>>>>>>> way before, users need to modify the code.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jingsong
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu <
>>>>>>>>>>> liuyang0...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Jinsong,
>>>>>>>>>>>>
>>>>>>>>>>>> Thx a lot for your suggestion. These points really need to be
>>>>>>>>>>>> clear in the proposal.
>>>>>>>>>>>>
>>>>>>>>>>>> For the semantic problem, I think the main point is the
>>>>>>>>>>>> different returned data types
>>>>>>>>>>>> for the target aggregate function and the row format returned
>>>>>>>>>>>> by the underlying storage.
>>>>>>>>>>>> That's why we provide the producedDataType in the
>>>>>>>>>>>> SupportsAggregatePushDown interface.
>>>>>>>>>>>> Need to let developers know that we need to handle the semantic
>>>>>>>>>>>> differences between
>>>>>>>>>>>> the underlying storage system and Flink in related connectors.
>>>>>>>>>>>> [Supplemented in proposal]
>>>>>>>>>>>>
>>>>>>>>>>>> For the phase of the new PushLocalAggIntoTableSourceScanRule
>>>>>>>>>>>> rule, it's also a key point.
>>>>>>>>>>>> As you suggested, we should put it into the PHYSICAL_REWRITE
>>>>>>>>>>>> rule set, and better to put it
>>>>>>>>>>>> behind the EnforceLocalXXAggRule. [Supplemented in proposal]
>>>>>>>>>>>>
>>>>>>>>>>>> For the scalability of the interface, actually I don't exactly
>>>>>>>>>>>> understand your suggestion. Is it to add
>>>>>>>>>>>> an abstract class, to implement the SupportsAggregatePushDown
>>>>>>>>>>>> interface, and holds the
>>>>>>>>>>>> `List < CallExpression > aggregateExpressions, int[]
>>>>>>>>>>>> GroupingFields, DataType producedDataType`
>>>>>>>>>>>> fields?
>>>>>>>>>>>>
>>>>>>>>>>>> Looking forward to your further feedback or guidance.
>>>>>>>>>>>>
>>>>>>>>>>>> Jingsong Li <jingsongl...@gmail.com> 于2021年1月5日周二 下午2:44写道:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for your proposal! Sebastian.
>>>>>>>>>>>>>
>>>>>>>>>>>>> +1 for SupportsAggregatePushDown. The above wonderful
>>>>>>>>>>>>> discussion has solved
>>>>>>>>>>>>> many of my concerns.
>>>>>>>>>>>>>
>>>>>>>>>>>>> ## Semantic problems
>>>>>>>>>>>>>
>>>>>>>>>>>>> We may need to add some mechanisms or comments, because as far
>>>>>>>>>>>>> as I know,
>>>>>>>>>>>>> the semantics of each database is actually different, which
>>>>>>>>>>>>> may need to be
>>>>>>>>>>>>> reflected in your specific implementation.
>>>>>>>>>>>>>
>>>>>>>>>>>>> For example, the AVG output types of various databases may be
>>>>>>>>>>>>> different.
>>>>>>>>>>>>> For example, MySQL outputs double, this is different from
>>>>>>>>>>>>> Flink. What
>>>>>>>>>>>>> should we do? (Lucky, avg will be splitted into sum and count,
>>>>>>>>>>>>> But we also
>>>>>>>>>>>>> need care about decimal and others)
>>>>>>>>>>>>>
>>>>>>>>>>>>> ## The phase of push-down rule
>>>>>>>>>>>>>
>>>>>>>>>>>>> I strongly recommend that you do not put it in the Volcano
>>>>>>>>>>>>> phase, which may
>>>>>>>>>>>>> make the cost calculation very troublesome.
>>>>>>>>>>>>> So in PHYSICAL_REWRITE?
>>>>>>>>>>>>>
>>>>>>>>>>>>> ## About interface
>>>>>>>>>>>>>
>>>>>>>>>>>>> For scalability, I slightly recommend that we introduce an
>>>>>>>>>>>>> `Aggregate`
>>>>>>>>>>>>> interface, it contains `List<CallExpression>
>>>>>>>>>>>>> aggregateExpressions, int[]
>>>>>>>>>>>>> groupingFields, DataType producedDataType` fields. In this
>>>>>>>>>>>>> way, we can add
>>>>>>>>>>>>> fields easily without breaking compatibility.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think the current design is very good, just put forward some
>>>>>>>>>>>>> ideas.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Jingsong
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu <
>>>>>>>>>>>>> liuyang0...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> > Hi Jark,
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Thx for your further feedback and help. The interface of
>>>>>>>>>>>>> > SupportsAggregatePushDown may indeed need some adjustments.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > For (1) Agree: Yeah, the upstream only need to know if the
>>>>>>>>>>>>> TableSource can
>>>>>>>>>>>>> > handle all of the aggregates.
>>>>>>>>>>>>> > It's better to just return a boolean type to indicate
>>>>>>>>>>>>> whether all of
>>>>>>>>>>>>> > aggregates push down was successful or not. [Resolved in
>>>>>>>>>>>>> proposal]
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > For (2) Agree: The aggOutputDataType represent the produced
>>>>>>>>>>>>> data type of
>>>>>>>>>>>>> > the new table source to make sure that the new table source
>>>>>>>>>>>>> can
>>>>>>>>>>>>> > connect with the related exchange node. The format of this
>>>>>>>>>>>>> > aggOutputDataType is groupedFields's type + agg function's
>>>>>>>>>>>>> return type.
>>>>>>>>>>>>> > The reason for adding this parameter in this function is
>>>>>>>>>>>>> also to facilitate
>>>>>>>>>>>>> > the user to build the final output type. I have changed this
>>>>>>>>>>>>> parameter
>>>>>>>>>>>>> > to be producedDataType. [Resolved in proposal]
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > For (3) Agree: Indeed, groupSet may mislead users, I have
>>>>>>>>>>>>> changed to use
>>>>>>>>>>>>> > groupingFields. [Resolved in proposal]
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Thx again for the suggestion, looking for the further
>>>>>>>>>>>>> discussion.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Jark Wu <imj...@gmail.com> 于2021年1月5日周二 下午12:05写道:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > > I'm also +1 for idea#2.
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > Regarding to the updated interface,
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > Result applyAggregates(List<CallExpression>
>>>>>>>>>>>>> aggregateExpressions,
>>>>>>>>>>>>> > >      int[] groupSet, DataType aggOutputDataType);
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > final class Result {
>>>>>>>>>>>>> > >        private final List<CallExpression>
>>>>>>>>>>>>> acceptedAggregates;
>>>>>>>>>>>>> > >        private final List<CallExpression>
>>>>>>>>>>>>> remainingAggregates;
>>>>>>>>>>>>> > > }
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > I have following comments:
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > 1) Do we need the composite Result return type? Is a
>>>>>>>>>>>>> boolean return type
>>>>>>>>>>>>> > > enough?
>>>>>>>>>>>>> > >     From my understanding, all of the aggregates should be
>>>>>>>>>>>>> accepted,
>>>>>>>>>>>>> > > otherwise the pushdown should fail.
>>>>>>>>>>>>> > >     Therefore, users don't need to distinguish which
>>>>>>>>>>>>> aggregates are
>>>>>>>>>>>>> > > "accepted".
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > 2) Does the `aggOutputDataType` represent the produced
>>>>>>>>>>>>> data type of the
>>>>>>>>>>>>> > > new source, or just the return type of all the agg
>>>>>>>>>>>>> functions?
>>>>>>>>>>>>> > >     I would prefer to `producedDataType` just like
>>>>>>>>>>>>> > > `SupportsReadingMetadata` to reduce the effort for users
>>>>>>>>>>>>> to concat a
>>>>>>>>>>>>> > final
>>>>>>>>>>>>> > > output type.
>>>>>>>>>>>>> > >     The return type of each agg function can be obtained
>>>>>>>>>>>>> from the
>>>>>>>>>>>>> > > `CallExpression`.
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > 3) What do you think about renaming `groupSet` to
>>>>>>>>>>>>> `grouping` or
>>>>>>>>>>>>> > > `groupedFields` ?
>>>>>>>>>>>>> > >     The `groupSet` may confuse users that it relates to
>>>>>>>>>>>>> "grouping sets".
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > What do you think?
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > Best,
>>>>>>>>>>>>> > > Jark
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > On Tue, 5 Jan 2021 at 11:04, Kurt Young <ykt...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > >> Sorry for the typo -_-!
>>>>>>>>>>>>> > >> I meant idea #2.
>>>>>>>>>>>>> > >>
>>>>>>>>>>>>> > >> Best,
>>>>>>>>>>>>> > >> Kurt
>>>>>>>>>>>>> > >>
>>>>>>>>>>>>> > >>
>>>>>>>>>>>>> > >> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu <
>>>>>>>>>>>>> liuyang0...@gmail.com>
>>>>>>>>>>>>> > >> wrote:
>>>>>>>>>>>>> > >>
>>>>>>>>>>>>> > >>> Hi Kurt,
>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>> > >>> Thx a lot for your feedback. If local aggregation is
>>>>>>>>>>>>> more like a
>>>>>>>>>>>>> > >>> physical operator rather than logical
>>>>>>>>>>>>> > >>> operator, I think your suggestion should be idea #2
>>>>>>>>>>>>> which handle all in
>>>>>>>>>>>>> > >>> the physical optimization phase?
>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>> > >>> Looking forward for the further discussion.
>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>> > >>> Kurt Young <ykt...@gmail.com> 于2021年1月5日周二 上午9:52写道:
>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>> > >>>> Local aggregation is more like a physical operator
>>>>>>>>>>>>> rather than logical
>>>>>>>>>>>>> > >>>> operator. I would suggest going with idea #1.
>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>> > >>>> Best,
>>>>>>>>>>>>> > >>>> Kurt
>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>> > >>>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu <
>>>>>>>>>>>>> liuyang0...@gmail.com>
>>>>>>>>>>>>> > >>>> wrote:
>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>> > >>>> > Hi Jark, Thx a lot for your quick reply and valuable
>>>>>>>>>>>>> suggestions.
>>>>>>>>>>>>> > >>>> > For (1): Agree: Since we are in the period of
>>>>>>>>>>>>> upgrading the new
>>>>>>>>>>>>> > table
>>>>>>>>>>>>> > >>>> > source api,
>>>>>>>>>>>>> > >>>> > we really should consider the new interface for the
>>>>>>>>>>>>> new optimize
>>>>>>>>>>>>> > >>>> rule. If
>>>>>>>>>>>>> > >>>> > the new rule
>>>>>>>>>>>>> > >>>> > doesn't use the new api, we'll have to upgrade it
>>>>>>>>>>>>> sooner or later. I
>>>>>>>>>>>>> > >>>> have
>>>>>>>>>>>>> > >>>> > change to use
>>>>>>>>>>>>> > >>>> > the ability interface for the
>>>>>>>>>>>>> SupportsAggregatePushDown definition
>>>>>>>>>>>>> > in
>>>>>>>>>>>>> > >>>> above
>>>>>>>>>>>>> > >>>> > proposal.
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>> > For (2): Agree: Change to use CallExpression is a
>>>>>>>>>>>>> better choice, and
>>>>>>>>>>>>> > >>>> have
>>>>>>>>>>>>> > >>>> > resolved this
>>>>>>>>>>>>> > >>>> > comment in the proposal.
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>> > For (3): I suggest we first support the JDBC
>>>>>>>>>>>>> connector, as we don't
>>>>>>>>>>>>> > >>>> have
>>>>>>>>>>>>> > >>>> > Druid connector
>>>>>>>>>>>>> > >>>> > and ES connector just has sink api at present.
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>> > But perhaps the biggest question may be whether we
>>>>>>>>>>>>> should use idea 1
>>>>>>>>>>>>> > >>>> or
>>>>>>>>>>>>> > >>>> > idea 2 in proposal.
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>> > What do you think?  After we reach the agreement on
>>>>>>>>>>>>> the proposal,
>>>>>>>>>>>>> > our
>>>>>>>>>>>>> > >>>> team
>>>>>>>>>>>>> > >>>> > can drive to
>>>>>>>>>>>>> > >>>> > complete this feature.
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>> > Jark Wu <imj...@gmail.com> 于2020年12月29日周二 下午2:58写道:
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>> > > Hi Sebastian,
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > > Thanks for the proposal. I think this is a great
>>>>>>>>>>>>> improvement for
>>>>>>>>>>>>> > >>>> Flink
>>>>>>>>>>>>> > >>>> > SQL.
>>>>>>>>>>>>> > >>>> > > I went through the design doc and have following
>>>>>>>>>>>>> thoughts:
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > > 1) Flink has deprecated the legacy TableSource in
>>>>>>>>>>>>> 1.11 and
>>>>>>>>>>>>> > proposed
>>>>>>>>>>>>> > >>>> a new
>>>>>>>>>>>>> > >>>> > >  set of DynamicTableSource interfaces. Could you
>>>>>>>>>>>>> update your
>>>>>>>>>>>>> > >>>> proposal to
>>>>>>>>>>>>> > >>>> > > use the new interfaces?
>>>>>>>>>>>>> > >>>> > >  Follow the existing ability interfaces, e.g.
>>>>>>>>>>>>> > >>>> > > SupportsFilterPushDown, SupportsProjectionPushDown.
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > > 2) Personally, I think CallExpression would be a
>>>>>>>>>>>>> better
>>>>>>>>>>>>> > >>>> representation
>>>>>>>>>>>>> > >>>> > than
>>>>>>>>>>>>> > >>>> > > separate `FunctionDefinition` and args. Because, it
>>>>>>>>>>>>> would be
>>>>>>>>>>>>> > easier
>>>>>>>>>>>>> > >>>> to
>>>>>>>>>>>>> > >>>> > know
>>>>>>>>>>>>> > >>>> > > what's the index and type of the arguments.
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > > 3) It would be better to list which connectors will
>>>>>>>>>>>>> be supported
>>>>>>>>>>>>> > in
>>>>>>>>>>>>> > >>>> the
>>>>>>>>>>>>> > >>>> > > plan?
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > > Best,
>>>>>>>>>>>>> > >>>> > > Jark
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu <
>>>>>>>>>>>>> > liuyang0...@gmail.com>
>>>>>>>>>>>>> > >>>> > wrote:
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > > > Hi all,
>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>> > >>>> > > > I'd like to discuss a new feature for the Blink
>>>>>>>>>>>>> Planner.
>>>>>>>>>>>>> > >>>> > > > Aggregate operator of Flink SQL is currently
>>>>>>>>>>>>> fully done at Flink
>>>>>>>>>>>>> > >>>> layer.
>>>>>>>>>>>>> > >>>> > > > With the developing of storage, many downstream
>>>>>>>>>>>>> storage of Flink
>>>>>>>>>>>>> > >>>> SQL
>>>>>>>>>>>>> > >>>> > has
>>>>>>>>>>>>> > >>>> > > > the ability to deal with Aggregation operator.
>>>>>>>>>>>>> > >>>> > > > Pushing down Aggregate to data source layer will
>>>>>>>>>>>>> improve
>>>>>>>>>>>>> > >>>> performance
>>>>>>>>>>>>> > >>>> > from
>>>>>>>>>>>>> > >>>> > > > the perspective of the network IO and computation
>>>>>>>>>>>>> overhead.
>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>> > >>>> > > > I have drafted a design doc for this new feature.
>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>> >
>>>>>>>>>>>>> https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>> > >>>> > > > Any comment or discussion is welcome.
>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>> > >>>> > > > --
>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>> > >>>> > > > *With kind regards
>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>> > >>>> > > > Sebastian Liu 刘洋
>>>>>>>>>>>>> > >>>> > > > Institute of Computing Technology, Chinese
>>>>>>>>>>>>> Academy of Science
>>>>>>>>>>>>> > >>>> > > > Mobile\WeChat: +86—15201613655
>>>>>>>>>>>>> > >>>> > > > E-mail: liuyang0...@gmail.com <
>>>>>>>>>>>>> liuyang0...@gmail.com>
>>>>>>>>>>>>> > >>>> > > > QQ: 3239559*
>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>> > --
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>> > *With kind regards
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>> > >>>> > Sebastian Liu 刘洋
>>>>>>>>>>>>> > >>>> > Institute of Computing Technology, Chinese Academy of
>>>>>>>>>>>>> Science
>>>>>>>>>>>>> > >>>> > Mobile\WeChat: +86—15201613655
>>>>>>>>>>>>> > >>>> > E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>>>>>>>>>>> > >>>> > QQ: 3239559*
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>> > >>> --
>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>> > >>> *With kind regards
>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>> > >>> Sebastian Liu 刘洋
>>>>>>>>>>>>> > >>> Institute of Computing Technology, Chinese Academy of
>>>>>>>>>>>>> Science
>>>>>>>>>>>>> > >>> Mobile\WeChat: +86—15201613655
>>>>>>>>>>>>> > >>> E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>>>>>>>>>>> > >>> QQ: 3239559*
>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > --
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > *With kind regards
>>>>>>>>>>>>> > ------------------------------------------------------------
>>>>>>>>>>>>> > Sebastian Liu 刘洋
>>>>>>>>>>>>> > Institute of Computing Technology, Chinese Academy of Science
>>>>>>>>>>>>> > Mobile\WeChat: +86—15201613655
>>>>>>>>>>>>> > E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>>>>>>>>>>> > QQ: 3239559*
>>>>>>>>>>>>> >
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>>
>>>>>>>>>>>> *With kind regards
>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>> Sebastian Liu 刘洋
>>>>>>>>>>>> Institute of Computing Technology, Chinese Academy of Science
>>>>>>>>>>>> Mobile\WeChat: +86—15201613655
>>>>>>>>>>>> E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>>>>>>>>>> QQ: 3239559*
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best, Jingsong Lee
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best, Jingsong Lee
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> *With kind regards
>>>>> ------------------------------------------------------------
>>>>> Sebastian Liu 刘洋
>>>>> Institute of Computing Technology, Chinese Academy of Science
>>>>> Mobile\WeChat: +86—15201613655
>>>>> E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>>> QQ: 3239559*
>>>>>
>>>>>
>>>
>>> --
>>>
>>> *With kind regards
>>> ------------------------------------------------------------
>>> Sebastian Liu 刘洋
>>> Institute of Computing Technology, Chinese Academy of Science
>>> Mobile\WeChat: +86—15201613655
>>> E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>> QQ: 3239559*
>>>
>>>
>>
>> --
>> Best, Jingsong Lee
>>
>

-- 

*With kind regards
------------------------------------------------------------
Sebastian Liu 刘洋
Institute of Computing Technology, Chinese Academy of Science
Mobile\WeChat: +86—15201613655
E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
QQ: 3239559*

Reply via email to