Hi Jark,

Cool, following your suggestions I have created three related subtasks
under Flink-20791.
Hope to assign these subtasks to me too, when you have time. And I
will push forward the relevant implementation.

Jark Wu <imj...@gmail.com> 于2021年1月8日周五 下午12:30写道:

> Hi Sebastian,
>
> I assigned the issue to you. But I suggest creating sub-tasks under this
> issue. Because I think this would be a big contribution.
> For example, you can split it into:
> 1. Introduce SupportsAggregatePushDown interface
> 2. Support SupportsAggregatePushDown in planner
> 3. Support SupportsAggregatePushDown for JDBC source
> 4. ...
>
> Best,
> Jark
>
> On Thu, 7 Jan 2021 at 23:27, Sebastian Liu <liuyang0...@gmail.com> wrote:
>
>> Hi Jark,
>>
>> Seems that we have reached the agreement on the proposal. Could you
>> please help to assign the below jira ticket to me?
>> https://issues.apache.org/jira/browse/FLINK-20791
>>
>> Jark Wu <imj...@gmail.com> 于2021年1月7日周四 上午10:25写道:
>>
>>> Thanks for updating the design doc.
>>> It looks good to me.
>>>
>>> Best,
>>> Jark
>>>
>>> On Thu, 7 Jan 2021 at 10:16, Jingsong Li <jingsongl...@gmail.com> wrote:
>>>
>>>> Sounds good to me.
>>>>
>>>> We don't have to worry about future changes, because it has covered all
>>>> the capabilities of calcite aggregation.
>>>>
>>>> Best,
>>>> Jingsong
>>>>
>>>> On Thu, Jan 7, 2021 at 12:14 AM Sebastian Liu <liuyang0...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Jark,
>>>>>
>>>>> Sounds good to me. For better scalability in the future, we could add
>>>>> the AggregateExpression.
>>>>> ```
>>>>>
>>>>> public class AggregateExpression implements ResolvedExpression {
>>>>>
>>>>>    private final FunctionDefinition functionDefinition;
>>>>>
>>>>>    private final List<FieldReferenceExpression> args;
>>>>>
>>>>>    private final @Nullable CallExpression filterExpression;
>>>>>
>>>>>    private final DataType resultType;
>>>>>
>>>>>    private final boolean distinct;
>>>>>
>>>>>    private final boolean approximate;
>>>>>
>>>>>
>>>>>
>>>>>    private final boolean ignoreNulls;
>>>>>
>>>>> }
>>>>> ```
>>>>>
>>>>> And we really only need one GroupingSets parameter for grouping. I
>>>>> have updated the related interface in the proposal.
>>>>> Appreciate the continued feedback and help.
>>>>>
>>>>> Jark Wu <imj...@gmail.com> 于2021年1月6日周三 下午9:34写道:
>>>>>
>>>>>> Hi Liu, Jingsong,
>>>>>>
>>>>>> Regarding the agg with filter, I think in theory we can support
>>>>>> pushing such a pattern into source.
>>>>>> We don't need to support it in the first version, but in the long
>>>>>> term, we can support it.
>>>>>> The designed interface should be future proof.
>>>>>>
>>>>>> Considering filter arg and distinct flag should be part of the
>>>>>> aggregate expression.
>>>>>> I'm wondering if CallExpression is a good representation for it.
>>>>>> What do you think about proposing the following `AggregateExpression`
>>>>>> to replace the `CallExpression`?
>>>>>>
>>>>>> class AggregateExpression implements ResolvedExpression {
>>>>>>     private final FunctionDefinition functionDefinition;
>>>>>>     private final List<FieldReferenceExpression> args;
>>>>>>     private final @Nullable CallExpression filterExpr;
>>>>>>     private final boolean distinct;
>>>>>> }
>>>>>>
>>>>>> Besides, we don't need both groupingFields and groupingSets.
>>>>>> `groupingSets` should be a superset of groupingFields.
>>>>>> Then the interface of SupportsAggregatePushDown can be:
>>>>>>
>>>>>> interface SupportsAggregatePushDown {
>>>>>>
>>>>>>   boolean applyAggregates(
>>>>>>     List<int[]> groupingSets,
>>>>>>     List<AggregateExpression> aggregates,
>>>>>>     DataType producedDataType);
>>>>>> }
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> On Wed, 6 Jan 2021 at 19:56, Sebastian Liu <liuyang0...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Jingsong, Jark,
>>>>>>>
>>>>>>> Thx so much for our discussion, and the cases mentioned above are
>>>>>>> really worthy for further discussion.
>>>>>>>
>>>>>>> 1. For aggregate with filter expressions: eg: select COUNT(1)
>>>>>>> FILTER(WHERE cc_call_center_sk > 3) from call_center;
>>>>>>> For the current Blink Planner, the optimized plan will be:
>>>>>>> TableSourceScan -> Calc(IS TRUE(>(cc_call_center_sk, 3))) ->
>>>>>>> LocalAgg -> Exchange -> FinalAgg.
>>>>>>> As there is a Calc above the TableSource, this pattern can't match
>>>>>>> the LocalAggPushDownRule in the current design.
>>>>>>>
>>>>>>> 2. For the grouping set or rollup use case: eg: select COUNT(1) from
>>>>>>> call_center group by rollup(cc_class, cc_employees);
>>>>>>> For the current Blink Planner, the optimized plan will be:
>>>>>>> TableSourceScan -> Expand -> LocalAgg -> Exchange -> FinalAgg ->
>>>>>>> Calc.
>>>>>>> It's also not covered by the current LocalAggPushDownRule design.
>>>>>>>
>>>>>>> 3. I want to add a case which we haven't discussed yet.
>>>>>>> Aggregate with Having clause.
>>>>>>> eg: select COUNT(1) from call_center group by cc_class having
>>>>>>> max(cc_tax_percentage) > 0.2;
>>>>>>> For the current Blink Planner, the optimized plan will be:
>>>>>>> TableSourceScan -> LocalAgg -> Exchange -> FinalAgg ->
>>>>>>> Calc(where=[>($f2, 0.2:DECIMAL(2, 1))]).
>>>>>>>
>>>>>>> The core discussion points are summarized as follows:
>>>>>>> a) Aggregate is a more complex scenario than predicates or limits,
>>>>>>> and also depends on the different underlying storages.
>>>>>>> b) One rule seems can't completely cover all aggregate scenario, but
>>>>>>> whether SupportSAggregatePushDown interface can be a bit more general?
>>>>>>> c) Could the CallExpression express the semantics of CalCite
>>>>>>> AggregateCall?
>>>>>>>
>>>>>>> IMO: Completely push down aggregate is generally hard for
>>>>>>> distributed systems. Usually we need a GROUP BY and exactly
>>>>>>> matches the partition mode in downstream storage. At the same time,
>>>>>>> the benefit of remove the final aggregate is actually limited.
>>>>>>> The LocalAggPushDown generally yields more than 80% of the CPU and
>>>>>>> IO benefits. But I also agree that
>>>>>>> the SupportsAggregatePushDown interface should be as generic as
>>>>>>> possible for future extensions, and meanwhile keep confidence
>>>>>>> in the interface we design.
>>>>>>>
>>>>>>> For core points (a): As the complexity of aggregate, one
>>>>>>> LogicalAggregate node may extend to "Expand / Calc / LocalXXAgg / 
>>>>>>> Exchange
>>>>>>> / FinalXXAgg"
>>>>>>> in physical phase. Seems that we can't solve all cases with only one
>>>>>>> rule. So I suggest PushLocalAggIntoTableSourceScanRule focus only
>>>>>>> on the pattern of TableSourceScan + LocalXXAggregate at present.
>>>>>>>
>>>>>>> For core points (b & c): I think we can change the interface to be:
>>>>>>> ```
>>>>>>>
>>>>>>> boolean applyAggregates(int[] groupingFields, List<CallExpression>
>>>>>>> aggregateExpressions, DataType producedDataType, List<int[]>
>>>>>>> groupingSets);
>>>>>>>
>>>>>>> ```
>>>>>>>
>>>>>>>
>>>>>>> Simple Group: groupingSets.size() == 1 &&
>>>>>>> groupingSets.get(0).equals(groupingFields)
>>>>>>>
>>>>>>> Cube Group: groupingSets.size() == IntMath.pow(2,
>>>>>>> groupingFields.cardinality())
>>>>>>> Rollup:
>>>>>>> Refernece org.apache.calcite.rel.core.Aggregate.Group#isRollup
>>>>>>>
>>>>>>> Then we can handle the complex grouping case. The Connector
>>>>>>> developer of the downstream storage should determine
>>>>>>> whether it supports the associated grouping type. For the filter and
>>>>>>> having clause, they will convert to be related Calc RelNode,
>>>>>>> and no longer in the LocalAggregate node, the CallExpression may be
>>>>>>> sufficient to express the semantics of AggregateCall.
>>>>>>>
>>>>>>> What do you think? Looking forward to our further discussion.
>>>>>>>
>>>>>>>
>>>>>>> Jingsong Li <jingsongl...@gmail.com> 于2021年1月6日周三 下午2:24写道:
>>>>>>>
>>>>>>>> > I think filter expressions and grouping sets are semantic
>>>>>>>> arguments instead of utilities. If we want to push them into sources, 
>>>>>>>> the
>>>>>>>> connector developers should be aware of them.Wrapping them in a context
>>>>>>>> implicitly is error-prone that the existing connector will produce 
>>>>>>>> wrong
>>>>>>>> results when upgrading to new Flink versions.
>>>>>>>>
>>>>>>>> We can have some mechanism to check the upgrading.
>>>>>>>>
>>>>>>>> > I think for these cases, providing a new default method to
>>>>>>>> override might be a better choice.
>>>>>>>>
>>>>>>>> Then we will have three or more methods. For the API level, I
>>>>>>>> really don't like it...
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jingsong
>>>>>>>>
>>>>>>>> On Wed, Jan 6, 2021 at 2:10 PM Jark Wu <imj...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I think filter expressions and grouping sets are semantic
>>>>>>>>> arguments instead of utilities.
>>>>>>>>> If we want to push them into sources, the connector developers
>>>>>>>>> should be aware of them.
>>>>>>>>> Wrapping them in a context implicitly is error-prone that the
>>>>>>>>> existing connector will produce wrong results
>>>>>>>>>  when upgrading to new Flink versions (as we are pushing
>>>>>>>>> grouping_sets/filter_args, but connector ignores it).
>>>>>>>>> I think for these cases, providing a new default method to
>>>>>>>>> override might be a better choice.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jark
>>>>>>>>>
>>>>>>>>> On Wed, 6 Jan 2021 at 13:56, Jingsong Li <jingsongl...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I'm also curious about aggregate with filter (COUNT(1)
>>>>>>>>>> FILTER(WHERE d > 1)). Can we push it down? I'm not sure that a 
>>>>>>>>>> single call
>>>>>>>>>> expression can express it, and how we should embody it and convey it 
>>>>>>>>>> to
>>>>>>>>>> users.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jingsong
>>>>>>>>>>
>>>>>>>>>> On Wed, Jan 6, 2021 at 1:36 PM Jingsong Li <
>>>>>>>>>> jingsongl...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Jark,
>>>>>>>>>>>
>>>>>>>>>>> I don't want to limit this interface to LocalAgg Push down.
>>>>>>>>>>> Actually, sometimes, we can push whole aggregation to source too.
>>>>>>>>>>>
>>>>>>>>>>> So, this rule can do something more advanced. For example, we
>>>>>>>>>>> can push down group sets to source too, for the SQL: "GROUP BY 
>>>>>>>>>>> GROUPING
>>>>>>>>>>> SETS (f1, f2)". Then, we need to add more information to push down.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jingsong
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jan 6, 2021 at 11:02 AM Jark Wu <imj...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I think this may be over designed. We should have confidence in
>>>>>>>>>>>> the interface we design, the interface should be stable.
>>>>>>>>>>>> Wrapping things in a big context has a cost of losing user
>>>>>>>>>>>> convenience.
>>>>>>>>>>>> Foremost, we don't see any parameters to add in the future. Do
>>>>>>>>>>>> you know any potential parameters?
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Jark
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, 6 Jan 2021 at 10:28, Jingsong Li <
>>>>>>>>>>>> jingsongl...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Sebastian,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Well, I mean:
>>>>>>>>>>>>>
>>>>>>>>>>>>> `boolean applyAggregates(int[] groupingFields,
>>>>>>>>>>>>> List<CallExpression> aggregateExpressions, DataType 
>>>>>>>>>>>>> producedDataType);`
>>>>>>>>>>>>> VS
>>>>>>>>>>>>> ```
>>>>>>>>>>>>> boolean applyAggregates(Aggregation agg);
>>>>>>>>>>>>>
>>>>>>>>>>>>> interface Aggregation {
>>>>>>>>>>>>>   int[] groupingFields();
>>>>>>>>>>>>>   List<CallExpression> aggregateExpressions();
>>>>>>>>>>>>>   DataType producedDataType();
>>>>>>>>>>>>> }
>>>>>>>>>>>>> ```
>>>>>>>>>>>>>
>>>>>>>>>>>>> Maybe I've over considered it, but I think Aggregation is a
>>>>>>>>>>>>> complicated thing. Maybe we need to extend its parameters in the 
>>>>>>>>>>>>> future, so
>>>>>>>>>>>>> make the parameters interface, which is conducive to the future 
>>>>>>>>>>>>> expansion
>>>>>>>>>>>>> without destroying the compatibility of user implementation. If 
>>>>>>>>>>>>> it is the
>>>>>>>>>>>>> way before, users need to modify the code.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Jingsong
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu <
>>>>>>>>>>>>> liuyang0...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Jinsong,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thx a lot for your suggestion. These points really need to be
>>>>>>>>>>>>>> clear in the proposal.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For the semantic problem, I think the main point is the
>>>>>>>>>>>>>> different returned data types
>>>>>>>>>>>>>> for the target aggregate function and the row format returned
>>>>>>>>>>>>>> by the underlying storage.
>>>>>>>>>>>>>> That's why we provide the producedDataType in the
>>>>>>>>>>>>>> SupportsAggregatePushDown interface.
>>>>>>>>>>>>>> Need to let developers know that we need to handle the
>>>>>>>>>>>>>> semantic differences between
>>>>>>>>>>>>>> the underlying storage system and Flink in related
>>>>>>>>>>>>>> connectors. [Supplemented in proposal]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For the phase of the new PushLocalAggIntoTableSourceScanRule
>>>>>>>>>>>>>> rule, it's also a key point.
>>>>>>>>>>>>>> As you suggested, we should put it into the PHYSICAL_REWRITE
>>>>>>>>>>>>>> rule set, and better to put it
>>>>>>>>>>>>>> behind the EnforceLocalXXAggRule. [Supplemented in proposal]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For the scalability of the interface, actually I don't
>>>>>>>>>>>>>> exactly understand your suggestion. Is it to add
>>>>>>>>>>>>>> an abstract class, to implement the SupportsAggregatePushDown
>>>>>>>>>>>>>> interface, and holds the
>>>>>>>>>>>>>> `List < CallExpression > aggregateExpressions, int[]
>>>>>>>>>>>>>> GroupingFields, DataType producedDataType`
>>>>>>>>>>>>>> fields?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Looking forward to your further feedback or guidance.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Jingsong Li <jingsongl...@gmail.com> 于2021年1月5日周二 下午2:44写道:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for your proposal! Sebastian.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> +1 for SupportsAggregatePushDown. The above wonderful
>>>>>>>>>>>>>>> discussion has solved
>>>>>>>>>>>>>>> many of my concerns.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ## Semantic problems
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We may need to add some mechanisms or comments, because as
>>>>>>>>>>>>>>> far as I know,
>>>>>>>>>>>>>>> the semantics of each database is actually different, which
>>>>>>>>>>>>>>> may need to be
>>>>>>>>>>>>>>> reflected in your specific implementation.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For example, the AVG output types of various databases may
>>>>>>>>>>>>>>> be different.
>>>>>>>>>>>>>>> For example, MySQL outputs double, this is different from
>>>>>>>>>>>>>>> Flink. What
>>>>>>>>>>>>>>> should we do? (Lucky, avg will be splitted into sum and
>>>>>>>>>>>>>>> count, But we also
>>>>>>>>>>>>>>> need care about decimal and others)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ## The phase of push-down rule
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I strongly recommend that you do not put it in the Volcano
>>>>>>>>>>>>>>> phase, which may
>>>>>>>>>>>>>>> make the cost calculation very troublesome.
>>>>>>>>>>>>>>> So in PHYSICAL_REWRITE?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ## About interface
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For scalability, I slightly recommend that we introduce an
>>>>>>>>>>>>>>> `Aggregate`
>>>>>>>>>>>>>>> interface, it contains `List<CallExpression>
>>>>>>>>>>>>>>> aggregateExpressions, int[]
>>>>>>>>>>>>>>> groupingFields, DataType producedDataType` fields. In this
>>>>>>>>>>>>>>> way, we can add
>>>>>>>>>>>>>>> fields easily without breaking compatibility.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think the current design is very good, just put forward
>>>>>>>>>>>>>>> some ideas.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Jingsong
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu <
>>>>>>>>>>>>>>> liuyang0...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > Hi Jark,
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > Thx for your further feedback and help. The interface of
>>>>>>>>>>>>>>> > SupportsAggregatePushDown may indeed need some adjustments.
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > For (1) Agree: Yeah, the upstream only need to know if the
>>>>>>>>>>>>>>> TableSource can
>>>>>>>>>>>>>>> > handle all of the aggregates.
>>>>>>>>>>>>>>> > It's better to just return a boolean type to indicate
>>>>>>>>>>>>>>> whether all of
>>>>>>>>>>>>>>> > aggregates push down was successful or not. [Resolved in
>>>>>>>>>>>>>>> proposal]
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > For (2) Agree: The aggOutputDataType represent the
>>>>>>>>>>>>>>> produced data type of
>>>>>>>>>>>>>>> > the new table source to make sure that the new table
>>>>>>>>>>>>>>> source can
>>>>>>>>>>>>>>> > connect with the related exchange node. The format of this
>>>>>>>>>>>>>>> > aggOutputDataType is groupedFields's type + agg function's
>>>>>>>>>>>>>>> return type.
>>>>>>>>>>>>>>> > The reason for adding this parameter in this function is
>>>>>>>>>>>>>>> also to facilitate
>>>>>>>>>>>>>>> > the user to build the final output type. I have changed
>>>>>>>>>>>>>>> this parameter
>>>>>>>>>>>>>>> > to be producedDataType. [Resolved in proposal]
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > For (3) Agree: Indeed, groupSet may mislead users, I have
>>>>>>>>>>>>>>> changed to use
>>>>>>>>>>>>>>> > groupingFields. [Resolved in proposal]
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > Thx again for the suggestion, looking for the further
>>>>>>>>>>>>>>> discussion.
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > Jark Wu <imj...@gmail.com> 于2021年1月5日周二 下午12:05写道:
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > > I'm also +1 for idea#2.
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > Regarding to the updated interface,
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > Result applyAggregates(List<CallExpression>
>>>>>>>>>>>>>>> aggregateExpressions,
>>>>>>>>>>>>>>> > >      int[] groupSet, DataType aggOutputDataType);
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > final class Result {
>>>>>>>>>>>>>>> > >        private final List<CallExpression>
>>>>>>>>>>>>>>> acceptedAggregates;
>>>>>>>>>>>>>>> > >        private final List<CallExpression>
>>>>>>>>>>>>>>> remainingAggregates;
>>>>>>>>>>>>>>> > > }
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > I have following comments:
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > 1) Do we need the composite Result return type? Is a
>>>>>>>>>>>>>>> boolean return type
>>>>>>>>>>>>>>> > > enough?
>>>>>>>>>>>>>>> > >     From my understanding, all of the aggregates should
>>>>>>>>>>>>>>> be accepted,
>>>>>>>>>>>>>>> > > otherwise the pushdown should fail.
>>>>>>>>>>>>>>> > >     Therefore, users don't need to distinguish which
>>>>>>>>>>>>>>> aggregates are
>>>>>>>>>>>>>>> > > "accepted".
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > 2) Does the `aggOutputDataType` represent the produced
>>>>>>>>>>>>>>> data type of the
>>>>>>>>>>>>>>> > > new source, or just the return type of all the agg
>>>>>>>>>>>>>>> functions?
>>>>>>>>>>>>>>> > >     I would prefer to `producedDataType` just like
>>>>>>>>>>>>>>> > > `SupportsReadingMetadata` to reduce the effort for users
>>>>>>>>>>>>>>> to concat a
>>>>>>>>>>>>>>> > final
>>>>>>>>>>>>>>> > > output type.
>>>>>>>>>>>>>>> > >     The return type of each agg function can be obtained
>>>>>>>>>>>>>>> from the
>>>>>>>>>>>>>>> > > `CallExpression`.
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > 3) What do you think about renaming `groupSet` to
>>>>>>>>>>>>>>> `grouping` or
>>>>>>>>>>>>>>> > > `groupedFields` ?
>>>>>>>>>>>>>>> > >     The `groupSet` may confuse users that it relates to
>>>>>>>>>>>>>>> "grouping sets".
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > What do you think?
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > Best,
>>>>>>>>>>>>>>> > > Jark
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > On Tue, 5 Jan 2021 at 11:04, Kurt Young <
>>>>>>>>>>>>>>> ykt...@gmail.com> wrote:
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > >> Sorry for the typo -_-!
>>>>>>>>>>>>>>> > >> I meant idea #2.
>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>> > >> Best,
>>>>>>>>>>>>>>> > >> Kurt
>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>> > >> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu <
>>>>>>>>>>>>>>> liuyang0...@gmail.com>
>>>>>>>>>>>>>>> > >> wrote:
>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>> > >>> Hi Kurt,
>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>> > >>> Thx a lot for your feedback. If local aggregation is
>>>>>>>>>>>>>>> more like a
>>>>>>>>>>>>>>> > >>> physical operator rather than logical
>>>>>>>>>>>>>>> > >>> operator, I think your suggestion should be idea #2
>>>>>>>>>>>>>>> which handle all in
>>>>>>>>>>>>>>> > >>> the physical optimization phase?
>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>> > >>> Looking forward for the further discussion.
>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>> > >>> Kurt Young <ykt...@gmail.com> 于2021年1月5日周二 上午9:52写道:
>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>> > >>>> Local aggregation is more like a physical operator
>>>>>>>>>>>>>>> rather than logical
>>>>>>>>>>>>>>> > >>>> operator. I would suggest going with idea #1.
>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>> > >>>> Best,
>>>>>>>>>>>>>>> > >>>> Kurt
>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>> > >>>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu <
>>>>>>>>>>>>>>> liuyang0...@gmail.com>
>>>>>>>>>>>>>>> > >>>> wrote:
>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>> > >>>> > Hi Jark, Thx a lot for your quick reply and
>>>>>>>>>>>>>>> valuable suggestions.
>>>>>>>>>>>>>>> > >>>> > For (1): Agree: Since we are in the period of
>>>>>>>>>>>>>>> upgrading the new
>>>>>>>>>>>>>>> > table
>>>>>>>>>>>>>>> > >>>> > source api,
>>>>>>>>>>>>>>> > >>>> > we really should consider the new interface for the
>>>>>>>>>>>>>>> new optimize
>>>>>>>>>>>>>>> > >>>> rule. If
>>>>>>>>>>>>>>> > >>>> > the new rule
>>>>>>>>>>>>>>> > >>>> > doesn't use the new api, we'll have to upgrade it
>>>>>>>>>>>>>>> sooner or later. I
>>>>>>>>>>>>>>> > >>>> have
>>>>>>>>>>>>>>> > >>>> > change to use
>>>>>>>>>>>>>>> > >>>> > the ability interface for the
>>>>>>>>>>>>>>> SupportsAggregatePushDown definition
>>>>>>>>>>>>>>> > in
>>>>>>>>>>>>>>> > >>>> above
>>>>>>>>>>>>>>> > >>>> > proposal.
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>> > For (2): Agree: Change to use CallExpression is a
>>>>>>>>>>>>>>> better choice, and
>>>>>>>>>>>>>>> > >>>> have
>>>>>>>>>>>>>>> > >>>> > resolved this
>>>>>>>>>>>>>>> > >>>> > comment in the proposal.
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>> > For (3): I suggest we first support the JDBC
>>>>>>>>>>>>>>> connector, as we don't
>>>>>>>>>>>>>>> > >>>> have
>>>>>>>>>>>>>>> > >>>> > Druid connector
>>>>>>>>>>>>>>> > >>>> > and ES connector just has sink api at present.
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>> > But perhaps the biggest question may be whether we
>>>>>>>>>>>>>>> should use idea 1
>>>>>>>>>>>>>>> > >>>> or
>>>>>>>>>>>>>>> > >>>> > idea 2 in proposal.
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>> > What do you think?  After we reach the agreement on
>>>>>>>>>>>>>>> the proposal,
>>>>>>>>>>>>>>> > our
>>>>>>>>>>>>>>> > >>>> team
>>>>>>>>>>>>>>> > >>>> > can drive to
>>>>>>>>>>>>>>> > >>>> > complete this feature.
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>> > Jark Wu <imj...@gmail.com> 于2020年12月29日周二 下午2:58写道:
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>> > > Hi Sebastian,
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > > Thanks for the proposal. I think this is a great
>>>>>>>>>>>>>>> improvement for
>>>>>>>>>>>>>>> > >>>> Flink
>>>>>>>>>>>>>>> > >>>> > SQL.
>>>>>>>>>>>>>>> > >>>> > > I went through the design doc and have following
>>>>>>>>>>>>>>> thoughts:
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > > 1) Flink has deprecated the legacy TableSource in
>>>>>>>>>>>>>>> 1.11 and
>>>>>>>>>>>>>>> > proposed
>>>>>>>>>>>>>>> > >>>> a new
>>>>>>>>>>>>>>> > >>>> > >  set of DynamicTableSource interfaces. Could you
>>>>>>>>>>>>>>> update your
>>>>>>>>>>>>>>> > >>>> proposal to
>>>>>>>>>>>>>>> > >>>> > > use the new interfaces?
>>>>>>>>>>>>>>> > >>>> > >  Follow the existing ability interfaces, e.g.
>>>>>>>>>>>>>>> > >>>> > > SupportsFilterPushDown,
>>>>>>>>>>>>>>> SupportsProjectionPushDown.
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > > 2) Personally, I think CallExpression would be a
>>>>>>>>>>>>>>> better
>>>>>>>>>>>>>>> > >>>> representation
>>>>>>>>>>>>>>> > >>>> > than
>>>>>>>>>>>>>>> > >>>> > > separate `FunctionDefinition` and args. Because,
>>>>>>>>>>>>>>> it would be
>>>>>>>>>>>>>>> > easier
>>>>>>>>>>>>>>> > >>>> to
>>>>>>>>>>>>>>> > >>>> > know
>>>>>>>>>>>>>>> > >>>> > > what's the index and type of the arguments.
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > > 3) It would be better to list which connectors
>>>>>>>>>>>>>>> will be supported
>>>>>>>>>>>>>>> > in
>>>>>>>>>>>>>>> > >>>> the
>>>>>>>>>>>>>>> > >>>> > > plan?
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > > Best,
>>>>>>>>>>>>>>> > >>>> > > Jark
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu <
>>>>>>>>>>>>>>> > liuyang0...@gmail.com>
>>>>>>>>>>>>>>> > >>>> > wrote:
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > > > Hi all,
>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>> > >>>> > > > I'd like to discuss a new feature for the Blink
>>>>>>>>>>>>>>> Planner.
>>>>>>>>>>>>>>> > >>>> > > > Aggregate operator of Flink SQL is currently
>>>>>>>>>>>>>>> fully done at Flink
>>>>>>>>>>>>>>> > >>>> layer.
>>>>>>>>>>>>>>> > >>>> > > > With the developing of storage, many downstream
>>>>>>>>>>>>>>> storage of Flink
>>>>>>>>>>>>>>> > >>>> SQL
>>>>>>>>>>>>>>> > >>>> > has
>>>>>>>>>>>>>>> > >>>> > > > the ability to deal with Aggregation operator.
>>>>>>>>>>>>>>> > >>>> > > > Pushing down Aggregate to data source layer
>>>>>>>>>>>>>>> will improve
>>>>>>>>>>>>>>> > >>>> performance
>>>>>>>>>>>>>>> > >>>> > from
>>>>>>>>>>>>>>> > >>>> > > > the perspective of the network IO and
>>>>>>>>>>>>>>> computation overhead.
>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>> > >>>> > > > I have drafted a design doc for this new
>>>>>>>>>>>>>>> feature.
>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>> > >>>> > > > Any comment or discussion is welcome.
>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>> > >>>> > > > --
>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>> > >>>> > > > *With kind regards
>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>>>> > >>>> > > > Sebastian Liu 刘洋
>>>>>>>>>>>>>>> > >>>> > > > Institute of Computing Technology, Chinese
>>>>>>>>>>>>>>> Academy of Science
>>>>>>>>>>>>>>> > >>>> > > > Mobile\WeChat: +86—15201613655
>>>>>>>>>>>>>>> > >>>> > > > E-mail: liuyang0...@gmail.com <
>>>>>>>>>>>>>>> liuyang0...@gmail.com>
>>>>>>>>>>>>>>> > >>>> > > > QQ: 3239559*
>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>> > --
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>> > *With kind regards
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>>>> > >>>> > Sebastian Liu 刘洋
>>>>>>>>>>>>>>> > >>>> > Institute of Computing Technology, Chinese Academy
>>>>>>>>>>>>>>> of Science
>>>>>>>>>>>>>>> > >>>> > Mobile\WeChat: +86—15201613655
>>>>>>>>>>>>>>> > >>>> > E-mail: liuyang0...@gmail.com <
>>>>>>>>>>>>>>> liuyang0...@gmail.com>
>>>>>>>>>>>>>>> > >>>> > QQ: 3239559*
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>> > >>> --
>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>> > >>> *With kind regards
>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>>>> > >>> Sebastian Liu 刘洋
>>>>>>>>>>>>>>> > >>> Institute of Computing Technology, Chinese Academy of
>>>>>>>>>>>>>>> Science
>>>>>>>>>>>>>>> > >>> Mobile\WeChat: +86—15201613655
>>>>>>>>>>>>>>> > >>> E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>>>>>>>>>>>>> > >>> QQ: 3239559*
>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > --
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > *With kind regards
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>>>> > Sebastian Liu 刘洋
>>>>>>>>>>>>>>> > Institute of Computing Technology, Chinese Academy of
>>>>>>>>>>>>>>> Science
>>>>>>>>>>>>>>> > Mobile\WeChat: +86—15201613655
>>>>>>>>>>>>>>> > E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>>>>>>>>>>>>> > QQ: 3239559*
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *With kind regards
>>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>>> Sebastian Liu 刘洋
>>>>>>>>>>>>>> Institute of Computing Technology, Chinese Academy of Science
>>>>>>>>>>>>>> Mobile\WeChat: +86—15201613655
>>>>>>>>>>>>>> E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>>>>>>>>>>>> QQ: 3239559*
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best, Jingsong Lee
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> *With kind regards
>>>>>>> ------------------------------------------------------------
>>>>>>> Sebastian Liu 刘洋
>>>>>>> Institute of Computing Technology, Chinese Academy of Science
>>>>>>> Mobile\WeChat: +86—15201613655
>>>>>>> E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>>>>> QQ: 3239559*
>>>>>>>
>>>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> *With kind regards
>>>>> ------------------------------------------------------------
>>>>> Sebastian Liu 刘洋
>>>>> Institute of Computing Technology, Chinese Academy of Science
>>>>> Mobile\WeChat: +86—15201613655
>>>>> E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>>> QQ: 3239559*
>>>>>
>>>>>
>>>>
>>>> --
>>>> Best, Jingsong Lee
>>>>
>>>
>>
>> --
>>
>> *With kind regards
>> ------------------------------------------------------------
>> Sebastian Liu 刘洋
>> Institute of Computing Technology, Chinese Academy of Science
>> Mobile\WeChat: +86—15201613655
>> E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>> QQ: 3239559*
>>
>>

-- 

*With kind regards
------------------------------------------------------------
Sebastian Liu 刘洋
Institute of Computing Technology, Chinese Academy of Science
Mobile\WeChat: +86—15201613655
E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
QQ: 3239559*

Reply via email to