Hi Jark,

It seems that we have reached agreement on the proposal.
Could you please assign the JIRA ticket below to me?
https://issues.apache.org/jira/browse/FLINK-20791
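
For reference, the interface shape discussed in the thread below (a single `groupingSets` parameter, a boolean result, all-or-nothing acceptance) could be sketched roughly as follows. This is only a minimal, self-contained illustration and not the code in the design doc: `FieldRef`, `AggCall`, `SupportsAggregatePushDownSketch`, `SimpleGroupOnlySource` and `PushDownDemo` are hypothetical stand-ins rather than Flink classes, the aggregate function is reduced to a name, the filter to a flag, and `producedDataType` to a `String`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for FieldReferenceExpression: just a name and an index.
final class FieldRef {
    final String name;
    final int index;

    FieldRef(String name, int index) {
        this.name = name;
        this.index = index;
    }
}

// Hypothetical stand-in for the proposed AggregateExpression.
final class AggCall {
    final String functionName;
    final List<FieldRef> args;
    final boolean hasFilter;
    final boolean distinct;

    AggCall(String functionName, List<FieldRef> args, boolean hasFilter, boolean distinct) {
        this.functionName = functionName;
        this.args = args;
        this.hasFilter = hasFilter;
        this.distinct = distinct;
    }
}

// Sketch of the ability interface: one groupingSets parameter, boolean result.
interface SupportsAggregatePushDownSketch {
    boolean applyAggregates(
            List<int[]> groupingSets, List<AggCall> aggregates, String producedDataType);
}

// Hypothetical connector: accepts only a plain GROUP BY (exactly one grouping set)
// with unfiltered, non-distinct COUNT/SUM/MIN/MAX, and rejects everything else.
final class SimpleGroupOnlySource implements SupportsAggregatePushDownSketch {
    private static final List<String> SUPPORTED = Arrays.asList("COUNT", "SUM", "MIN", "MAX");

    @Override
    public boolean applyAggregates(
            List<int[]> groupingSets, List<AggCall> aggregates, String producedDataType) {
        if (groupingSets.size() != 1) {
            return false; // ROLLUP / CUBE / GROUPING SETS are not handled by this sketch
        }
        for (AggCall agg : aggregates) {
            if (agg.hasFilter || agg.distinct || !SUPPORTED.contains(agg.functionName)) {
                return false; // all-or-nothing: one unsupported aggregate rejects the push-down
            }
        }
        return true;
    }
}

final class PushDownDemo {
    public static void main(String[] args) {
        SupportsAggregatePushDownSketch source = new SimpleGroupOnlySource();
        AggCall count = new AggCall(
                "COUNT", Collections.singletonList(new FieldRef("cc_class", 0)), false, false);

        // GROUP BY cc_class: a single grouping set.
        List<int[]> simpleGroup = Collections.singletonList(new int[] {0});
        // GROUP BY ROLLUP(cc_class, cc_employees): three grouping sets.
        List<int[]> rollup = Arrays.asList(new int[] {0, 1}, new int[] {0}, new int[] {});

        System.out.println(source.applyAggregates(
                simpleGroup, Collections.singletonList(count), "ROW<cc_class, cnt>"));
        System.out.println(source.applyAggregates(
                rollup, Collections.singletonList(count), "ROW<cc_class, cc_employees, cnt>"));
    }
}
```

The point of the sketch is the contract: the connector sees the grouping sets and every aggregate, and returns `false` for anything it cannot fully handle, so the planner can keep the aggregation in Flink.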

On Thu, Jan 7, 2021 at 10:25 AM Jark Wu <imj...@gmail.com> wrote:

> Thanks for updating the design doc.
> It looks good to me.
>
> Best,
> Jark
>
> On Thu, 7 Jan 2021 at 10:16, Jingsong Li <jingsongl...@gmail.com> wrote:
>
>> Sounds good to me.
>>
>> We don't have to worry about future changes, because it already covers
>> all the capabilities of Calcite aggregation.
>>
>> Best,
>> Jingsong
>>
>> On Thu, Jan 7, 2021 at 12:14 AM Sebastian Liu <liuyang0...@gmail.com>
>> wrote:
>>
>>> Hi Jark,
>>>
>>> Sounds good to me. For better scalability in the future, we could add
>>> the AggregateExpression.
>>> ```
>>> public class AggregateExpression implements ResolvedExpression {
>>>     private final FunctionDefinition functionDefinition;
>>>     private final List<FieldReferenceExpression> args;
>>>     private final @Nullable CallExpression filterExpression;
>>>     private final DataType resultType;
>>>     private final boolean distinct;
>>>     private final boolean approximate;
>>>     private final boolean ignoreNulls;
>>> }
>>> ```
>>>
>>> And we really only need one GroupingSets parameter for grouping. I have
>>> updated the related interface in the proposal.
>>> I appreciate the continued feedback and help.
>>>
>>> On Wed, Jan 6, 2021 at 9:34 PM Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> Hi Liu, Jingsong,
>>>>
>>>> Regarding the agg with filter, I think in theory we can support pushing
>>>> such a pattern into the source.
>>>> We don't need to support it in the first version, but in the long term,
>>>> we can support it.
>>>> The designed interface should be future-proof.
>>>>
>>>> Considering that the filter arg and distinct flag should be part of the
>>>> aggregate expression, I'm wondering if CallExpression is a good
>>>> representation for it.
>>>> What do you think about proposing the following `AggregateExpression`
>>>> to replace the `CallExpression`?
>>>>
>>>> class AggregateExpression implements ResolvedExpression {
>>>>     private final FunctionDefinition functionDefinition;
>>>>     private final List<FieldReferenceExpression> args;
>>>>     private final @Nullable CallExpression filterExpr;
>>>>     private final boolean distinct;
>>>> }
>>>>
>>>> Besides, we don't need both groupingFields and groupingSets.
>>>> `groupingSets` should be a superset of groupingFields.
>>>> Then the interface of SupportsAggregatePushDown can be:
>>>>
>>>> interface SupportsAggregatePushDown {
>>>>
>>>>     boolean applyAggregates(
>>>>         List<int[]> groupingSets,
>>>>         List<AggregateExpression> aggregates,
>>>>         DataType producedDataType);
>>>> }
>>>>
>>>> What do you think?
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Wed, 6 Jan 2021 at 19:56, Sebastian Liu <liuyang0...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Jingsong, Jark,
>>>>>
>>>>> Thx so much for the discussion. The cases mentioned above are really
>>>>> worth further discussion.
>>>>>
>>>>> 1. For aggregates with filter expressions, e.g.: select COUNT(1)
>>>>> FILTER(WHERE cc_call_center_sk > 3) from call_center;
>>>>> For the current Blink Planner, the optimized plan will be:
>>>>> TableSourceScan -> Calc(IS TRUE(>(cc_call_center_sk, 3))) -> LocalAgg
>>>>> -> Exchange -> FinalAgg.
>>>>> As there is a Calc above the TableSource, this pattern can't match the
>>>>> LocalAggPushDownRule in the current design.
>>>>>
>>>>> 2. For the grouping set or rollup use case, e.g.: select COUNT(1) from
>>>>> call_center group by rollup(cc_class, cc_employees);
>>>>> For the current Blink Planner, the optimized plan will be:
>>>>> TableSourceScan -> Expand -> LocalAgg -> Exchange -> FinalAgg -> Calc.
>>>>> It's also not covered by the current LocalAggPushDownRule design.
>>>>>
>>>>> 3. I want to add a case which we haven't discussed yet: aggregate with
>>>>> a HAVING clause,
>>>>> e.g.: select COUNT(1) from call_center group by cc_class having
>>>>> max(cc_tax_percentage) > 0.2;
>>>>> For the current Blink Planner, the optimized plan will be:
>>>>> TableSourceScan -> LocalAgg -> Exchange -> FinalAgg ->
>>>>> Calc(where=[>($f2, 0.2:DECIMAL(2, 1))]).
>>>>>
>>>>> The core discussion points are summarized as follows:
>>>>> a) Aggregate is a more complex scenario than predicates or limits, and
>>>>> also depends on the different underlying storages.
>>>>> b) A single rule does not seem able to cover all aggregate scenarios,
>>>>> but can the SupportsAggregatePushDown interface be a bit more general?
>>>>> c) Could the CallExpression express the semantics of Calcite's
>>>>> AggregateCall?
>>>>>
>>>>> IMO: Pushing down the aggregate completely is generally hard for
>>>>> distributed systems. Usually we need a GROUP BY that exactly matches
>>>>> the partition mode in the downstream storage. At the same time, the
>>>>> benefit of removing the final aggregate is actually limited.
>>>>> The LocalAggPushDown generally yields more than 80% of the CPU and IO
>>>>> benefits. But I also agree that the SupportsAggregatePushDown
>>>>> interface should be as generic as possible for future extensions, and
>>>>> that meanwhile we should keep confidence in the interface we design.
>>>>>
>>>>> For core point (a): Because of the complexity of aggregates, one
>>>>> LogicalAggregate node may expand to "Expand / Calc / LocalXXAgg /
>>>>> Exchange / FinalXXAgg" in the physical phase. It seems that we can't
>>>>> solve all cases with only one rule. So I suggest that
>>>>> PushLocalAggIntoTableSourceScanRule focus only on the pattern of
>>>>> TableSourceScan + LocalXXAggregate at present.
>>>>>
>>>>> For core points (b & c): I think we can change the interface to be:
>>>>> ```
>>>>> boolean applyAggregates(int[] groupingFields, List<CallExpression>
>>>>> aggregateExpressions, DataType producedDataType, List<int[]>
>>>>> groupingSets);
>>>>> ```
>>>>>
>>>>> Simple Group: groupingSets.size() == 1 &&
>>>>> Arrays.equals(groupingSets.get(0), groupingFields)
>>>>> Cube Group: groupingSets.size() == IntMath.pow(2,
>>>>> groupingFields.length)
>>>>> Rollup: Reference org.apache.calcite.rel.core.Aggregate.Group#isRollup
>>>>>
>>>>> Then we can handle the complex grouping case. The connector developer
>>>>> of the downstream storage should determine whether it supports the
>>>>> associated grouping type. The filter and HAVING clauses are converted
>>>>> to a related Calc RelNode and are no longer in the LocalAggregate
>>>>> node, so the CallExpression may be sufficient to express the
>>>>> semantics of AggregateCall.
>>>>>
>>>>> What do you think? Looking forward to our further discussion.
>>>>>
>>>>>
>>>>> On Wed, Jan 6, 2021 at 2:24 PM Jingsong Li <jingsongl...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> > I think filter expressions and grouping sets are semantic arguments
>>>>>> > instead of utilities. If we want to push them into sources, the
>>>>>> > connector developers should be aware of them. Wrapping them in a
>>>>>> > context implicitly is error-prone, because the existing connector
>>>>>> > will produce wrong results when upgrading to new Flink versions.
>>>>>>
>>>>>> We can have some mechanism to check the upgrading.
>>>>>>
>>>>>> > I think for these cases, providing a new default method to override
>>>>>> > might be a better choice.
>>>>>>
>>>>>> Then we will have three or more methods. For the API level, I really
>>>>>> don't like it...
>>>>>>
>>>>>> Best,
>>>>>> Jingsong
>>>>>>
>>>>>> On Wed, Jan 6, 2021 at 2:10 PM Jark Wu <imj...@gmail.com> wrote:
>>>>>>
>>>>>>> I think filter expressions and grouping sets are semantic arguments
>>>>>>> instead of utilities.
>>>>>>> If we want to push them into sources, the connector developers
>>>>>>> should be aware of them.
>>>>>>> Wrapping them in a context implicitly is error-prone, because the
>>>>>>> existing connector will produce wrong results when upgrading to new
>>>>>>> Flink versions (as we are pushing grouping_sets/filter_args, but the
>>>>>>> connector ignores them).
>>>>>>> I think for these cases, providing a new default method to override
>>>>>>> might be a better choice.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> On Wed, 6 Jan 2021 at 13:56, Jingsong Li <jingsongl...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm also curious about aggregate with filter (COUNT(1) FILTER(WHERE
>>>>>>>> d > 1)). Can we push it down? I'm not sure that a single call
>>>>>>>> expression can express it, or how we should embody it and convey it
>>>>>>>> to users.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jingsong
>>>>>>>>
>>>>>>>> On Wed, Jan 6, 2021 at 1:36 PM Jingsong Li <jingsongl...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Jark,
>>>>>>>>>
>>>>>>>>> I don't want to limit this interface to LocalAgg push-down.
>>>>>>>>> Actually, sometimes we can push the whole aggregation to the
>>>>>>>>> source too.
>>>>>>>>>
>>>>>>>>> So, this rule can do something more advanced. For example, we can
>>>>>>>>> push down grouping sets to the source too, for the SQL: "GROUP BY
>>>>>>>>> GROUPING SETS (f1, f2)". Then, we need to add more information to
>>>>>>>>> push down.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jingsong
>>>>>>>>>
>>>>>>>>> On Wed, Jan 6, 2021 at 11:02 AM Jark Wu <imj...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I think this may be over-designed. We should have confidence in
>>>>>>>>>> the interface we design; the interface should be stable.
>>>>>>>>>> Wrapping things in a big context has a cost of losing user
>>>>>>>>>> convenience.
>>>>>>>>>> Foremost, we don't see any parameters to add in the future. Do
>>>>>>>>>> you know of any potential parameters?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jark
>>>>>>>>>>
>>>>>>>>>> On Wed, 6 Jan 2021 at 10:28, Jingsong Li <jingsongl...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Sebastian,
>>>>>>>>>>>
>>>>>>>>>>> Well, I mean:
>>>>>>>>>>>
>>>>>>>>>>> `boolean applyAggregates(int[] groupingFields,
>>>>>>>>>>> List<CallExpression> aggregateExpressions, DataType
>>>>>>>>>>> producedDataType);`
>>>>>>>>>>> VS
>>>>>>>>>>> ```
>>>>>>>>>>> boolean applyAggregates(Aggregation agg);
>>>>>>>>>>>
>>>>>>>>>>> interface Aggregation {
>>>>>>>>>>>     int[] groupingFields();
>>>>>>>>>>>     List<CallExpression> aggregateExpressions();
>>>>>>>>>>>     DataType producedDataType();
>>>>>>>>>>> }
>>>>>>>>>>> ```
>>>>>>>>>>>
>>>>>>>>>>> Maybe I'm overthinking it, but I think aggregation is a
>>>>>>>>>>> complicated thing. We may need to extend its parameters in the
>>>>>>>>>>> future, so making the parameters an interface allows future
>>>>>>>>>>> expansion without breaking the compatibility of user
>>>>>>>>>>> implementations. With the former approach, users would need to
>>>>>>>>>>> modify their code.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jingsong
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu
>>>>>>>>>>> <liuyang0...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Jingsong,
>>>>>>>>>>>>
>>>>>>>>>>>> Thx a lot for your suggestion. These points really need to be
>>>>>>>>>>>> clear in the proposal.
>>>>>>>>>>>>
>>>>>>>>>>>> For the semantic problem, I think the main point is the
>>>>>>>>>>>> different returned data types for the target aggregate function
>>>>>>>>>>>> and the row format returned by the underlying storage.
>>>>>>>>>>>> That's why we provide the producedDataType in the
>>>>>>>>>>>> SupportsAggregatePushDown interface.
>>>>>>>>>>>> We need to let developers know that the semantic differences
>>>>>>>>>>>> between the underlying storage system and Flink have to be
>>>>>>>>>>>> handled in the related connectors. [Supplemented in proposal]
>>>>>>>>>>>>
>>>>>>>>>>>> The phase of the new PushLocalAggIntoTableSourceScanRule rule
>>>>>>>>>>>> is also a key point.
>>>>>>>>>>>> As you suggested, we should put it into the PHYSICAL_REWRITE
>>>>>>>>>>>> rule set, and it is better to put it behind the
>>>>>>>>>>>> EnforceLocalXXAggRule. [Supplemented in proposal]
>>>>>>>>>>>>
>>>>>>>>>>>> For the scalability of the interface, actually I don't exactly
>>>>>>>>>>>> understand your suggestion. Is it to add an abstract class that
>>>>>>>>>>>> implements the SupportsAggregatePushDown interface and holds
>>>>>>>>>>>> the `List<CallExpression> aggregateExpressions, int[]
>>>>>>>>>>>> groupingFields, DataType producedDataType` fields?
>>>>>>>>>>>>
>>>>>>>>>>>> Looking forward to your further feedback or guidance.
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jan 5, 2021 at 2:44 PM Jingsong Li
>>>>>>>>>>>> <jingsongl...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for your proposal, Sebastian!
>>>>>>>>>>>>>
>>>>>>>>>>>>> +1 for SupportsAggregatePushDown. The above wonderful
>>>>>>>>>>>>> discussion has solved many of my concerns.
>>>>>>>>>>>>>
>>>>>>>>>>>>> ## Semantic problems
>>>>>>>>>>>>>
>>>>>>>>>>>>> We may need to add some mechanisms or comments, because as far
>>>>>>>>>>>>> as I know, the semantics of each database are actually
>>>>>>>>>>>>> different, which may need to be reflected in your specific
>>>>>>>>>>>>> implementation.
>>>>>>>>>>>>>
>>>>>>>>>>>>> For example, the AVG output types of various databases may be
>>>>>>>>>>>>> different. MySQL outputs double, which is different from
>>>>>>>>>>>>> Flink.
>>>>>>>>>>>>> What should we do? (Luckily, avg will be split into sum and
>>>>>>>>>>>>> count, but we also need to care about decimal and others.)
>>>>>>>>>>>>>
>>>>>>>>>>>>> ## The phase of push-down rule
>>>>>>>>>>>>>
>>>>>>>>>>>>> I strongly recommend that you do not put it in the Volcano
>>>>>>>>>>>>> phase, which may make the cost calculation very troublesome.
>>>>>>>>>>>>> So in PHYSICAL_REWRITE?
>>>>>>>>>>>>>
>>>>>>>>>>>>> ## About interface
>>>>>>>>>>>>>
>>>>>>>>>>>>> For scalability, I slightly recommend that we introduce an
>>>>>>>>>>>>> `Aggregate` interface that contains the `List<CallExpression>
>>>>>>>>>>>>> aggregateExpressions, int[] groupingFields, DataType
>>>>>>>>>>>>> producedDataType` fields. In this way, we can add fields
>>>>>>>>>>>>> easily without breaking compatibility.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think the current design is very good; I'm just putting
>>>>>>>>>>>>> forward some ideas.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Jingsong
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu
>>>>>>>>>>>>> <liuyang0...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> > Hi Jark,
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Thx for your further feedback and help. The interface of
>>>>>>>>>>>>> > SupportsAggregatePushDown may indeed need some adjustments.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > For (1) Agree: Yeah, the upstream only needs to know if the
>>>>>>>>>>>>> > TableSource can handle all of the aggregates.
>>>>>>>>>>>>> > It's better to just return a boolean to indicate whether
>>>>>>>>>>>>> > pushing down all of the aggregates was successful or not.
>>>>>>>>>>>>> > [Resolved in proposal]
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > For (2) Agree: The aggOutputDataType represents the produced
>>>>>>>>>>>>> > data type of the new table source, to make sure that the new
>>>>>>>>>>>>> > table source can connect with the related exchange node.
>>>>>>>>>>>>> > The format of this aggOutputDataType is groupedFields's type
>>>>>>>>>>>>> > + agg function's return type.
>>>>>>>>>>>>> > The reason for adding this parameter to this function is
>>>>>>>>>>>>> > also to make it easier for the user to build the final
>>>>>>>>>>>>> > output type. I have changed this parameter to be
>>>>>>>>>>>>> > producedDataType. [Resolved in proposal]
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > For (3) Agree: Indeed, groupSet may mislead users, so I have
>>>>>>>>>>>>> > changed it to groupingFields. [Resolved in proposal]
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Thx again for the suggestions, looking forward to the
>>>>>>>>>>>>> > further discussion.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > On Tue, Jan 5, 2021 at 12:05 PM Jark Wu <imj...@gmail.com>
>>>>>>>>>>>>> > wrote:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > > I'm also +1 for idea #2.
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > Regarding the updated interface,
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > Result applyAggregates(List<CallExpression> aggregateExpressions,
>>>>>>>>>>>>> > >     int[] groupSet, DataType aggOutputDataType);
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > final class Result {
>>>>>>>>>>>>> > >     private final List<CallExpression> acceptedAggregates;
>>>>>>>>>>>>> > >     private final List<CallExpression> remainingAggregates;
>>>>>>>>>>>>> > > }
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > I have the following comments:
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > 1) Do we need the composite Result return type? Is a
>>>>>>>>>>>>> > > boolean return type enough?
>>>>>>>>>>>>> > > From my understanding, all of the aggregates should be
>>>>>>>>>>>>> > > accepted, otherwise the pushdown should fail.
>>>>>>>>>>>>> > > Therefore, users don't need to distinguish which
>>>>>>>>>>>>> > > aggregates are "accepted".
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > 2) Does the `aggOutputDataType` represent the produced
>>>>>>>>>>>>> > > data type of the new source, or just the return type of
>>>>>>>>>>>>> > > all the agg functions?
>>>>>>>>>>>>> > > I would prefer `producedDataType`, just like
>>>>>>>>>>>>> > > `SupportsReadingMetadata`, to reduce the effort for users
>>>>>>>>>>>>> > > to concat a final output type.
>>>>>>>>>>>>> > > The return type of each agg function can be obtained from
>>>>>>>>>>>>> > > the `CallExpression`.
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > 3) What do you think about renaming `groupSet` to
>>>>>>>>>>>>> > > `grouping` or `groupedFields`?
>>>>>>>>>>>>> > > The name `groupSet` may make users think that it relates
>>>>>>>>>>>>> > > to "grouping sets".
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > What do you think?
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > Best,
>>>>>>>>>>>>> > > Jark
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > On Tue, 5 Jan 2021 at 11:04, Kurt Young <ykt...@gmail.com>
>>>>>>>>>>>>> > > wrote:
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > >> Sorry for the typo -_-!
>>>>>>>>>>>>> > >> I meant idea #2.
>>>>>>>>>>>>> > >>
>>>>>>>>>>>>> > >> Best,
>>>>>>>>>>>>> > >> Kurt
>>>>>>>>>>>>> > >>
>>>>>>>>>>>>> > >>
>>>>>>>>>>>>> > >> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu
>>>>>>>>>>>>> > >> <liuyang0...@gmail.com> wrote:
>>>>>>>>>>>>> > >>
>>>>>>>>>>>>> > >>> Hi Kurt,
>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>> > >>> Thx a lot for your feedback. If local aggregation is
>>>>>>>>>>>>> > >>> more like a physical operator than a logical operator,
>>>>>>>>>>>>> > >>> I think your suggestion should be idea #2, which handles
>>>>>>>>>>>>> > >>> everything in the physical optimization phase?
>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>> > >>> Looking forward to the further discussion.
>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>> > >>> On Tue, Jan 5, 2021 at 9:52 AM Kurt Young
>>>>>>>>>>>>> > >>> <ykt...@gmail.com> wrote:
>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>> > >>>> Local aggregation is more like a physical operator
>>>>>>>>>>>>> > >>>> than a logical operator. I would suggest going with
>>>>>>>>>>>>> > >>>> idea #1.
>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>> > >>>> Best,
>>>>>>>>>>>>> > >>>> Kurt
>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>> > >>>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu
>>>>>>>>>>>>> > >>>> <liuyang0...@gmail.com> wrote:
>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>> > >>>> > Hi Jark, Thx a lot for your quick reply and valuable
>>>>>>>>>>>>> > >>>> > suggestions.
>>>>>>>>>>>>> > >>>> > For (1): Agree: Since we are in the period of
>>>>>>>>>>>>> > >>>> > upgrading to the new table source API, we really
>>>>>>>>>>>>> > >>>> > should consider the new interface for the new
>>>>>>>>>>>>> > >>>> > optimizer rule. If the new rule doesn't use the new
>>>>>>>>>>>>> > >>>> > API, we'll have to upgrade it sooner or later. I have
>>>>>>>>>>>>> > >>>> > changed the SupportsAggregatePushDown definition in
>>>>>>>>>>>>> > >>>> > the above proposal to use the ability interface.
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>> > For (2): Agree: Changing to CallExpression is a
>>>>>>>>>>>>> > >>>> > better choice, and I have resolved this comment in
>>>>>>>>>>>>> > >>>> > the proposal.
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>> > For (3): I suggest we first support the JDBC
>>>>>>>>>>>>> > >>>> > connector, as we don't have a Druid connector and the
>>>>>>>>>>>>> > >>>> > ES connector only has a sink API at present.
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>> > But perhaps the biggest question may be whether we
>>>>>>>>>>>>> > >>>> > should use idea 1 or idea 2 in the proposal.
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>> > What do you think? After we reach agreement on the
>>>>>>>>>>>>> > >>>> > proposal, our team can drive this feature to
>>>>>>>>>>>>> > >>>> > completion.
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>> > On Tue, Dec 29, 2020 at 2:58 PM Jark Wu
>>>>>>>>>>>>> > >>>> > <imj...@gmail.com> wrote:
>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>> > >>>> > > Hi Sebastian,
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > > Thanks for the proposal. I think this is a great
>>>>>>>>>>>>> > >>>> > > improvement for Flink SQL.
>>>>>>>>>>>>> > >>>> > > I went through the design doc and have the
>>>>>>>>>>>>> > >>>> > > following thoughts:
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > > 1) Flink has deprecated the legacy TableSource in
>>>>>>>>>>>>> > >>>> > > 1.11 and proposed a new set of DynamicTableSource
>>>>>>>>>>>>> > >>>> > > interfaces. Could you update your proposal to use
>>>>>>>>>>>>> > >>>> > > the new interfaces?
>>>>>>>>>>>>> > >>>> > > Follow the existing ability interfaces, e.g.
>>>>>>>>>>>>> > >>>> > > SupportsFilterPushDown, SupportsProjectionPushDown.
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > > 2) Personally, I think CallExpression would be a
>>>>>>>>>>>>> > >>>> > > better representation than separate
>>>>>>>>>>>>> > >>>> > > `FunctionDefinition` and args, because it would be
>>>>>>>>>>>>> > >>>> > > easier to know the index and type of the arguments.
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > > 3) It would be better to list in the plan which
>>>>>>>>>>>>> > >>>> > > connectors will be supported.
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > > Best,
>>>>>>>>>>>>> > >>>> > > Jark
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu
>>>>>>>>>>>>> > >>>> > > <liuyang0...@gmail.com> wrote:
>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>> > >>>> > > > Hi all,
>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>> > >>>> > > > I'd like to discuss a new feature for the Blink
>>>>>>>>>>>>> > >>>> > > > Planner.
>>>>>>>>>>>>> > >>>> > > > The aggregate operator of Flink SQL is currently
>>>>>>>>>>>>> > >>>> > > > executed entirely in the Flink layer.
>>>>>>>>>>>>> > >>>> > > > With the development of storage systems, many of
>>>>>>>>>>>>> > >>>> > > > the downstream storage systems of Flink SQL are
>>>>>>>>>>>>> > >>>> > > > able to handle the aggregate operator themselves.
>>>>>>>>>>>>> > >>>> > > > Pushing the aggregate down to the data source
>>>>>>>>>>>>> > >>>> > > > layer will improve performance in terms of
>>>>>>>>>>>>> > >>>> > > > network IO and computation overhead.
>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>> > >>>> > > > I have drafted a design doc for this new feature.
>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>> > >>>> > > > https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>> > >>>> > > > Any comment or discussion is welcome.
>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>> > >>>> > > > --
>>>>>>>>>>>>> > >>>> > > > With kind regards
>>>>>>>>>>>>> > >>>> > > > Sebastian Liu 刘洋
>>>>>>>>>>>>> > >>>> > > > Institute of Computing Technology, Chinese Academy of Science
>>>>>>>>>>>>> > >>>> > > > Mobile\WeChat: +86-15201613655
>>>>>>>>>>>>> > >>>> > > > E-mail: liuyang0...@gmail.com
>>>>>>>>>>>>> > >>>> > > > QQ: 3239559
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Best, Jingsong Lee