Hi Sebastian,

I assigned the issue to you. However, I suggest creating sub-tasks under this issue, because I think this will be a big contribution. For example, you can split it into:
1. Introduce SupportsAggregatePushDown interface
2. Support SupportsAggregatePushDown in planner
3. Support SupportsAggregatePushDown for JDBC source
4. ...
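The first and third sub-tasks above can be illustrated with a minimal, self-contained sketch. It assumes simplified stand-ins for Flink's types: `AggregateExpression` here is a hypothetical record holding only a function name, argument indices, a filter flag and a distinct flag, and `producedDataType` is reduced to a list of type names. The parameter order of `applyAggregates` follows the shape agreed later in this thread (`List<int[]> groupingSets`, the aggregates, the produced type). `ToyJdbcSource`, its SQL generation, and the set of accepted functions are illustrative assumptions, not the actual JDBC connector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class PushDownSketch {
    public static void main(String[] args) {
        ToyJdbcSource source = new ToyJdbcSource(
                "call_center", List.of("cc_class", "cc_employees", "cc_tax_percentage"));
        boolean accepted = source.applyAggregates(
                List.<int[]>of(new int[] {0}),
                List.of(new AggregateExpression("COUNT", List.of(), false, false),
                        new AggregateExpression("MAX", List.of(2), false, false)),
                List.of("STRING", "BIGINT", "DECIMAL(5, 2)"));
        // prints: true: SELECT cc_class, COUNT(*), MAX(cc_tax_percentage) FROM call_center GROUP BY cc_class
        System.out.println(accepted + ": " + source.pushedQuery());
    }
}

// Hypothetical, simplified stand-in for the AggregateExpression discussed in this thread.
record AggregateExpression(String function, List<Integer> argIndices, boolean hasFilter, boolean distinct) {}

// Shape of the ability interface as agreed in the thread, with simplified parameter types.
interface SupportsAggregatePushDown {
    boolean applyAggregates(
            List<int[]> groupingSets,
            List<AggregateExpression> aggregates,
            List<String> producedDataType);
}

// Toy JDBC-like source: accepts a single grouping set and plain COUNT/SUM/MIN/MAX only.
class ToyJdbcSource implements SupportsAggregatePushDown {
    private static final Set<String> SUPPORTED = Set.of("COUNT", "SUM", "MIN", "MAX");
    private final String table;
    private final List<String> columns;
    private String pushedQuery; // stays null until a push-down is accepted

    ToyJdbcSource(String table, List<String> columns) {
        this.table = table;
        this.columns = columns;
    }

    @Override
    public boolean applyAggregates(
            List<int[]> groupingSets,
            List<AggregateExpression> aggregates,
            List<String> producedDataType) {
        // All-or-nothing: any unsupported part means Flink keeps the whole aggregate.
        if (groupingSets.size() != 1) {
            return false; // ROLLUP / CUBE / GROUPING SETS are left to Flink
        }
        int[] keys = groupingSets.get(0);
        // Expected layout: grouping keys first, then one field per aggregate result.
        if (producedDataType.size() != keys.length + aggregates.size()) {
            return false;
        }
        for (AggregateExpression agg : aggregates) {
            if (!SUPPORTED.contains(agg.function()) || agg.hasFilter() || agg.distinct()) {
                return false;
            }
        }
        List<String> groupCols = new ArrayList<>();
        for (int idx : keys) {
            groupCols.add(columns.get(idx));
        }
        List<String> select = new ArrayList<>(groupCols);
        for (AggregateExpression agg : aggregates) {
            String args = agg.argIndices().isEmpty()
                    ? "*"
                    : agg.argIndices().stream().map(i -> columns.get(i)).collect(Collectors.joining(", "));
            select.add(agg.function() + "(" + args + ")");
        }
        pushedQuery = "SELECT " + String.join(", ", select) + " FROM " + table
                + (groupCols.isEmpty() ? "" : " GROUP BY " + String.join(", ", groupCols));
        return true;
    }

    String pushedQuery() {
        return pushedQuery;
    }
}
```

Returning a plain `boolean` keeps the contract simple: the source either takes over every aggregate or none, so the planner never has to split a partially accepted aggregate.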
Best,
Jark

On Thu, 7 Jan 2021 at 23:27, Sebastian Liu <liuyang0...@gmail.com> wrote:

Hi Jark,

It seems that we have reached agreement on the proposal. Could you please help assign the JIRA ticket below to me?
https://issues.apache.org/jira/browse/FLINK-20791

On Thu, 7 Jan 2021 at 10:25, Jark Wu <imj...@gmail.com> wrote:

Thanks for updating the design doc.
It looks good to me.

Best,
Jark

On Thu, 7 Jan 2021 at 10:16, Jingsong Li <jingsongl...@gmail.com> wrote:

Sounds good to me.

We don't have to worry about future changes, because it covers all the capabilities of Calcite aggregation.

Best,
Jingsong

On Thu, Jan 7, 2021 at 12:14 AM Sebastian Liu <liuyang0...@gmail.com> wrote:

Hi Jark,

Sounds good to me. For better extensibility in the future, we could add the AggregateExpression:
```
public class AggregateExpression implements ResolvedExpression {

    private final FunctionDefinition functionDefinition;
    private final List<FieldReferenceExpression> args;
    private final @Nullable CallExpression filterExpression;
    private final DataType resultType;
    private final boolean distinct;
    private final boolean approximate;
    private final boolean ignoreNulls;
}
```

And we really only need one GroupingSets parameter for grouping. I have updated the related interface in the proposal.
I appreciate the continued feedback and help.

On Wed, 6 Jan 2021 at 21:34, Jark Wu <imj...@gmail.com> wrote:

Hi Liu, Jingsong,

Regarding the agg with filter, I think in theory we can support pushing such a pattern into the source.
We don't need to support it in the first version, but we can support it in the long term.
The designed interface should be future-proof.
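Sebastian's note above that a single grouping-sets parameter is enough can be made concrete: one `List<int[]>` of field indices encodes a plain GROUP BY, ROLLUP, CUBE and arbitrary GROUPING SETS alike. The sketch below is hypothetical (class and method names are illustrative, and no Calcite classes are used). It expands ROLLUP and CUBE into grouping sets and classifies a list back into the simple/cube/rollup cases described in Sebastian's Jan 6 message. As a design choice, the cube check requires the sets to be distinct in addition to counting 2^n, and the rollup check follows the idea behind Calcite's `Aggregate.Group#isRollup` without claiming to match its exact rules.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class GroupingSetsSketch {
    enum Kind { SIMPLE, CUBE, ROLLUP, OTHER }

    // ROLLUP(f0, ..., fn-1) expands to the n+1 prefixes {f0..fn-1}, ..., {f0}, {}.
    static List<int[]> rollup(int[] fields) {
        List<int[]> sets = new ArrayList<>();
        for (int len = fields.length; len >= 0; len--) {
            sets.add(Arrays.copyOf(fields, len));
        }
        return sets;
    }

    // CUBE(f0, ..., fn-1) expands to all 2^n subsets of the fields.
    static List<int[]> cube(int[] fields) {
        List<int[]> sets = new ArrayList<>();
        for (int mask = (1 << fields.length) - 1; mask >= 0; mask--) {
            int[] subset = new int[Integer.bitCount(mask)];
            int k = 0;
            for (int i = 0; i < fields.length; i++) {
                if ((mask & (1 << i)) != 0) {
                    subset[k++] = fields[i];
                }
            }
            sets.add(subset);
        }
        return sets;
    }

    static Kind classify(List<int[]> groupingSets) {
        if (groupingSets.size() == 1) {
            return Kind.SIMPLE; // plain GROUP BY, or a global aggregate when the set is empty
        }
        List<Set<Integer>> sets = new ArrayList<>();
        Set<Integer> union = new HashSet<>();
        for (int[] s : groupingSets) {
            Set<Integer> set = new HashSet<>();
            for (int f : s) {
                set.add(f);
            }
            sets.add(set);
            union.addAll(set);
        }
        // Distinct subsets of an n-field union can only number 2^n if they are all of them.
        // With a single field, ROLLUP and CUBE expand to the same sets; this sketch reports CUBE.
        boolean distinct = new HashSet<>(sets).size() == sets.size();
        if (distinct && union.size() < 31 && sets.size() == (1 << union.size())) {
            return Kind.CUBE;
        }
        // ROLLUP: ordered by size, each set drops exactly one field of the previous one.
        List<Set<Integer>> bySize = new ArrayList<>(sets);
        bySize.sort((a, b) -> b.size() - a.size());
        boolean rollup = bySize.size() == union.size() + 1 && bySize.get(0).equals(union);
        for (int i = 1; rollup && i < bySize.size(); i++) {
            Set<Integer> prev = bySize.get(i - 1);
            Set<Integer> cur = bySize.get(i);
            rollup = cur.size() == prev.size() - 1 && prev.containsAll(cur);
        }
        return rollup ? Kind.ROLLUP : Kind.OTHER;
    }

    public static void main(String[] args) {
        int[] fields = {0, 1}; // e.g. cc_class, cc_employees
        System.out.println(classify(List.<int[]>of(fields))); // SIMPLE
        System.out.println(classify(rollup(fields)));         // ROLLUP
        System.out.println(classify(cube(fields)));           // CUBE
        // GROUPING SETS (f1, f2), as in Jingsong's example
        System.out.println(classify(List.of(new int[] {0}, new int[] {1}))); // OTHER
    }
}
```

A connector that only handles plain GROUP BY would accept `SIMPLE` and return `false` for everything else.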
Considering that the filter arg and the distinct flag should be part of the aggregate expression, I'm wondering whether CallExpression is a good representation for it.
What do you think about introducing the following `AggregateExpression` to replace `CallExpression`?

    class AggregateExpression implements ResolvedExpression {
        private final FunctionDefinition functionDefinition;
        private final List<FieldReferenceExpression> args;
        private final @Nullable CallExpression filterExpr;
        private final boolean distinct;
    }

Besides, we don't need both groupingFields and groupingSets.
`groupingSets` is a superset of groupingFields.
Then the interface of SupportsAggregatePushDown can be:

    interface SupportsAggregatePushDown {

        boolean applyAggregates(
                List<int[]> groupingSets,
                List<AggregateExpression> aggregates,
                DataType producedDataType);
    }

What do you think?

Best,
Jark

On Wed, 6 Jan 2021 at 19:56, Sebastian Liu <liuyang0...@gmail.com> wrote:

Hi Jingsong, Jark,

Thanks so much for the discussion; the cases mentioned above are really worth discussing further.

1. Aggregate with filter expressions, e.g.: select COUNT(1) FILTER(WHERE cc_call_center_sk > 3) from call_center;
For the current Blink Planner, the optimized plan will be:
TableSourceScan -> Calc(IS TRUE(>(cc_call_center_sk, 3))) -> LocalAgg -> Exchange -> FinalAgg.
As there is a Calc above the TableSource, this pattern can't match the LocalAggPushDownRule in the current design.

2. The grouping set or rollup use case, e.g.: select COUNT(1) from call_center group by rollup(cc_class, cc_employees);
For the current Blink Planner, the optimized plan will be:
TableSourceScan -> Expand -> LocalAgg -> Exchange -> FinalAgg -> Calc.
This is also not covered by the current LocalAggPushDownRule design.

3. I want to add a case we haven't discussed yet: aggregate with a HAVING clause,
e.g.: select COUNT(1) from call_center group by cc_class having max(cc_tax_percentage) > 0.2;
For the current Blink Planner, the optimized plan will be:
TableSourceScan -> LocalAgg -> Exchange -> FinalAgg -> Calc(where=[>($f2, 0.2:DECIMAL(2, 1))]).

The core discussion points are summarized as follows:
a) Aggregate is a more complex scenario than predicates or limits, and it also depends on the underlying storage.
b) It seems one rule can't completely cover all aggregate scenarios, but could the SupportsAggregatePushDown interface be a bit more general?
c) Can a CallExpression express the semantics of a Calcite AggregateCall?

IMO: completely pushing down an aggregate is generally hard for distributed systems. Usually it requires a GROUP BY whose keys exactly match the partitioning of the downstream storage. At the same time, the benefit of removing the final aggregate is actually limited.
The LocalAggPushDown generally yields more than 80% of the CPU and IO benefits. But I also agree that the SupportsAggregatePushDown interface should be as generic as possible for future extensions, so that we can stay confident in the interface we design.

For core point (a): due to the complexity of aggregates, one LogicalAggregate node may expand to "Expand / Calc / LocalXXAgg / Exchange / FinalXXAgg" in the physical phase.
It seems that we can't solve all cases with only one rule. So I suggest that PushLocalAggIntoTableSourceScanRule focus only on the TableSourceScan + LocalXXAggregate pattern for now.

For core points (b & c): I think we can change the interface to:
```
boolean applyAggregates(
        int[] groupingFields,
        List<CallExpression> aggregateExpressions,
        DataType producedDataType,
        List<int[]> groupingSets);
```

Simple group: groupingSets.size() == 1 && Arrays.equals(groupingSets.get(0), groupingFields)
Cube group: groupingSets.size() == IntMath.pow(2, groupingFields.length)
Rollup: see org.apache.calcite.rel.core.Aggregate.Group#isRollup

Then we can handle the complex grouping cases. The connector developer for the downstream storage should determine whether it supports the associated grouping type. The filter and HAVING clauses are converted into a related Calc RelNode and are no longer part of the LocalAggregate node, so a CallExpression may be sufficient to express the semantics of an AggregateCall.

What do you think? Looking forward to further discussion.

On Wed, 6 Jan 2021 at 14:24, Jingsong Li <jingsongl...@gmail.com> wrote:

> I think filter expressions and grouping sets are semantic arguments instead of utilities. If we want to push them into sources, the connector developers should be aware of them. Wrapping them in a context implicitly is error-prone: existing connectors will produce wrong results when upgrading to new Flink versions.

We can have some mechanism to check the upgrade.

> I think for these cases, providing a new default method to override might be a better choice.

Then we will have three or more methods.
At the API level, I really don't like it...

Best,
Jingsong

On Wed, Jan 6, 2021 at 2:10 PM Jark Wu <imj...@gmail.com> wrote:

I think filter expressions and grouping sets are semantic arguments instead of utilities.
If we want to push them into sources, the connector developers should be aware of them.
Wrapping them in a context implicitly is error-prone: existing connectors will produce wrong results when upgrading to new Flink versions (as we are pushing grouping_sets/filter_args, but the connector ignores them).
I think for these cases, providing a new default method to override might be a better choice.

Best,
Jark

On Wed, 6 Jan 2021 at 13:56, Jingsong Li <jingsongl...@gmail.com> wrote:

Hi,

I'm also curious about aggregates with filters (COUNT(1) FILTER(WHERE d > 1)). Can we push them down? I'm not sure that a single call expression can express it, or how we should embody it and convey it to users.

Best,
Jingsong

On Wed, Jan 6, 2021 at 1:36 PM Jingsong Li <jingsongl...@gmail.com> wrote:

Hi Jark,

I don't want to limit this interface to LocalAgg push-down. Actually, sometimes we can push the whole aggregation into the source too.

So this rule can do something more advanced. For example, we can push grouping sets down to the source too, for the SQL: "GROUP BY GROUPING SETS (f1, f2)". Then we need to push down more information.

Best,
Jingsong

On Wed, Jan 6, 2021 at 11:02 AM Jark Wu <imj...@gmail.com> wrote:

I think this may be over-designed.
We should have confidence in the interface we design; the interface should be stable.
Wrapping things in a big context comes at the cost of user convenience.
Above all, we don't see any parameters to add in the future. Do you know of any potential parameters?

Best,
Jark

On Wed, 6 Jan 2021 at 10:28, Jingsong Li <jingsongl...@gmail.com> wrote:

Hi Sebastian,

Well, I mean:

`boolean applyAggregates(int[] groupingFields, List<CallExpression> aggregateExpressions, DataType producedDataType);`
VS
```
boolean applyAggregates(Aggregation agg);

interface Aggregation {
    int[] groupingFields();
    List<CallExpression> aggregateExpressions();
    DataType producedDataType();
}
```

Maybe I'm overthinking it, but I think aggregation is a complicated thing. We may need to extend its parameters in the future, so making the parameter an interface allows future extension without breaking the compatibility of user implementations. With the former approach, users would need to modify their code.

Best,
Jingsong

On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu <liuyang0...@gmail.com> wrote:

Hi Jingsong,

Thanks a lot for your suggestions. These points really need to be clear in the proposal.
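The trade-off above, between Jingsong's `Aggregation` parameter object and the flat signature, and Jark's objection that implicitly wrapped semantic arguments can produce wrong results, can be shown in a few lines. This is a hypothetical sketch: every name is illustrative and the aggregates are simplified to strings. Adding `groupingSets()` with a default to a planner-implemented parameter interface keeps an old connector compiling, which is Jingsong's point; but the old connector never reads the new field, so it may accept a ROLLUP as if it were a plain GROUP BY, which is Jark's concern.

```java
import java.util.Arrays;
import java.util.List;

class ParameterObjectSketch {
    public static void main(String[] args) {
        // A later planner version starts passing ROLLUP(f0, f1) through the parameter object.
        Aggregation rollup = new PlannerAggregation(
                new int[] {0, 1},
                List.of("COUNT(1)"),
                List.of(new int[] {0, 1}, new int[] {0}, new int[] {}));
        System.out.println(new OldConnector().applyAggregates(rollup));   // true: rollup silently treated as GROUP BY f0, f1
        System.out.println(new AwareConnector().applyAggregates(rollup)); // false: Flink keeps the aggregate
    }
}

// Hypothetical parameter object (Jingsong's suggestion); aggregates simplified to strings.
interface Aggregation {
    int[] groupingFields();

    List<String> aggregateExpressions();

    // Added in a later version: the default keeps existing implementations compiling.
    default List<int[]> groupingSets() {
        return List.<int[]>of(groupingFields());
    }
}

interface SupportsAggregatePushDown {
    boolean applyAggregates(Aggregation aggregation);
}

// Planner-side implementation that knows about grouping sets.
record PlannerAggregation(int[] groupingFields, List<String> aggregateExpressions, List<int[]> groupingSets)
        implements Aggregation {}

// Connector written against the first version: it never reads groupingSets().
class OldConnector implements SupportsAggregatePushDown {
    String pushedGrouping;

    @Override
    public boolean applyAggregates(Aggregation aggregation) {
        pushedGrouping = Arrays.toString(aggregation.groupingFields());
        return true;
    }
}

// Connector that inspects the new semantic argument and rejects what it cannot handle.
class AwareConnector implements SupportsAggregatePushDown {
    @Override
    public boolean applyAggregates(Aggregation aggregation) {
        return aggregation.groupingSets().size() == 1;
    }
}
```

A flat signature would instead force a compile error (or a new overload) when the parameter is added, which makes connector authors confront the new semantics explicitly.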
For the semantic problem, I think the main point is the difference between the return types of the target aggregate functions and the row format returned by the underlying storage.
That's why we provide the producedDataType in the SupportsAggregatePushDown interface.
We need to let developers know that they must handle the semantic differences between the underlying storage system and Flink in the related connectors. [Supplemented in proposal]

The phase of the new PushLocalAggIntoTableSourceScanRule is also a key point.
As you suggested, we should put it into the PHYSICAL_REWRITE rule set, preferably after the EnforceLocalXXAggRule. [Supplemented in proposal]

As for the extensibility of the interface, I don't exactly understand your suggestion. Is it to add an abstract class that implements the SupportsAggregatePushDown interface and holds the `List<CallExpression> aggregateExpressions, int[] groupingFields, DataType producedDataType` fields?

Looking forward to your further feedback or guidance.

On Tue, 5 Jan 2021 at 14:44, Jingsong Li <jingsongl...@gmail.com> wrote:

Thanks for your proposal, Sebastian!

+1 for SupportsAggregatePushDown. The wonderful discussion above has resolved many of my concerns.
## Semantic problems

We may need to add some mechanisms or comments, because as far as I know, the semantics of each database are actually different, which may need to be reflected in your specific implementation.

For instance, the AVG output types of various databases may differ. For example, MySQL outputs double, which is different from Flink. What should we do? (Luckily, AVG will be split into SUM and COUNT, but we also need to take care of DECIMAL and other types.)

## The phase of the push-down rule

I strongly recommend that you do not put it in the Volcano phase, which may make the cost calculation very troublesome.
So in PHYSICAL_REWRITE?

## About the interface

For extensibility, I slightly recommend that we introduce an `Aggregate` interface containing `List<CallExpression> aggregateExpressions, int[] groupingFields, DataType producedDataType` fields. In this way, we can add fields easily without breaking compatibility.

I think the current design is very good; I'm just putting forward some ideas.

Best,
Jingsong

On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu <liuyang0...@gmail.com> wrote:

Hi Jark,

Thanks for your further feedback and help. The interface of SupportsAggregatePushDown may indeed need some adjustments.
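Jingsong's AVG example above relies on AVG being split into SUM and COUNT before push-down. A minimal sketch in plain Java (hypothetical names, not planner code) of why that split makes the local/final two-phase aggregation correct: each source split returns a partial (sum, count), and the final phase merges the partials and divides once. Averaging the per-split averages would be wrong whenever splits have different sizes. Using `BigDecimal` for the final division is an illustrative nod to his remark that DECIMAL results need care; the scale and rounding mode are assumptions, not Flink's actual AVG return-type rules.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;

class AvgSplitDemo {
    // Partial result a source could return per split for AVG(x): SUM(x) and COUNT(x).
    record Partial(long sum, long count) {}

    // Local phase: what a pushed-down source would compute for one split.
    static Partial local(List<Long> values) {
        long sum = 0;
        for (long v : values) {
            sum += v;
        }
        return new Partial(sum, values.size());
    }

    // Final phase in Flink: merge all partials, then divide exactly once.
    static BigDecimal finalAvg(List<Partial> partials, int scale) {
        long sum = 0;
        long count = 0;
        for (Partial p : partials) {
            sum += p.sum();
            count += p.count();
        }
        if (count == 0) {
            return null; // AVG over no rows is NULL in SQL
        }
        return BigDecimal.valueOf(sum).divide(BigDecimal.valueOf(count), scale, RoundingMode.HALF_UP);
    }

    // The naive alternative: average of per-split averages (wrong for uneven splits).
    static double averageOfAverages(List<List<Long>> splits) {
        double total = 0;
        for (List<Long> split : splits) {
            Partial p = local(split);
            total += (double) p.sum() / p.count();
        }
        return total / splits.size();
    }

    public static void main(String[] args) {
        List<List<Long>> splits = List.of(List.of(1L, 2L, 3L), List.of(10L));
        List<Partial> partials = splits.stream().map(AvgSplitDemo::local).toList();
        System.out.println(finalAvg(partials, 2));     // 4.00 (16 / 4)
        System.out.println(averageOfAverages(splits)); // 6.0  ((2 + 10) / 2), not the true average
    }
}
```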
For (1), agreed: yes, the upstream only needs to know whether the TableSource can handle all of the aggregates.
It's better to just return a boolean indicating whether pushing down all of the aggregates succeeded. [Resolved in proposal]

For (2), agreed: the aggOutputDataType represents the produced data type of the new table source, to make sure that the new table source can connect with the related Exchange node. The format of this aggOutputDataType is the grouped fields' types followed by the agg functions' return types.
This parameter was also added to make it easier for users to build the final output type. I have renamed this parameter to producedDataType. [Resolved in proposal]

For (3), agreed: indeed, groupSet may mislead users, so I have changed it to groupingFields. [Resolved in proposal]

Thanks again for the suggestions; looking forward to further discussion.

On Tue, 5 Jan 2021 at 12:05, Jark Wu <imj...@gmail.com> wrote:

I'm also +1 for idea #2.
Regarding the updated interface,

    Result applyAggregates(List<CallExpression> aggregateExpressions,
            int[] groupSet, DataType aggOutputDataType);

    final class Result {
        private final List<CallExpression> acceptedAggregates;
        private final List<CallExpression> remainingAggregates;
    }

I have the following comments:

1) Do we need the composite Result return type? Is a boolean return type enough?
From my understanding, all of the aggregates should be accepted, otherwise the push-down should fail.
Therefore, users don't need to distinguish which aggregates are "accepted".

2) Does `aggOutputDataType` represent the produced data type of the new source, or just the return type of all the agg functions?
I would prefer `producedDataType`, just like `SupportsReadingMetadata`, to reduce the effort for users to concatenate a final output type.
The return type of each agg function can be obtained from the `CallExpression`.

3) What do you think about renaming `groupSet` to `grouping` or `groupedFields`?
The name `groupSet` may mislead users into thinking it relates to "grouping sets".

What do you think?
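Jark's point 2 above, together with Sebastian's reply that the format is the grouped fields' types followed by the aggregate functions' return types, describes how `producedDataType` is laid out. A minimal sketch of assembling that row type, assuming types are plain strings rather than Flink's `DataType`; the `Field` and `AggCall` records, the `$f` naming of aggregate columns, and the column types used in `main` are hypothetical choices for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class ProducedTypeDemo {
    record Field(String name, String type) {}

    // Hypothetical aggregate call: function name plus its already resolved return type.
    record AggCall(String function, String returnType) {}

    // producedDataType = grouping key fields (in grouping order) + one field per aggregate result.
    static List<Field> producedType(List<Field> sourceSchema, int[] groupingFields, List<AggCall> aggs) {
        List<Field> produced = new ArrayList<>();
        for (int idx : groupingFields) {
            produced.add(sourceSchema.get(idx));
        }
        for (int i = 0; i < aggs.size(); i++) {
            // Generated names are illustrative; position and type are what the downstream Exchange relies on.
            produced.add(new Field("$f" + (groupingFields.length + i), aggs.get(i).returnType()));
        }
        return produced;
    }

    public static void main(String[] args) {
        List<Field> schema = List.of(
                new Field("cc_class", "STRING"),
                new Field("cc_employees", "INT"),
                new Field("cc_tax_percentage", "DECIMAL(5, 2)"));
        // SELECT cc_class, COUNT(1), MAX(cc_tax_percentage) ... GROUP BY cc_class
        List<Field> produced = producedType(schema, new int[] {0},
                List.of(new AggCall("COUNT", "BIGINT"), new AggCall("MAX", "DECIMAL(5, 2)")));
        // prints: cc_class STRING / $f1 BIGINT / $f2 DECIMAL(5, 2)
        produced.forEach(f -> System.out.println(f.name() + " " + f.type()));
    }
}
```

Handing the connector this fully assembled type is what spares users from concatenating key and aggregate types themselves.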
Best,
Jark

On Tue, 5 Jan 2021 at 11:04, Kurt Young <ykt...@gmail.com> wrote:

Sorry for the typo -_-!
I meant idea #2.

Best,
Kurt

On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu <liuyang0...@gmail.com> wrote:

Hi Kurt,

Thanks a lot for your feedback. If local aggregation is more like a physical operator than a logical one, I think your suggestion should be idea #2, which handles everything in the physical optimization phase?

Looking forward to further discussion.

On Tue, 5 Jan 2021 at 09:52, Kurt Young <ykt...@gmail.com> wrote:

Local aggregation is more like a physical operator than a logical operator. I would suggest going with idea #1.

Best,
Kurt

On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu <liuyang0...@gmail.com> wrote:

Hi Jark, thanks a lot for your quick reply and valuable suggestions.
For (1), agreed: since we are in the middle of upgrading to the new table source API, we really should consider the new interface for the new optimization rule. If the new rule doesn't use the new API, we'll have to upgrade it sooner or later. I have changed the SupportsAggregatePushDown definition in the proposal to use the ability interface.

For (2), agreed: changing to CallExpression is a better choice, and I have resolved this comment in the proposal.

For (3): I suggest we first support the JDBC connector, as we don't have a Druid connector and the ES connector only has a sink API at present.

But perhaps the biggest question is whether we should use idea 1 or idea 2 in the proposal.

What do you think? After we reach agreement on the proposal, our team can drive this feature to completion.

On Tue, 29 Dec 2020 at 14:58, Jark Wu <imj...@gmail.com> wrote:

Hi Sebastian,

Thanks for the proposal.
I think this is a great improvement for Flink SQL.
I went through the design doc and have the following thoughts:

1) Flink deprecated the legacy TableSource in 1.11 and proposed a new set of DynamicTableSource interfaces. Could you update your proposal to use the new interfaces?
Follow the existing ability interfaces, e.g. SupportsFilterPushDown, SupportsProjectionPushDown.

2) Personally, I think CallExpression would be a better representation than a separate `FunctionDefinition` and args, because it makes it easier to know the index and type of each argument.

3) It would be better to list which connectors will be supported in the plan.

Best,
Jark

On Tue, 29 Dec 2020 at 00:49, Sebastian Liu <liuyang0...@gmail.com> wrote:

Hi all,

I'd like to discuss a new feature for the Blink Planner.
The aggregate operator of Flink SQL is currently executed entirely in the Flink layer.
With the development of storage systems, many of the storage systems that Flink SQL connects to can now execute aggregations themselves.
Pushing aggregates down to the data source layer will improve performance by reducing network IO and computation overhead.

I have drafted a design doc for this new feature.

https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing

Any comment or discussion is welcome.

--
With kind regards
------------------------------------------------------------
Sebastian Liu 刘洋
Institute of Computing Technology, Chinese Academy of Science
Mobile\WeChat: +86—15201613655
E-mail: liuyang0...@gmail.com
QQ: 3239559

--
Best, Jingsong Lee