Great! Thanks for pushing this work.
Looking forward to the pull requests.

Best,
Jark

On Fri, 8 Jan 2021 at 17:57, Sebastian Liu <liuyang0...@gmail.com> wrote:

> Hi Jark,
>
> Cool, following your suggestions I have created three related subtasks
> under FLINK-20791. Could you also assign these subtasks to me when you
> have time? I will push the relevant implementation forward.
>
> On Fri, 8 Jan 2021 at 12:30, Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Sebastian,
>>
>> I assigned the issue to you, but I suggest creating sub-tasks under this
>> issue, because I think this will be a big contribution.
>> For example, you can split it into:
>> 1. Introduce SupportsAggregatePushDown interface
>> 2. Support SupportsAggregatePushDown in planner
>> 3. Support SupportsAggregatePushDown for JDBC source
>> 4. ...
>>
>> Best,
>> Jark
>>
>> On Thu, 7 Jan 2021 at 23:27, Sebastian Liu <liuyang0...@gmail.com> wrote:
>>
>>> Hi Jark,
>>>
>>> It seems that we have reached agreement on the proposal. Could you
>>> please assign the JIRA ticket below to me?
>>> https://issues.apache.org/jira/browse/FLINK-20791
>>>
>>> On Thu, 7 Jan 2021 at 10:25, Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> Thanks for updating the design doc.
>>>> It looks good to me.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Thu, 7 Jan 2021 at 10:16, Jingsong Li <jingsongl...@gmail.com>
>>>> wrote:
>>>>
>>>>> Sounds good to me.
>>>>>
>>>>> We don't have to worry about future changes, because it covers
>>>>> all the capabilities of Calcite aggregation.
>>>>>
>>>>> Best,
>>>>> Jingsong
>>>>>
>>>>> On Thu, Jan 7, 2021 at 12:14 AM Sebastian Liu <liuyang0...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Jark,
>>>>>>
>>>>>> Sounds good to me. For better extensibility in the future, we could add
>>>>>> more fields to the AggregateExpression:
>>>>>> ```
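>>>>>> public class AggregateExpression implements ResolvedExpression {
>>>>>>
>>>>>>    // the aggregate function, e.g. COUNT or SUM
>>>>>>    private final FunctionDefinition functionDefinition;
>>>>>>
>>>>>>    // the input fields the function is applied to
>>>>>>    private final List<FieldReferenceExpression> args;
>>>>>>
>>>>>>    // optional FILTER (WHERE ...) predicate, null if absent
>>>>>>    private final @Nullable CallExpression filterExpression;
>>>>>>
>>>>>>    // result type of this aggregate call
>>>>>>    private final DataType resultType;
>>>>>>
>>>>>>    // DISTINCT flag, e.g. COUNT(DISTINCT x)
>>>>>>    private final boolean distinct;
>>>>>>
>>>>>>    // whether an approximate result is acceptable, e.g. APPROX_COUNT_DISTINCT
>>>>>>    private final boolean approximate;
>>>>>>
>>>>>>    // IGNORE NULLS flag
>>>>>>    private final boolean ignoreNulls;
>>>>>>
>>>>>>    // constructor, getters and ResolvedExpression methods omitted
>>>>>> }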
>>>>>> ```
>>>>>>
>>>>>> And we really only need a single groupingSets parameter for grouping. I
>>>>>> have updated the related interface in the proposal.
>>>>>> I appreciate the continued feedback and help.
>>>>>>
>>>>>> On Wed, 6 Jan 2021 at 21:34, Jark Wu <imj...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Liu, Jingsong,
>>>>>>>
>>>>>>> Regarding aggregates with a filter, I think in theory we can support
>>>>>>> pushing such a pattern into the source.
>>>>>>> We don't need to support it in the first version, but we can support
>>>>>>> it in the long term, so the designed interface should be future-proof.
>>>>>>>
>>>>>>> Since the filter argument and the distinct flag should be part of the
>>>>>>> aggregate expression, I'm wondering whether CallExpression is a good
>>>>>>> representation for it.
>>>>>>> What do you think about introducing the following
>>>>>>> `AggregateExpression` to replace the `CallExpression`?
>>>>>>>
>>>>>>> class AggregateExpression implements ResolvedExpression {
>>>>>>>     private final FunctionDefinition functionDefinition;
>>>>>>>     private final List<FieldReferenceExpression> args;
>>>>>>>     private final @Nullable CallExpression filterExpr;
>>>>>>>     private final boolean distinct;
>>>>>>> }
>>>>>>>
>>>>>>> Besides, we don't need both groupingFields and groupingSets.
>>>>>>> `groupingSets` should be a superset of groupingFields.
>>>>>>> Then the interface of SupportsAggregatePushDown can be:
>>>>>>>
>>>>>>> interface SupportsAggregatePushDown {
>>>>>>>
>>>>>>>   boolean applyAggregates(
>>>>>>>     List<int[]> groupingSets,
>>>>>>>     List<AggregateExpression> aggregates,
>>>>>>>     DataType producedDataType);
>>>>>>> }
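A minimal sketch of how a connector might honor the all-or-nothing contract of `applyAggregates`. It assumes simplified, hypothetical stand-ins (`SketchJdbcSource`, `AggCall`) instead of Flink's real `AggregateExpression` and `DataType`, and omits `producedDataType`. It accepts only a plain GROUP BY (one grouping set) with non-DISTINCT, unfiltered COUNT/SUM/MIN/MAX calls and rejects everything else:

```java
import java.util.List;
import java.util.Set;

// Hypothetical sketch: a simplified source deciding whether to accept an aggregate push-down.
class SketchJdbcSource {

    /** Hypothetical stand-in for the proposed AggregateExpression. */
    static final class AggCall {
        final String function;    // e.g. "COUNT", "SUM"
        final boolean distinct;   // DISTINCT flag
        final boolean hasFilter;  // true if a FILTER (WHERE ...) clause is attached

        AggCall(String function, boolean distinct, boolean hasFilter) {
            this.function = function;
            this.distinct = distinct;
            this.hasFilter = hasFilter;
        }
    }

    private static final Set<String> SUPPORTED = Set.of("COUNT", "SUM", "MIN", "MAX");

    // State remembered only after a successful push-down, e.g. to generate SQL later.
    private int[] pushedGroupingFields;
    private List<AggCall> pushedAggregates;

    /** All or nothing: returns false and keeps no state unless every aggregate can be pushed. */
    boolean applyAggregates(List<int[]> groupingSets, List<AggCall> aggregates) {
        if (groupingSets.size() != 1) {
            return false; // ROLLUP / CUBE / GROUPING SETS are not handled in this sketch
        }
        for (AggCall agg : aggregates) {
            if (agg.distinct || agg.hasFilter || !SUPPORTED.contains(agg.function)) {
                return false;
            }
        }
        this.pushedGroupingFields = groupingSets.get(0);
        this.pushedAggregates = aggregates;
        return true;
    }

    boolean isAggregatePushedDown() {
        return pushedAggregates != null;
    }
}
```

Returning a single boolean matches the point made further down the thread: if any aggregate cannot be pushed, the whole aggregation stays in Flink.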
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> On Wed, 6 Jan 2021 at 19:56, Sebastian Liu <liuyang0...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Jingsong, Jark,
>>>>>>>>
>>>>>>>> Thx so much for the discussion; the cases mentioned above are
>>>>>>>> really worth discussing further.
>>>>>>>>
>>>>>>>> 1. For aggregates with filter expressions, e.g. select COUNT(1)
>>>>>>>> FILTER(WHERE cc_call_center_sk > 3) from call_center;
>>>>>>>> For the current Blink Planner, the optimized plan will be:
>>>>>>>> TableSourceScan -> Calc(IS TRUE(>(cc_call_center_sk, 3))) ->
>>>>>>>> LocalAgg -> Exchange -> FinalAgg.
>>>>>>>> As there is a Calc above the TableSource, this pattern can't match
>>>>>>>> the LocalAggPushDownRule in the current design.
>>>>>>>>
>>>>>>>> 2. For the grouping set or rollup use case, e.g. select COUNT(1)
>>>>>>>> from call_center group by rollup(cc_class, cc_employees);
>>>>>>>> For the current Blink Planner, the optimized plan will be:
>>>>>>>> TableSourceScan -> Expand -> LocalAgg -> Exchange -> FinalAgg ->
>>>>>>>> Calc.
>>>>>>>> It's also not covered by the current LocalAggPushDownRule design.
>>>>>>>>
>>>>>>>> 3. I want to add a case we haven't discussed yet: an aggregate with
>>>>>>>> a HAVING clause, e.g. select COUNT(1) from call_center group by
>>>>>>>> cc_class having max(cc_tax_percentage) > 0.2;
>>>>>>>> For the current Blink Planner, the optimized plan will be:
>>>>>>>> TableSourceScan -> LocalAgg -> Exchange -> FinalAgg ->
>>>>>>>> Calc(where=[>($f2, 0.2:DECIMAL(2, 1))]).
>>>>>>>>
>>>>>>>> The core discussion points are summarized as follows:
>>>>>>>> a) Aggregation is a more complex scenario than predicates or limits,
>>>>>>>> and it also depends on the underlying storage.
>>>>>>>> b) It seems one rule can't cover all aggregate scenarios, but could
>>>>>>>> the SupportsAggregatePushDown interface be a bit more general?
>>>>>>>> c) Can CallExpression express the semantics of Calcite's
>>>>>>>> AggregateCall?
>>>>>>>>
>>>>>>>> IMO, completely pushing down an aggregate is generally hard for
>>>>>>>> distributed systems: it usually requires a GROUP BY that exactly
>>>>>>>> matches the partitioning of the downstream storage. At the same time,
>>>>>>>> the benefit of removing the final aggregate is limited; the
>>>>>>>> LocalAggPushDown generally yields more than 80% of the CPU and IO
>>>>>>>> benefit. But I also agree that the SupportsAggregatePushDown
>>>>>>>> interface should be as generic as possible for future extensions,
>>>>>>>> while we keep confidence in the interface we design.
>>>>>>>>
>>>>>>>> For core point (a): because aggregation is complex, one
>>>>>>>> LogicalAggregate node may expand into "Expand / Calc / LocalXXAgg /
>>>>>>>> Exchange / FinalXXAgg" in the physical phase. It seems that we can't
>>>>>>>> solve all cases with only one rule, so I suggest that
>>>>>>>> PushLocalAggIntoTableSourceScanRule focus only on the pattern of
>>>>>>>> TableSourceScan + LocalXXAggregate for now.
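To illustrate why only some of the three plans above fit the TableSourceScan + LocalXXAggregate pattern, here is a toy model. It is not Calcite's rule/operand API: the hypothetical `PlanPatternSketch` treats a physical plan as a bottom-up list of operator names and checks whether a local aggregate sits directly on the scan:

```java
import java.util.List;

// Toy model, not Calcite's rule API: a plan is a bottom-up list of operator names.
class PlanPatternSketch {

    /** True if the operator directly above the scan is a local aggregate. */
    static boolean matchesScanPlusLocalAgg(List<String> bottomUpPlan) {
        return bottomUpPlan.size() >= 2
                && bottomUpPlan.get(0).equals("TableSourceScan")
                && bottomUpPlan.get(1).startsWith("LocalAgg");
    }
}
```

Applied to the FILTER, ROLLUP and HAVING plans listed earlier, only the HAVING plan (`TableSourceScan -> LocalAgg -> ...`) matches; in the other two a Calc or an Expand sits between the scan and the LocalAgg.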
>>>>>>>>
>>>>>>>> For core points (b & c): I think we can change the interface to be:
>>>>>>>> ```
>>>>>>>> boolean applyAggregates(
>>>>>>>>     int[] groupingFields,
>>>>>>>>     List<CallExpression> aggregateExpressions,
>>>>>>>>     DataType producedDataType,
>>>>>>>>     List<int[]> groupingSets);
>>>>>>>> ```
>>>>>>>>
>>>>>>>>
>>>>>>>> Simple group: groupingSets.size() == 1 &&
>>>>>>>> Arrays.equals(groupingSets.get(0), groupingFields)
>>>>>>>>
>>>>>>>> Cube group: groupingSets.size() == IntMath.pow(2,
>>>>>>>> groupingFields.length)
>>>>>>>> Rollup: see
>>>>>>>> org.apache.calcite.rel.core.Aggregate.Group#isRollup
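The simple / cube / rollup classification above can be sketched in self-contained Java for the `int[]`-based signature. This follows the idea of Calcite's `Aggregate.Group` (SIMPLE / ROLLUP / CUBE / OTHER) but uses `java.util.BitSet` rather than Calcite's `ImmutableBitSet`; the class `GroupingKinds` is hypothetical, and, like the size-based cube check, it assumes the grouping sets are distinct subsets of `groupingFields`:

```java
import java.util.BitSet;
import java.util.List;

// Hypothetical sketch: classify grouping sets using java.util.BitSet.
class GroupingKinds {

    enum Kind { SIMPLE, ROLLUP, CUBE, OTHER }

    static BitSet toBits(int[] fields) {
        BitSet bits = new BitSet();
        for (int field : fields) {
            bits.set(field);
        }
        return bits;
    }

    /** True if every field in sub is also in sup. */
    static boolean contains(BitSet sup, BitSet sub) {
        BitSet extra = (BitSet) sub.clone();
        extra.andNot(sup);
        return extra.isEmpty();
    }

    static Kind classify(int[] groupingFields, List<int[]> groupingSets) {
        BitSet all = toBits(groupingFields);
        int n = all.cardinality();
        if (groupingSets.size() == 1 && toBits(groupingSets.get(0)).equals(all)) {
            return Kind.SIMPLE;
        }
        // Assumes distinct subsets of groupingFields: 2^n of them means every subset is present.
        if (groupingSets.size() == (1 << n)) {
            return Kind.CUBE;
        }
        if (isRollup(all, groupingSets)) {
            return Kind.ROLLUP;
        }
        return Kind.OTHER;
    }

    /** Rollup: the full key first, then each set drops exactly one field of the previous set. */
    static boolean isRollup(BitSet all, List<int[]> groupingSets) {
        if (groupingSets.size() != all.cardinality() + 1) {
            return false;
        }
        BitSet previous = null;
        for (int[] set : groupingSets) {
            BitSet bits = toBits(set);
            if (previous == null) {
                if (!bits.equals(all)) {
                    return false;
                }
            } else if (!contains(previous, bits)
                    || bits.cardinality() != previous.cardinality() - 1) {
                return false;
            }
            previous = bits;
        }
        return true;
    }
}
```

With a single grouping field, ROLLUP and CUBE produce the same two sets, and this sketch reports CUBE for that case.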
>>>>>>>>
>>>>>>>> Then we can handle complex grouping cases: the connector developer
>>>>>>>> of the downstream storage decides whether it supports the given
>>>>>>>> grouping type. FILTER and HAVING clauses are converted into related
>>>>>>>> Calc RelNodes and are no longer part of the LocalAggregate node, so
>>>>>>>> CallExpression may be sufficient to express the semantics of
>>>>>>>> AggregateCall.
>>>>>>>>
>>>>>>>> What do you think? Looking forward to our further discussion.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, 6 Jan 2021 at 14:24, Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> > I think filter expressions and grouping sets are semantic
>>>>>>>>> > arguments instead of utilities. If we want to push them into
>>>>>>>>> > sources, the connector developers should be aware of them.
>>>>>>>>> > Wrapping them in a context implicitly is error-prone, because
>>>>>>>>> > existing connectors will produce wrong results when upgrading to
>>>>>>>>> > new Flink versions.
>>>>>>>>>
>>>>>>>>> We could add some mechanism to check for this when upgrading.
>>>>>>>>>
>>>>>>>>> > I think for these cases, providing a new default method to
>>>>>>>>> > override might be a better choice.
>>>>>>>>>
>>>>>>>>> Then we would have three or more methods. At the API level, I
>>>>>>>>> really don't like it...
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jingsong
>>>>>>>>>
>>>>>>>>> On Wed, Jan 6, 2021 at 2:10 PM Jark Wu <imj...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I think filter expressions and grouping sets are semantic
>>>>>>>>>> arguments instead of utilities.
>>>>>>>>>> If we want to push them into sources, the connector developers
>>>>>>>>>> should be aware of them.
>>>>>>>>>> Wrapping them in a context implicitly is error-prone, because
>>>>>>>>>> existing connectors will produce wrong results when upgrading to
>>>>>>>>>> new Flink versions (as we are pushing grouping_sets/filter_args,
>>>>>>>>>> but the connector ignores them).
>>>>>>>>>> I think for these cases, providing a new default method to
>>>>>>>>>> override might be a better choice.
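A minimal sketch of the "new default method" approach, with hypothetical names and simplified parameters (aggregates as strings, no `producedDataType`). A method added in a later version defaults to `false`, so a connector written against the older interface rejects grouping-set push-down instead of silently ignoring the grouping sets:

```java
import java.util.List;

// Hypothetical sketch of evolving an ability interface with a default method.
class DefaultMethodEvolution {

    interface AggregatePushDownSketch {
        /** Original method: plain GROUP BY on the given fields. */
        boolean applyAggregates(int[] groupingFields, List<String> aggregates);

        /** Added later. Connectors that do not override it refuse grouping sets. */
        default boolean applyGroupingSetsAggregates(List<int[]> groupingSets, List<String> aggregates) {
            return false;
        }
    }

    /** A connector written before applyGroupingSetsAggregates existed. */
    static class OldConnector implements AggregatePushDownSketch {
        @Override
        public boolean applyAggregates(int[] groupingFields, List<String> aggregates) {
            return true; // accepts every plain GROUP BY in this sketch
        }
    }
}
```

The cost Jingsong points out is also visible here: each such extension adds another method to the interface.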
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jark
>>>>>>>>>>
>>>>>>>>>> On Wed, 6 Jan 2021 at 13:56, Jingsong Li <jingsongl...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I'm also curious about aggregates with a filter (COUNT(1)
>>>>>>>>>>> FILTER(WHERE d > 1)). Can we push them down? I'm not sure that a
>>>>>>>>>>> single call expression can express it, or how we should represent
>>>>>>>>>>> it and convey it to users.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jingsong
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jan 6, 2021 at 1:36 PM Jingsong Li <
>>>>>>>>>>> jingsongl...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Jark,
>>>>>>>>>>>>
>>>>>>>>>>>> I don't want to limit this interface to LocalAgg push-down.
>>>>>>>>>>>> Actually, sometimes we can push the whole aggregation to the
>>>>>>>>>>>> source too.
>>>>>>>>>>>>
>>>>>>>>>>>> So this rule can do something more advanced. For example, we can
>>>>>>>>>>>> push down grouping sets to the source too, for the SQL: "GROUP BY
>>>>>>>>>>>> GROUPING SETS (f1, f2)". Then we need to push down more information.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Jingsong
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jan 6, 2021 at 11:02 AM Jark Wu <imj...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I think this may be over-designed. We should have confidence
>>>>>>>>>>>>> in the interface we design; the interface should be stable.
>>>>>>>>>>>>> Wrapping things in a big context comes at the cost of user
>>>>>>>>>>>>> convenience.
>>>>>>>>>>>>> Above all, we don't see any parameters to add in the future. Do
>>>>>>>>>>>>> you know of any potential parameters?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Jark
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, 6 Jan 2021 at 10:28, Jingsong Li <
>>>>>>>>>>>>> jingsongl...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Sebastian,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Well, I mean:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> `boolean applyAggregates(int[] groupingFields,
>>>>>>>>>>>>>> List<CallExpression> aggregateExpressions, DataType producedDataType);`
>>>>>>>>>>>>>> VS
>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>> boolean applyAggregates(Aggregation agg);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> interface Aggregation {
>>>>>>>>>>>>>>   int[] groupingFields();
>>>>>>>>>>>>>>   List<CallExpression> aggregateExpressions();
>>>>>>>>>>>>>>   DataType producedDataType();
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Maybe I'm overthinking it, but I think aggregation is a
>>>>>>>>>>>>>> complicated thing, and we may need to extend its parameters in
>>>>>>>>>>>>>> the future. Making the parameters an interface allows future
>>>>>>>>>>>>>> extension without breaking the compatibility of user
>>>>>>>>>>>>>> implementations. With the previous signature, users would need to
>>>>>>>>>>>>>> modify their code.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Jingsong
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu <
>>>>>>>>>>>>>> liuyang0...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Jingsong,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thx a lot for your suggestion. These points really need to
>>>>>>>>>>>>>>> be clear in the proposal.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For the semantic problem, I think the main point is the
>>>>>>>>>>>>>>> difference between the return data type of the target aggregate
>>>>>>>>>>>>>>> function and the row format returned by the underlying storage.
>>>>>>>>>>>>>>> That's why we provide the producedDataType in the
>>>>>>>>>>>>>>> SupportsAggregatePushDown interface.
>>>>>>>>>>>>>>> We need to let developers know that the related connectors must
>>>>>>>>>>>>>>> handle the semantic differences between the underlying storage
>>>>>>>>>>>>>>> system and Flink. [Supplemented in proposal]
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The phase of the new PushLocalAggIntoTableSourceScanRule is
>>>>>>>>>>>>>>> also a key point.
>>>>>>>>>>>>>>> As you suggested, we should put it into the PHYSICAL_REWRITE
>>>>>>>>>>>>>>> rule set, preferably after the EnforceLocalXXAggRule.
>>>>>>>>>>>>>>> [Supplemented in proposal]
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regarding the extensibility of the interface, I don't quite
>>>>>>>>>>>>>>> understand your suggestion. Do you mean adding an abstract class
>>>>>>>>>>>>>>> that implements the SupportsAggregatePushDown interface and holds
>>>>>>>>>>>>>>> the `List<CallExpression> aggregateExpressions, int[]
>>>>>>>>>>>>>>> groupingFields, DataType producedDataType` fields?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Looking forward to your further feedback or guidance.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, 5 Jan 2021 at 14:44, Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for your proposal, Sebastian!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> +1 for SupportsAggregatePushDown. The wonderful discussion above
>>>>>>>>>>>>>>>> has resolved many of my concerns.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ## Semantic problems
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We may need to add some mechanisms or comments, because as far as
>>>>>>>>>>>>>>>> I know, the semantics of each database are actually different,
>>>>>>>>>>>>>>>> which may need to be reflected in your specific implementation.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For example, the AVG output types of various databases may differ:
>>>>>>>>>>>>>>>> MySQL outputs double, which is different from Flink. What should
>>>>>>>>>>>>>>>> we do? (Luckily, AVG will be split into SUM and COUNT, but we also
>>>>>>>>>>>>>>>> need to take care of DECIMAL and other types.)
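To illustrate why splitting AVG into SUM and COUNT helps, a minimal sketch (hypothetical class `AvgSplitSketch`, plain `long` values): each source split returns a partial SUM and COUNT, and the final aggregate merges the partials before dividing once. Averaging per-split averages would give a different answer, and because the final division stays on the Flink side, its result type and rounding can follow Flink's AVG rather than the database's. `double` is used below only for illustration:

```java
import java.util.List;

// Hypothetical sketch: local (pushed-down) SUM/COUNT partials merged by a final AVG.
class AvgSplitSketch {

    /** Partial state one split could return instead of an AVG value. */
    static final class Partial {
        final long sum;
        final long count;

        Partial(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    /** What the storage would compute for one split: the sum and the row count. */
    static Partial local(long... values) {
        long sum = 0;
        for (long v : values) {
            sum += v;
        }
        return new Partial(sum, values.length);
    }

    /** Final aggregation: merge all partials, then divide once. Returns null for no rows, like SQL AVG. */
    static Double finalAvg(List<Partial> partials) {
        long sum = 0;
        long count = 0;
        for (Partial p : partials) {
            sum += p.sum;
            count += p.count;
        }
        return count == 0 ? null : (double) sum / count;
    }
}
```

For example, `finalAvg(List.of(local(1, 2, 3), local(10)))` merges to 16 / 4 = 4.0, whereas averaging the two split averages (2 and 10) would give 6.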
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ## The phase of push-down rule
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I strongly recommend that you do not put it in the Volcano
>>>>>>>>>>>>>>>> phase, which may
>>>>>>>>>>>>>>>> make the cost calculation very troublesome.
>>>>>>>>>>>>>>>> So in PHYSICAL_REWRITE?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> ## About interface
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For extensibility, I slightly recommend that we introduce an
>>>>>>>>>>>>>>>> `Aggregate` interface that contains the `List<CallExpression>
>>>>>>>>>>>>>>>> aggregateExpressions, int[] groupingFields, DataType
>>>>>>>>>>>>>>>> producedDataType` fields. This way, we can add fields easily
>>>>>>>>>>>>>>>> without breaking compatibility.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I think the current design is very good; I'm just putting forward
>>>>>>>>>>>>>>>> some ideas.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> Jingsong
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu <
>>>>>>>>>>>>>>>> liuyang0...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> > Hi Jark,
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> > Thx for your further feedback and help. The interface of
>>>>>>>>>>>>>>>> > SupportsAggregatePushDown may indeed need some adjustments.
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> > For (1), agreed: the upstream only needs to know whether the
>>>>>>>>>>>>>>>> > TableSource can handle all of the aggregates.
>>>>>>>>>>>>>>>> > It's better to just return a boolean indicating whether pushing
>>>>>>>>>>>>>>>> > down all of the aggregates succeeded. [Resolved in proposal]
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> > For (2), agreed: the aggOutputDataType represents the produced
>>>>>>>>>>>>>>>> > data type of the new table source, so that the new table source
>>>>>>>>>>>>>>>> > can connect to the related exchange node. Its format is the
>>>>>>>>>>>>>>>> > grouping fields' types followed by the agg functions' return
>>>>>>>>>>>>>>>> > types. Adding this parameter also makes it easier for users to
>>>>>>>>>>>>>>>> > build the final output type. I have renamed this parameter to
>>>>>>>>>>>>>>>> > producedDataType. [Resolved in proposal]
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> > For (3), agreed: groupSet may indeed mislead users, so I have
>>>>>>>>>>>>>>>> > changed it to groupingFields. [Resolved in proposal]
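As a sketch of the produced row type described in (2), the grouping fields' types followed by each aggregate's return type, here is a hypothetical helper that uses plain type-name strings instead of Flink's `DataType`:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: type names as strings, not Flink DataType objects.
class ProducedTypeSketch {

    /** Grouping field types (looked up in the source schema) followed by aggregate result types. */
    static List<String> producedFieldTypes(List<String> sourceFieldTypes,
                                           int[] groupingFields,
                                           List<String> aggResultTypes) {
        List<String> produced = new ArrayList<>();
        for (int index : groupingFields) {
            produced.add(sourceFieldTypes.get(index));
        }
        produced.addAll(aggResultTypes);
        return produced;
    }
}
```

For a source with field types `[INT, STRING, DOUBLE]`, grouping on field 1 with a single aggregate whose result type is assumed to be `BIGINT`, the produced types are `[STRING, BIGINT]`.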
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> > Thx again for the suggestion; looking forward to further
>>>>>>>>>>>>>>>> > discussion.
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> > On Tue, 5 Jan 2021 at 12:05, Jark Wu <imj...@gmail.com> wrote:
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> > > I'm also +1 for idea#2.
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > Regarding the updated interface,
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > Result applyAggregates(List<CallExpression> aggregateExpressions,
>>>>>>>>>>>>>>>> > >      int[] groupSet, DataType aggOutputDataType);
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > final class Result {
>>>>>>>>>>>>>>>> > >        private final List<CallExpression> acceptedAggregates;
>>>>>>>>>>>>>>>> > >        private final List<CallExpression> remainingAggregates;
>>>>>>>>>>>>>>>> > > }
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > I have the following comments:
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > 1) Do we need the composite Result return type? Is a boolean
>>>>>>>>>>>>>>>> > > return type enough?
>>>>>>>>>>>>>>>> > >     From my understanding, all of the aggregates should be
>>>>>>>>>>>>>>>> > > accepted, otherwise the pushdown should fail.
>>>>>>>>>>>>>>>> > >     Therefore, users don't need to distinguish which
>>>>>>>>>>>>>>>> > > aggregates are "accepted".
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > 2) Does the `aggOutputDataType` represent the produced data
>>>>>>>>>>>>>>>> > > type of the new source, or just the return type of all the
>>>>>>>>>>>>>>>> > > agg functions?
>>>>>>>>>>>>>>>> > >     I would prefer `producedDataType`, just like
>>>>>>>>>>>>>>>> > > `SupportsReadingMetadata`, to reduce the effort for users to
>>>>>>>>>>>>>>>> > > concatenate a final output type.
>>>>>>>>>>>>>>>> > >     The return type of each agg function can be obtained
>>>>>>>>>>>>>>>> > > from the `CallExpression`.
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > 3) What do you think about renaming `groupSet` to `grouping`
>>>>>>>>>>>>>>>> > > or `groupedFields`?
>>>>>>>>>>>>>>>> > >     The name `groupSet` may mislead users into thinking it
>>>>>>>>>>>>>>>> > > relates to "grouping sets".
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > What do you think?
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > Best,
>>>>>>>>>>>>>>>> > > Jark
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > > On Tue, 5 Jan 2021 at 11:04, Kurt Young <ykt...@gmail.com> wrote:
>>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>>> > >> Sorry for the typo -_-!
>>>>>>>>>>>>>>>> > >> I meant idea #2.
>>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>>> > >> Best,
>>>>>>>>>>>>>>>> > >> Kurt
>>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>>> > >> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu <liuyang0...@gmail.com>
>>>>>>>>>>>>>>>> > >> wrote:
>>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>>> > >>> Hi Kurt,
>>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>>> > >>> Thx a lot for your feedback. If local aggregation is more of a
>>>>>>>>>>>>>>>> > >>> physical operator than a logical operator, I think your
>>>>>>>>>>>>>>>> > >>> suggestion should be idea #2, which handles everything in the
>>>>>>>>>>>>>>>> > >>> physical optimization phase?
>>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>>> > >>> Looking forward to further discussion.
>>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>>> > >>> On Tue, 5 Jan 2021 at 09:52, Kurt Young <ykt...@gmail.com> wrote:
>>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>>> > >>>> Local aggregation is more like a physical operator rather than
>>>>>>>>>>>>>>>> > >>>> logical operator. I would suggest going with idea #1.
>>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>>> > >>>> Best,
>>>>>>>>>>>>>>>> > >>>> Kurt
>>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>>> > >>>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu <liuyang0...@gmail.com>
>>>>>>>>>>>>>>>> > >>>> wrote:
>>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>>> > >>>> > Hi Jark, Thx a lot for your quick reply and valuable suggestions.
>>>>>>>>>>>>>>>> > >>>> > For (1): Agreed. Since we are in the period of upgrading to the
>>>>>>>>>>>>>>>> > >>>> > new table source API, we really should consider the new
>>>>>>>>>>>>>>>> > >>>> > interface for the new optimization rule. If the new rule doesn't
>>>>>>>>>>>>>>>> > >>>> > use the new API, we'll have to upgrade it sooner or later. I have
>>>>>>>>>>>>>>>> > >>>> > changed the SupportsAggregatePushDown definition in the proposal
>>>>>>>>>>>>>>>> > >>>> > above to use the ability interface.
>>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>>> > >>>> > For (2): Agreed. Using CallExpression is a better choice, and I
>>>>>>>>>>>>>>>> > >>>> > have resolved this comment in the proposal.
>>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>>> > >>>> > For (3): I suggest we first support the JDBC connector, as we
>>>>>>>>>>>>>>>> > >>>> > don't have a Druid connector, and the ES connector only has the
>>>>>>>>>>>>>>>> > >>>> > sink API at present.
>>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>>> > >>>> > But perhaps the biggest question is whether we should use idea 1
>>>>>>>>>>>>>>>> > >>>> > or idea 2 in the proposal.
>>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>>> > >>>> > What do you think? After we reach agreement on the proposal, our
>>>>>>>>>>>>>>>> > >>>> > team can drive this feature to completion.
>>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>>> > >>>> > On Tue, 29 Dec 2020 at 14:58, Jark Wu <imj...@gmail.com> wrote:
>>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>>> > >>>> > > Hi Sebastian,
>>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>>> > >>>> > > Thanks for the proposal. I think this is a great improvement
>>>>>>>>>>>>>>>> > >>>> > > for Flink SQL.
>>>>>>>>>>>>>>>> > >>>> > > I went through the design doc and have the following thoughts:
>>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>>> > >>>> > > 1) Flink has deprecated the legacy TableSource in 1.11 and
>>>>>>>>>>>>>>>> > >>>> > > proposed a new set of DynamicTableSource interfaces. Could you
>>>>>>>>>>>>>>>> > >>>> > > update your proposal to use the new interfaces?
>>>>>>>>>>>>>>>> > >>>> > > Follow the existing ability interfaces, e.g.
>>>>>>>>>>>>>>>> > >>>> > > SupportsFilterPushDown, SupportsProjectionPushDown.
>>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>>> > >>>> > > 2) Personally, I think CallExpression would be a better
>>>>>>>>>>>>>>>> > >>>> > > representation than separate `FunctionDefinition` and args,
>>>>>>>>>>>>>>>> > >>>> > > because it makes it easier to know the index and type of the
>>>>>>>>>>>>>>>> > >>>> > > arguments.
>>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>>> > >>>> > > 3) It would be better to list which connectors will be
>>>>>>>>>>>>>>>> > >>>> > > supported in the plan.
>>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>>> > >>>> > > Best,
>>>>>>>>>>>>>>>> > >>>> > > Jark
>>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>>> > >>>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu <liuyang0...@gmail.com>
>>>>>>>>>>>>>>>> > >>>> > > wrote:
>>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>>> > >>>> > > > Hi all,
>>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>>> > >>>> > > > I'd like to discuss a new feature for the Blink Planner.
>>>>>>>>>>>>>>>> > >>>> > > > The aggregate operator of Flink SQL is currently executed
>>>>>>>>>>>>>>>> > >>>> > > > entirely in the Flink layer. With the development of storage
>>>>>>>>>>>>>>>> > >>>> > > > systems, many of the storages used with Flink SQL can now
>>>>>>>>>>>>>>>> > >>>> > > > perform aggregation themselves. Pushing aggregates down to the
>>>>>>>>>>>>>>>> > >>>> > > > data source layer will improve performance by reducing network
>>>>>>>>>>>>>>>> > >>>> > > > IO and computation overhead.
>>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>>> > >>>> > > > I have drafted a design doc for this new feature:
>>>>>>>>>>>>>>>> > >>>> > > > https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>>> > >>>> > > > Any comment or discussion is welcome.
>>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>>> > >>>> > > > --
>>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>>> > >>>> > > > *With kind regards
>>>>>>>>>>>>>>>> > >>>> > > > ------------------------------------------------------------
>>>>>>>>>>>>>>>> > >>>> > > > Sebastian Liu 刘洋
>>>>>>>>>>>>>>>> > >>>> > > > Institute of Computing Technology, Chinese Academy of Science
>>>>>>>>>>>>>>>> > >>>> > > > Mobile\WeChat: +86-15201613655
>>>>>>>>>>>>>>>> > >>>> > > > E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>>>>>>>>>>>>>> > >>>> > > > QQ: 3239559*
>>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>>
>
>
>
