Great! Thanks for pushing this work forward. Looking forward to the pull requests.

Best,
Jark
On Fri, 8 Jan 2021 at 17:57, Sebastian Liu <liuyang0...@gmail.com> wrote:

Hi Jark,

Cool. Following your suggestions, I have created three related subtasks under FLINK-20791.
I hope you can assign these subtasks to me too when you have time, and I will push the implementation forward.

On Fri, 8 Jan 2021 at 12:30, Jark Wu <imj...@gmail.com> wrote:

Hi Sebastian,

I assigned the issue to you. But I suggest creating sub-tasks under this issue, because I think this will be a big contribution.
For example, you can split it into:
1. Introduce SupportsAggregatePushDown interface
2. Support SupportsAggregatePushDown in planner
3. Support SupportsAggregatePushDown for JDBC source
4. ...

Best,
Jark

On Thu, 7 Jan 2021 at 23:27, Sebastian Liu <liuyang0...@gmail.com> wrote:

Hi Jark,

It seems that we have reached agreement on the proposal. Could you please assign the JIRA ticket below to me?
https://issues.apache.org/jira/browse/FLINK-20791

On Thu, 7 Jan 2021 at 10:25, Jark Wu <imj...@gmail.com> wrote:

Thanks for updating the design doc.
It looks good to me.

Best,
Jark

On Thu, 7 Jan 2021 at 10:16, Jingsong Li <jingsongl...@gmail.com> wrote:

Sounds good to me.

We don't have to worry about future changes, because it covers all the capabilities of Calcite aggregation.

Best,
Jingsong

On Thu, Jan 7, 2021 at 12:14 AM Sebastian Liu <liuyang0...@gmail.com> wrote:

Hi Jark,

Sounds good to me. For better extensibility in the future, we could add the AggregateExpression:
```java
import java.util.List;

import javax.annotation.Nullable;

import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.DataType;

// Proposal sketch: constructor, getters and the ResolvedExpression methods are omitted.
public class AggregateExpression implements ResolvedExpression {

    private final FunctionDefinition functionDefinition;

    private final List<FieldReferenceExpression> args;

    private final @Nullable CallExpression filterExpression;

    private final DataType resultType;

    private final boolean distinct;

    private final boolean approximate;

    private final boolean ignoreNulls;
}
```

And we really only need one groupingSets parameter for grouping. I have updated the related interface in the proposal.
I appreciate the continued feedback and help.

On Wed, 6 Jan 2021 at 21:34, Jark Wu <imj...@gmail.com> wrote:

Hi Liu, Jingsong,

Regarding the aggregate with a filter, I think in theory we can support pushing such a pattern into the source.
We don't need to support it in the first version, but we can support it in the long term.
The designed interface should be future-proof.

Considering that the filter arg and the distinct flag should be part of the aggregate expression,
I'm wondering whether CallExpression is a good representation for it.
What do you think about introducing the following `AggregateExpression` to replace `CallExpression`?

    class AggregateExpression implements ResolvedExpression {
        private final FunctionDefinition functionDefinition;
        private final List<FieldReferenceExpression> args;
        private final @Nullable CallExpression filterExpr;
        private final boolean distinct;
    }

Besides, we don't need both groupingFields and groupingSets.
`groupingSets` should be a superset of groupingFields.
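To illustrate why `groupingSets` can subsume `groupingFields`, here is a minimal sketch that encodes GROUP BY, ROLLUP and CUBE as a `List<int[]>` of input field indexes, the representation used in the `applyAggregates` signatures discussed in this thread. The class and method names (`GroupingSetsSketch`, `groupBy`, `rollup`, `cube`) are hypothetical and not part of Flink or Calcite.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical helper (not a Flink or Calcite API): encodes GROUP BY, ROLLUP and CUBE
 * as a List<int[]> of grouping sets, where each int[] holds input field indexes.
 */
public class GroupingSetsSketch {

    // GROUP BY f0, f1, ...: exactly one grouping set containing all listed fields.
    public static List<int[]> groupBy(int... fields) {
        List<int[]> sets = new ArrayList<>();
        sets.add(fields.clone());
        return sets;
    }

    // ROLLUP(f0, ..., fn-1): every prefix, from the full field list down to the empty set.
    public static List<int[]> rollup(int... fields) {
        List<int[]> sets = new ArrayList<>();
        for (int len = fields.length; len >= 0; len--) {
            sets.add(Arrays.copyOf(fields, len));
        }
        return sets;
    }

    // CUBE(f0, ..., fn-1): all 2^n subsets, enumerated with a bit mask (assumes n < 31).
    public static List<int[]> cube(int... fields) {
        List<int[]> sets = new ArrayList<>();
        int n = fields.length;
        for (int mask = (1 << n) - 1; mask >= 0; mask--) {
            int[] set = new int[Integer.bitCount(mask)];
            int k = 0;
            for (int i = 0; i < n; i++) {
                if ((mask & (1 << i)) != 0) {
                    set[k++] = fields[i];
                }
            }
            sets.add(set);
        }
        return sets;
    }

    // A plain GROUP BY is just the special case of a single grouping set.
    public static boolean isSimpleGroup(List<int[]> groupingSets) {
        return groupingSets.size() == 1;
    }

    // Renders the grouping sets compactly, e.g. "[0, 1][0][]".
    public static String render(List<int[]> groupingSets) {
        StringBuilder sb = new StringBuilder();
        for (int[] set : groupingSets) {
            sb.append(Arrays.toString(set));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Assumed field indexes: 0 = cc_class, 1 = cc_employees.
        System.out.println(render(groupBy(0, 1))); // [0, 1]
        System.out.println(render(rollup(0, 1)));  // [0, 1][0][]
        System.out.println(render(cube(0, 1)));    // [0, 1][1][0][]
    }
}
```

Under this encoding, a simple GROUP BY is a list with a single grouping set, so a separate `groupingFields` argument carries no extra information.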
Then the interface of SupportsAggregatePushDown can be:

    interface SupportsAggregatePushDown {

        boolean applyAggregates(
                List<int[]> groupingSets,
                List<AggregateExpression> aggregates,
                DataType producedDataType);
    }

What do you think?

Best,
Jark

On Wed, 6 Jan 2021 at 19:56, Sebastian Liu <liuyang0...@gmail.com> wrote:

Hi Jingsong, Jark,

Thanks so much for the discussion; the cases mentioned above are really worth discussing further.

1. Aggregate with filter expressions, e.g. select COUNT(1) FILTER(WHERE cc_call_center_sk > 3) from call_center;
   With the current Blink planner, the optimized plan will be:
   TableSourceScan -> Calc(IS TRUE(>(cc_call_center_sk, 3))) -> LocalAgg -> Exchange -> FinalAgg.
   As there is a Calc above the TableSource, this pattern can't match the LocalAggPushDownRule in the current design.

2. Grouping sets or rollup, e.g. select COUNT(1) from call_center group by rollup(cc_class, cc_employees);
   With the current Blink planner, the optimized plan will be:
   TableSourceScan -> Expand -> LocalAgg -> Exchange -> FinalAgg -> Calc.
   It's also not covered by the current LocalAggPushDownRule design.

3. I want to add a case we haven't discussed yet: aggregate with a HAVING clause,
   e.g. select COUNT(1) from call_center group by cc_class having max(cc_tax_percentage) > 0.2;
   With the current Blink planner, the optimized plan will be:
   TableSourceScan -> LocalAgg -> Exchange -> FinalAgg -> Calc(where=[>($f2, 0.2:DECIMAL(2, 1))]).
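The three plan shapes above differ in what sits directly on top of the scan. As a minimal sketch under a simplifying assumption (plans modelled as a bottom-up list of operator names, not Calcite's `RelOptRule` operand API), the hypothetical `matchesScanThenLocalAgg` check shows why the FILTER and ROLLUP cases miss a TableSourceScan + LocalAgg pattern, while the HAVING case, whose Calc sits above FinalAgg, still has that shape.

```java
import java.util.List;

/**
 * Hypothetical, simplified model of the check a rule like PushLocalAggIntoTableSourceScanRule
 * performs: the local aggregate must consume the table source scan directly.
 * Plans are operator names listed bottom-up; this is not the Calcite rule API.
 */
public class LocalAggPatternSketch {

    public static boolean matchesScanThenLocalAgg(List<String> bottomUpPlan) {
        return bottomUpPlan.size() >= 2
                && bottomUpPlan.get(0).equals("TableSourceScan")
                && bottomUpPlan.get(1).equals("LocalAgg");
    }

    public static void main(String[] args) {
        // Plain GROUP BY: the local aggregate sits directly on the scan.
        List<String> plain = List.of("TableSourceScan", "LocalAgg", "Exchange", "FinalAgg");
        // Case 1 (FILTER): a Calc sits between the scan and the local aggregate.
        List<String> withFilter = List.of("TableSourceScan", "Calc", "LocalAgg", "Exchange", "FinalAgg");
        // Case 2 (ROLLUP): an Expand sits between the scan and the local aggregate.
        List<String> withRollup =
                List.of("TableSourceScan", "Expand", "LocalAgg", "Exchange", "FinalAgg", "Calc");
        // Case 3 (HAVING): the Calc is above FinalAgg, so the bottom two operators still match.
        List<String> withHaving = List.of("TableSourceScan", "LocalAgg", "Exchange", "FinalAgg", "Calc");

        System.out.println(matchesScanThenLocalAgg(plain));      // true
        System.out.println(matchesScanThenLocalAgg(withFilter)); // false
        System.out.println(matchesScanThenLocalAgg(withRollup)); // false
        System.out.println(matchesScanThenLocalAgg(withHaving)); // true
    }
}
```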
The core discussion points are summarized as follows:
a) Aggregation is a more complex scenario than predicates or limits, and it also depends on the underlying storage.
b) One rule doesn't seem able to cover all aggregate scenarios, but could the SupportsAggregatePushDown interface be a bit more general?
c) Can a CallExpression express the semantics of Calcite's AggregateCall?

IMO, completely pushing down an aggregate is generally hard for distributed systems. It usually requires a GROUP BY that exactly matches the partitioning of the downstream storage. At the same time, the benefit of removing the final aggregate is actually limited: LocalAggPushDown generally yields more than 80% of the CPU and IO benefits. But I also agree that the SupportsAggregatePushDown interface should be as generic as possible for future extensions, while we keep confidence in the interface we design.

For core point (a): given the complexity of aggregation, one LogicalAggregate node may expand into "Expand / Calc / LocalXXAgg / Exchange / FinalXXAgg" in the physical phase. It seems we can't solve all cases with only one rule, so I suggest that PushLocalAggIntoTableSourceScanRule focus only on the TableSourceScan + LocalXXAggregate pattern for now.
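Sebastian's point that local-aggregate push-down captures much of the benefit while the final aggregate usually stays in Flink can be illustrated with a small simulation. It is a sketch with hypothetical names (`TwoPhaseCountSketch`, `localCount`, `finalCount`), not Flink's operators, and it assumes the source splits are not partitioned by the grouping key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Minimal simulation of two-phase COUNT(*) ... GROUP BY key (hypothetical names, not Flink code).
 * Assumption: each inner list is one source split, and each row is reduced to its grouping key.
 */
public class TwoPhaseCountSketch {

    // Local phase: what a source with local-aggregate push-down would emit for one split.
    public static Map<String, Long> localCount(List<String> splitRows) {
        Map<String, Long> partial = new TreeMap<>();
        for (String key : splitRows) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    // Final phase (still in Flink): sum the partial counts of the same key across splits.
    public static Map<String, Long> finalCount(List<Map<String, Long>> partials) {
        Map<String, Long> result = new TreeMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> result.merge(key, count, Long::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        // Key "a" appears in both splits, because the splits are not partitioned by the key.
        List<List<String>> splits = List.of(
                List.of("a", "a", "b"),
                List.of("a", "c", "c", "c"));

        List<Map<String, Long>> partials = new ArrayList<>();
        int rawRows = 0;
        int partialRows = 0;
        for (List<String> split : splits) {
            Map<String, Long> partial = localCount(split);
            rawRows += split.size();
            partialRows += partial.size();
            partials.add(partial);
        }

        System.out.println(finalCount(partials)); // {a=3, b=1, c=3}
        System.out.println(rawRows + " raw rows vs " + partialRows + " partial rows"); // 7 raw rows vs 4 partial rows
    }
}
```

Fewer rows leave the source, but key "a" still has to be merged across splits, which is why the final aggregate cannot simply be dropped.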
For core points (b & c): I think we can change the interface to:
```java
boolean applyAggregates(int[] groupingFields, List<CallExpression> aggregateExpressions, DataType producedDataType, List<int[]> groupingSets);
```

Simple group: groupingSets.size() == 1 && Arrays.equals(groupingSets.get(0), groupingFields)
Cube group: groupingSets.size() == IntMath.pow(2, groupingFields.length)
Rollup: see org.apache.calcite.rel.core.Aggregate.Group#isRollup

Then we can handle the complex grouping cases. The connector developer for the downstream storage should determine whether it supports the associated grouping type. Filter and HAVING clauses will be converted into related Calc RelNodes and will no longer be part of the LocalAggregate node, so CallExpression may be sufficient to express the semantics of AggregateCall.

What do you think? Looking forward to further discussion.

On Wed, 6 Jan 2021 at 14:24, Jingsong Li <jingsongl...@gmail.com> wrote:

> I think filter expressions and grouping sets are semantic arguments instead of utilities. If we want to push them into sources, the connector developers should be aware of them. Wrapping them implicitly in a context is error-prone: an existing connector will produce wrong results when upgrading to a new Flink version.

We can have some mechanism to check the upgrade.

> I think for these cases, providing a new default method to override might be a better choice.

Then we will have three or more methods. At the API level, I really don't like it...
Best,
Jingsong

On Wed, Jan 6, 2021 at 2:10 PM Jark Wu <imj...@gmail.com> wrote:

I think filter expressions and grouping sets are semantic arguments instead of utilities.
If we want to push them into sources, the connector developers should be aware of them.
Wrapping them implicitly in a context is error-prone: an existing connector will produce wrong results
when upgrading to a new Flink version (as we are pushing grouping_sets/filter_args, but the connector ignores them).
I think for these cases, providing a new default method to override might be a better choice.

Best,
Jark

On Wed, 6 Jan 2021 at 13:56, Jingsong Li <jingsongl...@gmail.com> wrote:

Hi,

I'm also curious about aggregates with a filter (COUNT(1) FILTER(WHERE d > 1)). Can we push them down? I'm not sure that a single call expression can express it, or how we should represent it and convey it to users.

Best,
Jingsong

On Wed, Jan 6, 2021 at 1:36 PM Jingsong Li <jingsongl...@gmail.com> wrote:

Hi Jark,

I don't want to limit this interface to LocalAgg pushdown. Actually, sometimes we can push the whole aggregation into the source too.

So this rule can do something more advanced. For example, we can push grouping sets into the source too, for SQL like "GROUP BY GROUPING SETS (f1, f2)". Then we need to push down more information.
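The FILTER question and the concern about connectors silently ignoring pushed arguments can be made concrete with a toy evaluator. `CountSpec` is a hypothetical, simplified stand-in for an aggregate expression that carries a filter and a distinct flag (rows are assumed to be `int[] {d, x}`); it is not Flink's `AggregateExpression`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

/**
 * Hypothetical, simplified stand-in for an aggregate with FILTER and DISTINCT
 * (not Flink's AggregateExpression). Rows are int[] {d, x}.
 */
public class FilteredCountSketch {

    public static final class CountSpec {
        final int argIndex;            // column for COUNT(DISTINCT col), or -1 for COUNT(1)
        final Predicate<int[]> filter; // FILTER (WHERE ...) clause, or null when absent
        final boolean distinct;

        public CountSpec(int argIndex, Predicate<int[]> filter, boolean distinct) {
            if (distinct && argIndex < 0) {
                throw new IllegalArgumentException("DISTINCT needs an argument column");
            }
            this.argIndex = argIndex;
            this.filter = filter;
            this.distinct = distinct;
        }
    }

    public static long evaluate(CountSpec spec, List<int[]> rows) {
        Set<Integer> seen = new HashSet<>();
        long count = 0;
        for (int[] row : rows) {
            if (spec.filter != null && !spec.filter.test(row)) {
                continue; // row excluded by FILTER (WHERE ...)
            }
            if (spec.distinct && !seen.add(row[spec.argIndex])) {
                continue; // value already counted
            }
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        List<int[]> rows = List.of(
                new int[] {0, 5}, new int[] {2, 5}, new int[] {3, 5}, new int[] {1, 7}, new int[] {4, 7});
        Predicate<int[]> dGreaterThanOne = row -> row[0] > 1;

        System.out.println(evaluate(new CountSpec(-1, null, false), rows));            // COUNT(1): 5
        System.out.println(evaluate(new CountSpec(-1, dGreaterThanOne, false), rows)); // COUNT(1) FILTER(WHERE d > 1): 3
        System.out.println(evaluate(new CountSpec(1, null, true), rows));              // COUNT(DISTINCT x): 2
    }
}
```

A source that ignored the filter would report 5 instead of 3 for the second aggregate, which is the kind of silent wrong result Jark describes.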
Best,
Jingsong

On Wed, Jan 6, 2021 at 11:02 AM Jark Wu <imj...@gmail.com> wrote:

I think this may be over-designed. We should have confidence in the interface we design; the interface should be stable.
Wrapping things in a big context comes at the cost of user convenience.
Above all, we don't see any parameters to add in the future. Do you know of any potential parameters?

Best,
Jark

On Wed, 6 Jan 2021 at 10:28, Jingsong Li <jingsongl...@gmail.com> wrote:

Hi Sebastian,

Well, I mean:

`boolean applyAggregates(int[] groupingFields, List<CallExpression> aggregateExpressions, DataType producedDataType);`
VS
```
boolean applyAggregates(Aggregation agg);

interface Aggregation {
    int[] groupingFields();
    List<CallExpression> aggregateExpressions();
    DataType producedDataType();
}
```

Maybe I'm overthinking it, but I think aggregation is a complicated thing, and we may need to extend its parameters in the future. Making the parameter an interface allows future extension without breaking the compatibility of user implementations. With the previous signature, users would need to modify their code.
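Jark's alternative, adding a new default method rather than wrapping parameters in a context object, can be sketched as follows. The interface, the connector class and the string-typed aggregates are hypothetical simplifications, not Flink's `SupportsAggregatePushDown`. The default overload delegates only when there is exactly one grouping set and otherwise returns false, so a connector written before grouping sets existed declines the push-down instead of producing wrong results.

```java
import java.util.List;

/**
 * Hypothetical illustration (not Flink's API): evolving an ability interface with a new
 * default method. Aggregates are simplified to strings.
 */
public class PushDownEvolutionSketch {

    public interface AggregatePushDownV1 {
        // Original method: plain GROUP BY over the given field indexes.
        boolean applyAggregates(int[] groupingFields, List<String> aggregates);

        // Added in a later version as a default method. A connector written against the
        // original method does not override it, so grouping sets it cannot handle are
        // rejected instead of being silently treated as a plain GROUP BY.
        default boolean applyAggregates(List<int[]> groupingSets, List<String> aggregates) {
            if (groupingSets.size() == 1) {
                return applyAggregates(groupingSets.get(0), aggregates);
            }
            return false; // the planner keeps the aggregation in Flink
        }
    }

    // A connector that only knows about plain grouping.
    public static class OldConnector implements AggregatePushDownV1 {
        public int[] pushedGrouping;

        @Override
        public boolean applyAggregates(int[] groupingFields, List<String> aggregates) {
            pushedGrouping = groupingFields.clone();
            return true;
        }
    }

    public static void main(String[] args) {
        OldConnector connector = new OldConnector();
        boolean simple = connector.applyAggregates(List.of(new int[] {0}), List.of("COUNT(*)"));
        boolean rollup = connector.applyAggregates(
                List.of(new int[] {0, 1}, new int[] {0}, new int[] {}), List.of("COUNT(*)"));
        System.out.println(simple + " " + rollup); // true false
    }
}
```

The cost Jingsong points out is visible as well: each evolution step adds another overload to the interface.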
Best,
Jingsong

On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu <liuyang0...@gmail.com> wrote:

Hi Jingsong,

Thanks a lot for your suggestions. These points really need to be made clear in the proposal.

For the semantic problem, I think the main point is the difference between the data type returned by the target aggregate function and the row format returned by the underlying storage.
That's why we provide producedDataType in the SupportsAggregatePushDown interface.
Developers need to know that the related connectors must handle the semantic differences between the underlying storage system and Flink. [Supplemented in proposal]

The phase of the new PushLocalAggIntoTableSourceScanRule is also a key point.
As you suggested, we should put it into the PHYSICAL_REWRITE rule set, preferably after the EnforceLocalXXAggRule. [Supplemented in proposal]

For the extensibility of the interface, I don't exactly understand your suggestion. Is it to add an abstract class that implements the SupportsAggregatePushDown interface and holds the `List<CallExpression> aggregateExpressions, int[] groupingFields, DataType producedDataType` fields?

Looking forward to your further feedback or guidance.

On Tue, 5 Jan 2021 at 14:44, Jingsong Li <jingsongl...@gmail.com> wrote:

Thanks for your proposal, Sebastian!
+1 for SupportsAggregatePushDown. The excellent discussion above has resolved many of my concerns.

## Semantic problems

We may need to add some mechanisms or comments, because as far as I know, the semantics of each database differ, which may need to be reflected in your specific implementation.

For example, the AVG output types of various databases may differ. MySQL, for example, outputs double, which is different from Flink. What should we do? (Luckily, AVG will be split into SUM and COUNT, but we also need to take care of DECIMAL and other types.)

## The phase of the push-down rule

I strongly recommend that you do not put it in the Volcano phase, which may make the cost calculation very troublesome.
So in PHYSICAL_REWRITE?

## About the interface

For extensibility, I slightly recommend introducing an `Aggregate` interface that contains `List<CallExpression> aggregateExpressions, int[] groupingFields, DataType producedDataType` fields. This way, we can add fields easily without breaking compatibility.

I think the current design is very good; I'm just putting forward some ideas.
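Jingsong's remark that AVG is split into SUM and COUNT can be illustrated with a small sketch. It assumes, hypothetically, that the source returns one `{sum, count}` pair per split; the class and method names are illustrative only. Merging the partial sums and counts and dividing once in Flink gives the correct average and lets Flink, rather than the external database, decide the result type; averaging per-split averages does not.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;

/**
 * Sketch of why AVG is pushed down as partial SUM and COUNT rather than as AVG.
 * Assumption: each long[] is {sum, count} returned by the source for one split (count > 0).
 * Names are hypothetical; this is not Flink's AVG implementation.
 */
public class AvgSplitSketch {

    // Correct: merge partial sums and counts, then divide once on the Flink side.
    public static BigDecimal avgFromPartials(List<long[]> partials, int scale) {
        long sum = 0;
        long count = 0;
        for (long[] p : partials) {
            sum += p[0];
            count += p[1];
        }
        // The result type (here a decimal with the given scale) is chosen on the Flink side,
        // independent of what AVG would return in the external database.
        return BigDecimal.valueOf(sum).divide(BigDecimal.valueOf(count), scale, RoundingMode.HALF_UP);
    }

    // Wrong: averaging per-split averages ignores how many rows each split had.
    public static BigDecimal avgOfAverages(List<long[]> partials, int scale) {
        BigDecimal total = BigDecimal.ZERO;
        for (long[] p : partials) {
            total = total.add(
                    BigDecimal.valueOf(p[0]).divide(BigDecimal.valueOf(p[1]), scale, RoundingMode.HALF_UP));
        }
        return total.divide(BigDecimal.valueOf(partials.size()), scale, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        // Split 1 holds values 1, 2, 3 (sum 6, count 3); split 2 holds value 10 (sum 10, count 1).
        List<long[]> partials = List.of(new long[] {6, 3}, new long[] {10, 1});
        System.out.println(avgFromPartials(partials, 2)); // 4.00
        System.out.println(avgOfAverages(partials, 2));   // 6.00
    }
}
```

The DECIMAL concern remains: the scale and rounding mode here are arbitrary choices for the example, and a real connector would have to follow Flink's AVG result type.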
Best,
Jingsong

On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu <liuyang0...@gmail.com> wrote:

Hi Jark,

Thanks for your further feedback and help. The interface of SupportsAggregatePushDown may indeed need some adjustments.

For (1), agreed: the upstream only needs to know whether the TableSource can handle all of the aggregates.
It's better to just return a boolean indicating whether pushing down all of the aggregates succeeded. [Resolved in proposal]

For (2), agreed: aggOutputDataType represents the produced data type of the new table source, to make sure that the new table source can connect to the related exchange node. The format of aggOutputDataType is the grouping fields' types followed by the aggregate functions' return types.
The reason for adding this parameter to the function is also to make it easier for the user to build the final output type. I have renamed this parameter to producedDataType. [Resolved in proposal]

For (3), agreed: indeed, groupSet may mislead users, so I have changed it to groupingFields. [Resolved in proposal]

Thanks again for the suggestions; looking forward to further discussion.

On Tue, 5 Jan 2021 at 12:05, Jark Wu <imj...@gmail.com> wrote:

I'm also +1 for idea #2.
Regarding the updated interface,

    Result applyAggregates(List<CallExpression> aggregateExpressions,
        int[] groupSet, DataType aggOutputDataType);

    final class Result {
        private final List<CallExpression> acceptedAggregates;
        private final List<CallExpression> remainingAggregates;
    }

I have the following comments:

1) Do we need the composite Result return type? Is a boolean return type enough?
   From my understanding, all of the aggregates should be accepted, otherwise the pushdown should fail.
   Therefore, users don't need to distinguish which aggregates are "accepted".

2) Does `aggOutputDataType` represent the produced data type of the new source, or just the return type of all the agg functions?
   I would prefer `producedDataType`, just like `SupportsReadingMetadata`, to reduce the effort for users to concatenate a final output type.
   The return type of each agg function can be obtained from the `CallExpression`.

3) What do you think about renaming `groupSet` to `grouping` or `groupedFields`?
   `groupSet` may mislead users into thinking it relates to "grouping sets".
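The layout of `producedDataType` described in this thread (the grouping fields' types followed by the aggregate return types) can be sketched as follows. This is a simplification: types are plain strings instead of Flink's `DataType`, the `producedRowType` helper is hypothetical, and the call_center column types are assumed for the example.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the produced row layout: grouping fields' types first,
 * then one column per aggregate result type. Types are plain strings, not Flink DataTypes.
 */
public class ProducedTypeSketch {

    public static List<String> producedRowType(
            List<String> inputFieldTypes, int[] groupingFields, List<String> aggResultTypes) {
        List<String> produced = new ArrayList<>();
        for (int field : groupingFields) {
            produced.add(inputFieldTypes.get(field)); // grouping keys keep their input types
        }
        produced.addAll(aggResultTypes); // then the result type of each aggregate, in order
        return produced;
    }

    public static void main(String[] args) {
        // Assumed call_center columns: cc_class, cc_employees, cc_tax_percentage.
        List<String> input = List.of("STRING", "INT", "DECIMAL(5, 2)");
        // GROUP BY cc_class with COUNT(1) and MAX(cc_tax_percentage).
        List<String> produced =
                producedRowType(input, new int[] {0}, List.of("BIGINT", "DECIMAL(5, 2)"));
        System.out.println(produced); // [STRING, BIGINT, DECIMAL(5, 2)]
    }
}
```

Passing this combined type to the source, as point 2 suggests, saves each connector from re-deriving it.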
What do you think?

Best,
Jark

On Tue, 5 Jan 2021 at 11:04, Kurt Young <ykt...@gmail.com> wrote:

Sorry for the typo -_-!
I meant idea #2.

Best,
Kurt

On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu <liuyang0...@gmail.com> wrote:

Hi Kurt,

Thanks a lot for your feedback. If local aggregation is more like a physical operator than a logical operator, I think your suggestion should be idea #2, which handles everything in the physical optimization phase?

Looking forward to further discussion.

On Tue, 5 Jan 2021 at 09:52, Kurt Young <ykt...@gmail.com> wrote:

Local aggregation is more like a physical operator than a logical operator. I would suggest going with idea #1.

Best,
Kurt

On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu <liuyang0...@gmail.com> wrote:

Hi Jark, thanks a lot for your quick reply and valuable suggestions.
For (1), agreed: since we are in the middle of upgrading to the new table source API, we should indeed consider the new interfaces for the new optimization rule. If the new rule doesn't use the new API, we'll have to upgrade it sooner or later. I have changed the SupportsAggregatePushDown definition in the proposal above to use the ability interface.

For (2), agreed: changing to CallExpression is a better choice, and I have resolved this comment in the proposal.

For (3): I suggest we first support the JDBC connector, as we don't have a Druid connector, and the ES connector only has a sink API at present.

But perhaps the biggest question is whether we should use idea 1 or idea 2 in the proposal.

What do you think? After we reach agreement on the proposal, our team can drive this feature to completion.
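Since the JDBC connector is proposed as the first target, here is a rough sketch of how such a source might translate a pushed-down plain GROUP BY into a remote query. Everything here is hypothetical (`JdbcAggQuerySketch`, `AggCall`, `buildQuery`); it is not the Flink JDBC connector's code, and it ignores identifier quoting and dialect differences. Returning `null` for multiple grouping sets mirrors the idea, raised later in the thread, that the connector decides which grouping types it supports.

```java
import java.util.List;
import java.util.StringJoiner;

/**
 * Hypothetical sketch (not the Flink JDBC connector): turning a pushed-down plain GROUP BY
 * and its aggregates into a query for the external database. Identifier quoting and SQL
 * dialect differences are ignored.
 */
public class JdbcAggQuerySketch {

    public static final class AggCall {
        final String function; // e.g. "COUNT", "SUM", "MAX"
        final int argIndex;    // input field index, or -1 for COUNT(*)

        public AggCall(String function, int argIndex) {
            this.function = function;
            this.argIndex = argIndex;
        }
    }

    /** Returns the SQL to run remotely, or null when this sketch declines the push-down. */
    public static String buildQuery(
            String table, List<String> fieldNames, List<int[]> groupingSets, List<AggCall> aggs) {
        if (groupingSets.size() != 1) {
            return null; // ROLLUP, CUBE and other grouping sets are not handled here
        }
        int[] grouping = groupingSets.get(0);
        StringJoiner select = new StringJoiner(", ");
        StringJoiner groupBy = new StringJoiner(", ");
        for (int field : grouping) {
            // Grouping keys come first, matching the produced row layout.
            select.add(fieldNames.get(field));
            groupBy.add(fieldNames.get(field));
        }
        for (AggCall agg : aggs) {
            String arg = agg.argIndex < 0 ? "*" : fieldNames.get(agg.argIndex);
            select.add(agg.function + "(" + arg + ")");
        }
        String sql = "SELECT " + select + " FROM " + table;
        return grouping.length == 0 ? sql : sql + " GROUP BY " + groupBy;
    }

    public static void main(String[] args) {
        List<String> fields = List.of("cc_class", "cc_employees", "cc_tax_percentage");
        System.out.println(buildQuery("call_center", fields, List.of(new int[] {0}),
                List.of(new AggCall("COUNT", -1), new AggCall("MAX", 2))));
        // SELECT cc_class, COUNT(*), MAX(cc_tax_percentage) FROM call_center GROUP BY cc_class
    }
}
```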
On Tue, 29 Dec 2020 at 14:58, Jark Wu <imj...@gmail.com> wrote:

Hi Sebastian,

Thanks for the proposal. I think this is a great improvement for Flink SQL.
I went through the design doc and have the following thoughts:

1) Flink deprecated the legacy TableSource in 1.11 and proposed a new set of DynamicTableSource interfaces. Could you update your proposal to use the new interfaces?
   Follow the existing ability interfaces, e.g. SupportsFilterPushDown, SupportsProjectionPushDown.

2) Personally, I think CallExpression would be a better representation than separate `FunctionDefinition` and args, because it would be easier to know the index and type of each argument.

3) It would be better to list which connectors will be supported in the plan.
Best,
Jark

On Tue, 29 Dec 2020 at 00:49, Sebastian Liu <liuyang0...@gmail.com> wrote:

Hi all,

I'd like to discuss a new feature for the Blink planner.
The aggregate operator of Flink SQL is currently executed entirely in the Flink layer.
As storage systems have evolved, many of the storage systems that Flink SQL reads from can now process aggregations themselves.
Pushing the aggregate down to the data source layer will improve performance by reducing network IO and computation overhead.

I have drafted a design doc for this new feature.

https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing

Any comments or discussion are welcome.
--
*With kind regards
------------------------------------------------------------
Sebastian Liu 刘洋
Institute of Computing Technology, Chinese Academy of Science
Mobile\WeChat: +86—15201613655
E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
QQ: 3239559*