Hi Sebastian,

I assigned the issue to you. But I suggest creating sub-tasks under this
issue. Because I think this would be a big contribution.
For example, you can split it into:
1. Introduce SupportsAggregatePushDown interface
2. Support SupportsAggregatePushDown in planner
3. Support SupportsAggregatePushDown for JDBC source
4. ...

Best,
Jark

On Thu, 7 Jan 2021 at 23:27, Sebastian Liu <liuyang0...@gmail.com> wrote:

> Hi Jark,
>
> Seems that we have reached the agreement on the proposal. Could you
> please help to assign the below jira ticket to me?
> https://issues.apache.org/jira/browse/FLINK-20791
>
> Jark Wu <imj...@gmail.com> 于2021年1月7日周四 上午10:25写道:
>
>> Thanks for updating the design doc.
>> It looks good to me.
>>
>> Best,
>> Jark
>>
>> On Thu, 7 Jan 2021 at 10:16, Jingsong Li <jingsongl...@gmail.com> wrote:
>>
>>> Sounds good to me.
>>>
>>> We don't have to worry about future changes, because it has covered all
>>> the capabilities of calcite aggregation.
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Thu, Jan 7, 2021 at 12:14 AM Sebastian Liu <liuyang0...@gmail.com>
>>> wrote:
>>>
>>>> Hi Jark,
>>>>
>>>> Sounds good to me. For better scalability in the future, we could add
>>>> the AggregateExpression.
>>>> ```
>>>>
>>>> public class AggregateExpression implements ResolvedExpression {
>>>>
>>>>    private final FunctionDefinition functionDefinition;
>>>>
>>>>    private final List<FieldReferenceExpression> args;
>>>>
>>>>    private final @Nullable CallExpression filterExpression;
>>>>
>>>>    private final DataType resultType;
>>>>
>>>>    private final boolean distinct;
>>>>
>>>>    private final boolean approximate;
>>>>
>>>>
>>>>
>>>>    private final boolean ignoreNulls;
>>>>
>>>> }
>>>> ```
>>>>
>>>> And we really only need one GroupingSets parameter for grouping. I have
>>>> updated the related interface in the proposal.
>>>> Appreciate the continued feedback and help.
>>>>
>>>> Jark Wu <imj...@gmail.com> 于2021年1月6日周三 下午9:34写道:
>>>>
>>>>> Hi Liu, Jingsong,
>>>>>
>>>>> Regarding the agg with filter, I think in theory we can support
>>>>> pushing such a pattern into source.
>>>>> We don't need to support it in the first version, but in the long
>>>>> term, we can support it.
>>>>> The designed interface should be future proof.
>>>>>
>>>>> Considering filter arg and distinct flag should be part of the
>>>>> aggregate expression.
>>>>> I'm wondering if CallExpression is a good representation for it.
>>>>> What do you think about proposing the following `AggregateExpression`
>>>>> to replace the `CallExpression`?
>>>>>
>>>>> class AggregateExpression implements ResolvedExpression {
>>>>>     private final FunctionDefinition functionDefinition;
>>>>>     private final List<FieldReferenceExpression> args;
>>>>>     private final @Nullable CallExpression filterExpr;
>>>>>     private final boolean distinct;
>>>>> }
>>>>>
>>>>> Besides, we don't need both groupingFields and groupingSets.
>>>>> `groupingSets` should be a superset of groupingFields.
>>>>> Then the interface of SupportsAggregatePushDown can be:
>>>>>
>>>>> interface SupportsAggregatePushDown {
>>>>>
>>>>>   boolean applyAggregates(
>>>>>     List<int[]> groupingSets,
>>>>>     List<AggregateExpression> aggregates,
>>>>>     DataType producedDataType);
>>>>> }
>>>>>
>>>>> What do you think?
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Wed, 6 Jan 2021 at 19:56, Sebastian Liu <liuyang0...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Jingsong, Jark,
>>>>>>
>>>>>> Thx so much for our discussion, and the cases mentioned above are
>>>>>> really worthy for further discussion.
>>>>>>
>>>>>> 1. For aggregate with filter expressions: eg: select COUNT(1)
>>>>>> FILTER(WHERE cc_call_center_sk > 3) from call_center;
>>>>>> For the current Blink Planner, the optimized plan will be:
>>>>>> TableSourceScan -> Calc(IS TRUE(>(cc_call_center_sk, 3))) -> LocalAgg
>>>>>> -> Exchange -> FinalAgg.
>>>>>> As there is a Calc above the TableSource, this pattern can't match
>>>>>> the LocalAggPushDownRule in the current design.
>>>>>>
>>>>>> 2. For the grouping set or rollup use case: eg: select COUNT(1) from
>>>>>> call_center group by rollup(cc_class, cc_employees);
>>>>>> For the current Blink Planner, the optimized plan will be:
>>>>>> TableSourceScan -> Expand -> LocalAgg -> Exchange -> FinalAgg -> Calc.
>>>>>> It's also not covered by the current LocalAggPushDownRule design.
>>>>>>
>>>>>> 3. I want to add a case which we haven't discussed yet.
>>>>>> Aggregate with Having clause.
>>>>>> eg: select COUNT(1) from call_center group by cc_class having
>>>>>> max(cc_tax_percentage) > 0.2;
>>>>>> For the current Blink Planner, the optimized plan will be:
>>>>>> TableSourceScan -> LocalAgg -> Exchange -> FinalAgg ->
>>>>>> Calc(where=[>($f2, 0.2:DECIMAL(2, 1))]).
>>>>>>
>>>>>> The core discussion points are summarized as follows:
>>>>>> a) Aggregate is a more complex scenario than predicates or limits,
>>>>>> and also depends on the different underlying storages.
>>>>>> b) One rule seems can't completely cover all aggregate scenario, but
>>>>>> whether SupportSAggregatePushDown interface can be a bit more general?
>>>>>> c) Could the CallExpression express the semantics of CalCite
>>>>>> AggregateCall?
>>>>>>
>>>>>> IMO: Completely push down aggregate is generally hard for distributed
>>>>>> systems. Usually we need a GROUP BY and exactly
>>>>>> matches the partition mode in downstream storage. At the same time,
>>>>>> the benefit of remove the final aggregate is actually limited.
>>>>>> The LocalAggPushDown generally yields more than 80% of the CPU and IO
>>>>>> benefits. But I also agree that
>>>>>> the SupportsAggregatePushDown interface should be as generic as
>>>>>> possible for future extensions, and meanwhile keep confidence
>>>>>> in the interface we design.
>>>>>>
>>>>>> For core points (a): As the complexity of aggregate, one
>>>>>> LogicalAggregate node may extend to "Expand / Calc / LocalXXAgg / 
>>>>>> Exchange
>>>>>> / FinalXXAgg"
>>>>>> in physical phase. Seems that we can't solve all cases with only one
>>>>>> rule. So I suggest PushLocalAggIntoTableSourceScanRule focus only
>>>>>> on the pattern of TableSourceScan + LocalXXAggregate at present.
>>>>>>
>>>>>> For core points (b & c): I think we can change the interface to be:
>>>>>> ```
>>>>>>
>>>>>> boolean applyAggregates(int[] groupingFields, List<CallExpression>
>>>>>> aggregateExpressions, DataType producedDataType, List<int[]>
>>>>>> groupingSets);
>>>>>>
>>>>>> ```
>>>>>>
>>>>>>
>>>>>> Simple Group: groupingSets.size() == 1 &&
>>>>>> groupingSets.get(0).equals(groupingFields)
>>>>>>
>>>>>> Cube Group: groupingSets.size() == IntMath.pow(2,
>>>>>> groupingFields.cardinality())
>>>>>> Rollup: Refernece org.apache.calcite.rel.core.Aggregate.Group#isRollup
>>>>>>
>>>>>> Then we can handle the complex grouping case. The Connector developer
>>>>>> of the downstream storage should determine
>>>>>> whether it supports the associated grouping type. For the filter and
>>>>>> having clause, they will convert to be related Calc RelNode,
>>>>>> and no longer in the LocalAggregate node, the CallExpression may be
>>>>>> sufficient to express the semantics of AggregateCall.
>>>>>>
>>>>>> What do you think? Looking forward to our further discussion.
>>>>>>
>>>>>>
>>>>>> Jingsong Li <jingsongl...@gmail.com> 于2021年1月6日周三 下午2:24写道:
>>>>>>
>>>>>>> > I think filter expressions and grouping sets are semantic
>>>>>>> arguments instead of utilities. If we want to push them into sources, 
>>>>>>> the
>>>>>>> connector developers should be aware of them.Wrapping them in a context
>>>>>>> implicitly is error-prone that the existing connector will produce wrong
>>>>>>> results when upgrading to new Flink versions.
>>>>>>>
>>>>>>> We can have some mechanism to check the upgrading.
>>>>>>>
>>>>>>> > I think for these cases, providing a new default method to
>>>>>>> override might be a better choice.
>>>>>>>
>>>>>>> Then we will have three or more methods. For the API level, I really
>>>>>>> don't like it...
>>>>>>>
>>>>>>> Best,
>>>>>>> Jingsong
>>>>>>>
>>>>>>> On Wed, Jan 6, 2021 at 2:10 PM Jark Wu <imj...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I think filter expressions and grouping sets are semantic arguments
>>>>>>>> instead of utilities.
>>>>>>>> If we want to push them into sources, the connector developers
>>>>>>>> should be aware of them.
>>>>>>>> Wrapping them in a context implicitly is error-prone that the
>>>>>>>> existing connector will produce wrong results
>>>>>>>>  when upgrading to new Flink versions (as we are pushing
>>>>>>>> grouping_sets/filter_args, but connector ignores it).
>>>>>>>> I think for these cases, providing a new default method to override
>>>>>>>> might be a better choice.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jark
>>>>>>>>
>>>>>>>> On Wed, 6 Jan 2021 at 13:56, Jingsong Li <jingsongl...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I'm also curious about aggregate with filter (COUNT(1)
>>>>>>>>> FILTER(WHERE d > 1)). Can we push it down? I'm not sure that a single 
>>>>>>>>> call
>>>>>>>>> expression can express it, and how we should embody it and convey it 
>>>>>>>>> to
>>>>>>>>> users.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jingsong
>>>>>>>>>
>>>>>>>>> On Wed, Jan 6, 2021 at 1:36 PM Jingsong Li <jingsongl...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Jark,
>>>>>>>>>>
>>>>>>>>>> I don't want to limit this interface to LocalAgg Push down.
>>>>>>>>>> Actually, sometimes, we can push whole aggregation to source too.
>>>>>>>>>>
>>>>>>>>>> So, this rule can do something more advanced. For example, we can
>>>>>>>>>> push down group sets to source too, for the SQL: "GROUP BY GROUPING 
>>>>>>>>>> SETS
>>>>>>>>>> (f1, f2)". Then, we need to add more information to push down.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jingsong
>>>>>>>>>>
>>>>>>>>>> On Wed, Jan 6, 2021 at 11:02 AM Jark Wu <imj...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I think this may be over designed. We should have confidence in
>>>>>>>>>>> the interface we design, the interface should be stable.
>>>>>>>>>>> Wrapping things in a big context has a cost of losing user
>>>>>>>>>>> convenience.
>>>>>>>>>>> Foremost, we don't see any parameters to add in the future. Do
>>>>>>>>>>> you know any potential parameters?
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jark
>>>>>>>>>>>
>>>>>>>>>>> On Wed, 6 Jan 2021 at 10:28, Jingsong Li <jingsongl...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Sebastian,
>>>>>>>>>>>>
>>>>>>>>>>>> Well, I mean:
>>>>>>>>>>>>
>>>>>>>>>>>> `boolean applyAggregates(int[] groupingFields,
>>>>>>>>>>>> List<CallExpression> aggregateExpressions, DataType 
>>>>>>>>>>>> producedDataType);`
>>>>>>>>>>>> VS
>>>>>>>>>>>> ```
>>>>>>>>>>>> boolean applyAggregates(Aggregation agg);
>>>>>>>>>>>>
>>>>>>>>>>>> interface Aggregation {
>>>>>>>>>>>>   int[] groupingFields();
>>>>>>>>>>>>   List<CallExpression> aggregateExpressions();
>>>>>>>>>>>>   DataType producedDataType();
>>>>>>>>>>>> }
>>>>>>>>>>>> ```
>>>>>>>>>>>>
>>>>>>>>>>>> Maybe I've over considered it, but I think Aggregation is a
>>>>>>>>>>>> complicated thing. Maybe we need to extend its parameters in the 
>>>>>>>>>>>> future, so
>>>>>>>>>>>> make the parameters interface, which is conducive to the future 
>>>>>>>>>>>> expansion
>>>>>>>>>>>> without destroying the compatibility of user implementation. If it 
>>>>>>>>>>>> is the
>>>>>>>>>>>> way before, users need to modify the code.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Jingsong
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu <
>>>>>>>>>>>> liuyang0...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Jinsong,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thx a lot for your suggestion. These points really need to be
>>>>>>>>>>>>> clear in the proposal.
>>>>>>>>>>>>>
>>>>>>>>>>>>> For the semantic problem, I think the main point is the
>>>>>>>>>>>>> different returned data types
>>>>>>>>>>>>> for the target aggregate function and the row format returned
>>>>>>>>>>>>> by the underlying storage.
>>>>>>>>>>>>> That's why we provide the producedDataType in the
>>>>>>>>>>>>> SupportsAggregatePushDown interface.
>>>>>>>>>>>>> Need to let developers know that we need to handle the
>>>>>>>>>>>>> semantic differences between
>>>>>>>>>>>>> the underlying storage system and Flink in related connectors.
>>>>>>>>>>>>> [Supplemented in proposal]
>>>>>>>>>>>>>
>>>>>>>>>>>>> For the phase of the new PushLocalAggIntoTableSourceScanRule
>>>>>>>>>>>>> rule, it's also a key point.
>>>>>>>>>>>>> As you suggested, we should put it into the PHYSICAL_REWRITE
>>>>>>>>>>>>> rule set, and better to put it
>>>>>>>>>>>>> behind the EnforceLocalXXAggRule. [Supplemented in proposal]
>>>>>>>>>>>>>
>>>>>>>>>>>>> For the scalability of the interface, actually I don't exactly
>>>>>>>>>>>>> understand your suggestion. Is it to add
>>>>>>>>>>>>> an abstract class, to implement the SupportsAggregatePushDown
>>>>>>>>>>>>> interface, and holds the
>>>>>>>>>>>>> `List < CallExpression > aggregateExpressions, int[]
>>>>>>>>>>>>> GroupingFields, DataType producedDataType`
>>>>>>>>>>>>> fields?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Looking forward to your further feedback or guidance.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jingsong Li <jingsongl...@gmail.com> 于2021年1月5日周二 下午2:44写道:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for your proposal! Sebastian.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> +1 for SupportsAggregatePushDown. The above wonderful
>>>>>>>>>>>>>> discussion has solved
>>>>>>>>>>>>>> many of my concerns.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ## Semantic problems
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We may need to add some mechanisms or comments, because as
>>>>>>>>>>>>>> far as I know,
>>>>>>>>>>>>>> the semantics of each database is actually different, which
>>>>>>>>>>>>>> may need to be
>>>>>>>>>>>>>> reflected in your specific implementation.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For example, the AVG output types of various databases may be
>>>>>>>>>>>>>> different.
>>>>>>>>>>>>>> For example, MySQL outputs double, this is different from
>>>>>>>>>>>>>> Flink. What
>>>>>>>>>>>>>> should we do? (Lucky, avg will be splitted into sum and
>>>>>>>>>>>>>> count, But we also
>>>>>>>>>>>>>> need care about decimal and others)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ## The phase of push-down rule
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I strongly recommend that you do not put it in the Volcano
>>>>>>>>>>>>>> phase, which may
>>>>>>>>>>>>>> make the cost calculation very troublesome.
>>>>>>>>>>>>>> So in PHYSICAL_REWRITE?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ## About interface
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For scalability, I slightly recommend that we introduce an
>>>>>>>>>>>>>> `Aggregate`
>>>>>>>>>>>>>> interface, it contains `List<CallExpression>
>>>>>>>>>>>>>> aggregateExpressions, int[]
>>>>>>>>>>>>>> groupingFields, DataType producedDataType` fields. In this
>>>>>>>>>>>>>> way, we can add
>>>>>>>>>>>>>> fields easily without breaking compatibility.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think the current design is very good, just put forward
>>>>>>>>>>>>>> some ideas.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Jingsong
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu <
>>>>>>>>>>>>>> liuyang0...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > Hi Jark,
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Thx for your further feedback and help. The interface of
>>>>>>>>>>>>>> > SupportsAggregatePushDown may indeed need some adjustments.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > For (1) Agree: Yeah, the upstream only need to know if the
>>>>>>>>>>>>>> TableSource can
>>>>>>>>>>>>>> > handle all of the aggregates.
>>>>>>>>>>>>>> > It's better to just return a boolean type to indicate
>>>>>>>>>>>>>> whether all of
>>>>>>>>>>>>>> > aggregates push down was successful or not. [Resolved in
>>>>>>>>>>>>>> proposal]
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > For (2) Agree: The aggOutputDataType represent the produced
>>>>>>>>>>>>>> data type of
>>>>>>>>>>>>>> > the new table source to make sure that the new table source
>>>>>>>>>>>>>> can
>>>>>>>>>>>>>> > connect with the related exchange node. The format of this
>>>>>>>>>>>>>> > aggOutputDataType is groupedFields's type + agg function's
>>>>>>>>>>>>>> return type.
>>>>>>>>>>>>>> > The reason for adding this parameter in this function is
>>>>>>>>>>>>>> also to facilitate
>>>>>>>>>>>>>> > the user to build the final output type. I have changed
>>>>>>>>>>>>>> this parameter
>>>>>>>>>>>>>> > to be producedDataType. [Resolved in proposal]
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > For (3) Agree: Indeed, groupSet may mislead users, I have
>>>>>>>>>>>>>> changed to use
>>>>>>>>>>>>>> > groupingFields. [Resolved in proposal]
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Thx again for the suggestion, looking for the further
>>>>>>>>>>>>>> discussion.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Jark Wu <imj...@gmail.com> 于2021年1月5日周二 下午12:05写道:
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > > I'm also +1 for idea#2.
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > Regarding to the updated interface,
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > Result applyAggregates(List<CallExpression>
>>>>>>>>>>>>>> aggregateExpressions,
>>>>>>>>>>>>>> > >      int[] groupSet, DataType aggOutputDataType);
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > final class Result {
>>>>>>>>>>>>>> > >        private final List<CallExpression>
>>>>>>>>>>>>>> acceptedAggregates;
>>>>>>>>>>>>>> > >        private final List<CallExpression>
>>>>>>>>>>>>>> remainingAggregates;
>>>>>>>>>>>>>> > > }
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > I have following comments:
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > 1) Do we need the composite Result return type? Is a
>>>>>>>>>>>>>> boolean return type
>>>>>>>>>>>>>> > > enough?
>>>>>>>>>>>>>> > >     From my understanding, all of the aggregates should
>>>>>>>>>>>>>> be accepted,
>>>>>>>>>>>>>> > > otherwise the pushdown should fail.
>>>>>>>>>>>>>> > >     Therefore, users don't need to distinguish which
>>>>>>>>>>>>>> aggregates are
>>>>>>>>>>>>>> > > "accepted".
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > 2) Does the `aggOutputDataType` represent the produced
>>>>>>>>>>>>>> data type of the
>>>>>>>>>>>>>> > > new source, or just the return type of all the agg
>>>>>>>>>>>>>> functions?
>>>>>>>>>>>>>> > >     I would prefer to `producedDataType` just like
>>>>>>>>>>>>>> > > `SupportsReadingMetadata` to reduce the effort for users
>>>>>>>>>>>>>> to concat a
>>>>>>>>>>>>>> > final
>>>>>>>>>>>>>> > > output type.
>>>>>>>>>>>>>> > >     The return type of each agg function can be obtained
>>>>>>>>>>>>>> from the
>>>>>>>>>>>>>> > > `CallExpression`.
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > 3) What do you think about renaming `groupSet` to
>>>>>>>>>>>>>> `grouping` or
>>>>>>>>>>>>>> > > `groupedFields` ?
>>>>>>>>>>>>>> > >     The `groupSet` may confuse users that it relates to
>>>>>>>>>>>>>> "grouping sets".
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > What do you think?
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > Best,
>>>>>>>>>>>>>> > > Jark
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > On Tue, 5 Jan 2021 at 11:04, Kurt Young <ykt...@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > >> Sorry for the typo -_-!
>>>>>>>>>>>>>> > >> I meant idea #2.
>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>> > >> Best,
>>>>>>>>>>>>>> > >> Kurt
>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>> > >> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu <
>>>>>>>>>>>>>> liuyang0...@gmail.com>
>>>>>>>>>>>>>> > >> wrote:
>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>> > >>> Hi Kurt,
>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>> > >>> Thx a lot for your feedback. If local aggregation is
>>>>>>>>>>>>>> more like a
>>>>>>>>>>>>>> > >>> physical operator rather than logical
>>>>>>>>>>>>>> > >>> operator, I think your suggestion should be idea #2
>>>>>>>>>>>>>> which handle all in
>>>>>>>>>>>>>> > >>> the physical optimization phase?
>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>> > >>> Looking forward for the further discussion.
>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>> > >>> Kurt Young <ykt...@gmail.com> 于2021年1月5日周二 上午9:52写道:
>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>> > >>>> Local aggregation is more like a physical operator
>>>>>>>>>>>>>> rather than logical
>>>>>>>>>>>>>> > >>>> operator. I would suggest going with idea #1.
>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>> > >>>> Best,
>>>>>>>>>>>>>> > >>>> Kurt
>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>> > >>>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu <
>>>>>>>>>>>>>> liuyang0...@gmail.com>
>>>>>>>>>>>>>> > >>>> wrote:
>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>> > >>>> > Hi Jark, Thx a lot for your quick reply and valuable
>>>>>>>>>>>>>> suggestions.
>>>>>>>>>>>>>> > >>>> > For (1): Agree: Since we are in the period of
>>>>>>>>>>>>>> upgrading the new
>>>>>>>>>>>>>> > table
>>>>>>>>>>>>>> > >>>> > source api,
>>>>>>>>>>>>>> > >>>> > we really should consider the new interface for the
>>>>>>>>>>>>>> new optimize
>>>>>>>>>>>>>> > >>>> rule. If
>>>>>>>>>>>>>> > >>>> > the new rule
>>>>>>>>>>>>>> > >>>> > doesn't use the new api, we'll have to upgrade it
>>>>>>>>>>>>>> sooner or later. I
>>>>>>>>>>>>>> > >>>> have
>>>>>>>>>>>>>> > >>>> > change to use
>>>>>>>>>>>>>> > >>>> > the ability interface for the
>>>>>>>>>>>>>> SupportsAggregatePushDown definition
>>>>>>>>>>>>>> > in
>>>>>>>>>>>>>> > >>>> above
>>>>>>>>>>>>>> > >>>> > proposal.
>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>> > >>>> > For (2): Agree: Change to use CallExpression is a
>>>>>>>>>>>>>> better choice, and
>>>>>>>>>>>>>> > >>>> have
>>>>>>>>>>>>>> > >>>> > resolved this
>>>>>>>>>>>>>> > >>>> > comment in the proposal.
>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>> > >>>> > For (3): I suggest we first support the JDBC
>>>>>>>>>>>>>> connector, as we don't
>>>>>>>>>>>>>> > >>>> have
>>>>>>>>>>>>>> > >>>> > Druid connector
>>>>>>>>>>>>>> > >>>> > and ES connector just has sink api at present.
>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>> > >>>> > But perhaps the biggest question may be whether we
>>>>>>>>>>>>>> should use idea 1
>>>>>>>>>>>>>> > >>>> or
>>>>>>>>>>>>>> > >>>> > idea 2 in proposal.
>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>> > >>>> > What do you think?  After we reach the agreement on
>>>>>>>>>>>>>> the proposal,
>>>>>>>>>>>>>> > our
>>>>>>>>>>>>>> > >>>> team
>>>>>>>>>>>>>> > >>>> > can drive to
>>>>>>>>>>>>>> > >>>> > complete this feature.
>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>> > >>>> > Jark Wu <imj...@gmail.com> 于2020年12月29日周二 下午2:58写道:
>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>> > >>>> > > Hi Sebastian,
>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>> > >>>> > > Thanks for the proposal. I think this is a great
>>>>>>>>>>>>>> improvement for
>>>>>>>>>>>>>> > >>>> Flink
>>>>>>>>>>>>>> > >>>> > SQL.
>>>>>>>>>>>>>> > >>>> > > I went through the design doc and have following
>>>>>>>>>>>>>> thoughts:
>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>> > >>>> > > 1) Flink has deprecated the legacy TableSource in
>>>>>>>>>>>>>> 1.11 and
>>>>>>>>>>>>>> > proposed
>>>>>>>>>>>>>> > >>>> a new
>>>>>>>>>>>>>> > >>>> > >  set of DynamicTableSource interfaces. Could you
>>>>>>>>>>>>>> update your
>>>>>>>>>>>>>> > >>>> proposal to
>>>>>>>>>>>>>> > >>>> > > use the new interfaces?
>>>>>>>>>>>>>> > >>>> > >  Follow the existing ability interfaces, e.g.
>>>>>>>>>>>>>> > >>>> > > SupportsFilterPushDown, SupportsProjectionPushDown.
>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>> > >>>> > > 2) Personally, I think CallExpression would be a
>>>>>>>>>>>>>> better
>>>>>>>>>>>>>> > >>>> representation
>>>>>>>>>>>>>> > >>>> > than
>>>>>>>>>>>>>> > >>>> > > separate `FunctionDefinition` and args. Because,
>>>>>>>>>>>>>> it would be
>>>>>>>>>>>>>> > easier
>>>>>>>>>>>>>> > >>>> to
>>>>>>>>>>>>>> > >>>> > know
>>>>>>>>>>>>>> > >>>> > > what's the index and type of the arguments.
>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>> > >>>> > > 3) It would be better to list which connectors
>>>>>>>>>>>>>> will be supported
>>>>>>>>>>>>>> > in
>>>>>>>>>>>>>> > >>>> the
>>>>>>>>>>>>>> > >>>> > > plan?
>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>> > >>>> > > Best,
>>>>>>>>>>>>>> > >>>> > > Jark
>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>> > >>>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu <
>>>>>>>>>>>>>> > liuyang0...@gmail.com>
>>>>>>>>>>>>>> > >>>> > wrote:
>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>> > >>>> > > > Hi all,
>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>> > >>>> > > > I'd like to discuss a new feature for the Blink
>>>>>>>>>>>>>> Planner.
>>>>>>>>>>>>>> > >>>> > > > Aggregate operator of Flink SQL is currently
>>>>>>>>>>>>>> fully done at Flink
>>>>>>>>>>>>>> > >>>> layer.
>>>>>>>>>>>>>> > >>>> > > > With the developing of storage, many downstream
>>>>>>>>>>>>>> storage of Flink
>>>>>>>>>>>>>> > >>>> SQL
>>>>>>>>>>>>>> > >>>> > has
>>>>>>>>>>>>>> > >>>> > > > the ability to deal with Aggregation operator.
>>>>>>>>>>>>>> > >>>> > > > Pushing down Aggregate to data source layer will
>>>>>>>>>>>>>> improve
>>>>>>>>>>>>>> > >>>> performance
>>>>>>>>>>>>>> > >>>> > from
>>>>>>>>>>>>>> > >>>> > > > the perspective of the network IO and
>>>>>>>>>>>>>> computation overhead.
>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>> > >>>> > > > I have drafted a design doc for this new feature.
>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>> > >>>> > > > Any comment or discussion is welcome.
>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>> > >>>> > > > --
>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>> > >>>> > > > *With kind regards
>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>>> > >>>> > > > Sebastian Liu 刘洋
>>>>>>>>>>>>>> > >>>> > > > Institute of Computing Technology, Chinese
>>>>>>>>>>>>>> Academy of Science
>>>>>>>>>>>>>> > >>>> > > > Mobile\WeChat: +86—15201613655
>>>>>>>>>>>>>> > >>>> > > > E-mail: liuyang0...@gmail.com <
>>>>>>>>>>>>>> liuyang0...@gmail.com>
>>>>>>>>>>>>>> > >>>> > > > QQ: 3239559*
>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>> > >>>> > --
>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>> > >>>> > *With kind regards
>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>>> > >>>> > Sebastian Liu 刘洋
>>>>>>>>>>>>>> > >>>> > Institute of Computing Technology, Chinese Academy
>>>>>>>>>>>>>> of Science
>>>>>>>>>>>>>> > >>>> > Mobile\WeChat: +86—15201613655
>>>>>>>>>>>>>> > >>>> > E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > >>>> > QQ: 3239559*
>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>> > >>> --
>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>> > >>> *With kind regards
>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>>> > >>> Sebastian Liu 刘洋
>>>>>>>>>>>>>> > >>> Institute of Computing Technology, Chinese Academy of
>>>>>>>>>>>>>> Science
>>>>>>>>>>>>>> > >>> Mobile\WeChat: +86—15201613655
>>>>>>>>>>>>>> > >>> E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>>>>>>>>>>>> > >>> QQ: 3239559*
>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > --
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > *With kind regards
>>>>>>>>>>>>>> > ------------------------------------------------------------
>>>>>>>>>>>>>> > Sebastian Liu 刘洋
>>>>>>>>>>>>>> > Institute of Computing Technology, Chinese Academy of
>>>>>>>>>>>>>> Science
>>>>>>>>>>>>>> > Mobile\WeChat: +86—15201613655
>>>>>>>>>>>>>> > E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>>>>>>>>>>>> > QQ: 3239559*
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>>
>>>>>>>>>>>>> *With kind regards
>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>> Sebastian Liu 刘洋
>>>>>>>>>>>>> Institute of Computing Technology, Chinese Academy of Science
>>>>>>>>>>>>> Mobile\WeChat: +86—15201613655
>>>>>>>>>>>>> E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>>>>>>>>>>> QQ: 3239559*
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best, Jingsong Lee
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> *With kind regards
>>>>>> ------------------------------------------------------------
>>>>>> Sebastian Liu 刘洋
>>>>>> Institute of Computing Technology, Chinese Academy of Science
>>>>>> Mobile\WeChat: +86—15201613655
>>>>>> E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>>>> QQ: 3239559*
>>>>>>
>>>>>>
>>>>
>>>> --
>>>>
>>>> *With kind regards
>>>> ------------------------------------------------------------
>>>> Sebastian Liu 刘洋
>>>> Institute of Computing Technology, Chinese Academy of Science
>>>> Mobile\WeChat: +86—15201613655
>>>> E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>> QQ: 3239559*
>>>>
>>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>
> --
>
> *With kind regards
> ------------------------------------------------------------
> Sebastian Liu 刘洋
> Institute of Computing Technology, Chinese Academy of Science
> Mobile\WeChat: +86—15201613655
> E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
> QQ: 3239559*
>
>

Reply via email to