Hi Jark,

Thanks! Following your suggestion, I have created three related sub-tasks
under FLINK-20791. Could you please assign these sub-tasks to me as well
when you have time? I will then push forward the implementation.

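For reference, the sub-tasks revolve around the interface shape settled in the discussion below: `applyAggregates` receives the grouping sets, the aggregate expressions and the produced data type, and returns a boolean. A rough, self-contained sketch of how a connector might accept or reject a push-down is shown here. All types are simplified stand-ins rather than the real Flink classes, `SimpleSource` is a hypothetical connector, and the list of supported functions is an assumption made only for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class AggregatePushDownSketch {

    /** Simplified stand-in for the AggregateExpression proposed in the thread. */
    static final class AggregateExpression {
        final String functionName; // e.g. "COUNT", "SUM"
        final int[] args;          // indices of the referenced input fields
        final boolean hasFilter;   // true if a FILTER(WHERE ...) is attached
        final boolean distinct;

        AggregateExpression(String functionName, int[] args, boolean hasFilter, boolean distinct) {
            this.functionName = functionName;
            this.args = args;
            this.hasFilter = hasFilter;
            this.distinct = distinct;
        }
    }

    /** Shape of the ability interface; DataType is replaced by plain type names here. */
    interface SupportsAggregatePushDown {
        boolean applyAggregates(
                List<int[]> groupingSets,
                List<AggregateExpression> aggregates,
                List<String> producedTypeNames);
    }

    /** Hypothetical source: accepts a single grouping set and plain COUNT/SUM/MIN/MAX only. */
    static final class SimpleSource implements SupportsAggregatePushDown {
        private static final List<String> SUPPORTED = Arrays.asList("COUNT", "SUM", "MIN", "MAX");

        @Override
        public boolean applyAggregates(
                List<int[]> groupingSets,
                List<AggregateExpression> aggregates,
                List<String> producedTypeNames) {
            if (groupingSets.size() != 1) {
                return false; // grouping sets, rollup and cube stay in Flink
            }
            for (AggregateExpression agg : aggregates) {
                if (agg.hasFilter || agg.distinct || !SUPPORTED.contains(agg.functionName)) {
                    return false; // all-or-nothing: one unsupported aggregate rejects the push-down
                }
            }
            return true;
        }
    }

    public static void main(String[] args) {
        SimpleSource source = new SimpleSource();
        List<int[]> simpleGroup = Collections.singletonList(new int[] {0});
        List<String> produced = Arrays.asList("STRING", "BIGINT");
        AggregateExpression count = new AggregateExpression("COUNT", new int[] {1}, false, false);
        AggregateExpression filtered = new AggregateExpression("COUNT", new int[] {1}, true, false);

        // prints "true": a plain COUNT with a single grouping set is accepted
        System.out.println(source.applyAggregates(simpleGroup, Collections.singletonList(count), produced));
        // prints "false": an aggregate with a FILTER clause is rejected
        System.out.println(source.applyAggregates(simpleGroup, Collections.singletonList(filtered), produced));
    }
}
```

Returning a plain boolean keeps the contract all-or-nothing, which matches the point made in the thread that the planner only needs to know whether every aggregate was accepted.
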
Jark Wu <imj...@gmail.com> wrote on Fri, Jan 8, 2021 at 12:30 PM:

> Hi Sebastian,
>
> I assigned the issue to you. But I suggest creating sub-tasks under this
> issue, because I think this would be a big contribution.
> For example, you can split it into:
> 1. Introduce SupportsAggregatePushDown interface
> 2. Support SupportsAggregatePushDown in planner
> 3. Support SupportsAggregatePushDown for JDBC source
> 4. ...
>
> Best,
> Jark
>
> On Thu, 7 Jan 2021 at 23:27, Sebastian Liu <liuyang0...@gmail.com> wrote:
>
>> Hi Jark,
>>
>> It seems that we have reached agreement on the proposal. Could you
>> please help to assign the Jira ticket below to me?
>> https://issues.apache.org/jira/browse/FLINK-20791
>>
>> Jark Wu <imj...@gmail.com> wrote on Thu, Jan 7, 2021 at 10:25 AM:
>>
>>> Thanks for updating the design doc.
>>> It looks good to me.
>>>
>>> Best,
>>> Jark
>>>
>>> On Thu, 7 Jan 2021 at 10:16, Jingsong Li <jingsongl...@gmail.com> wrote:
>>>
>>>> Sounds good to me.
>>>>
>>>> We don't have to worry about future changes, because it has covered all
>>>> the capabilities of Calcite aggregation.
>>>>
>>>> Best,
>>>> Jingsong
>>>>
>>>> On Thu, Jan 7, 2021 at 12:14 AM Sebastian Liu <liuyang0...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Jark,
>>>>>
>>>>> Sounds good to me. For better scalability in the future, we could add
>>>>> the AggregateExpression.
>>>>> ```
>>>>> public class AggregateExpression implements ResolvedExpression {
>>>>>     private final FunctionDefinition functionDefinition;
>>>>>     private final List<FieldReferenceExpression> args;
>>>>>     private final @Nullable CallExpression filterExpression;
>>>>>     private final DataType resultType;
>>>>>     private final boolean distinct;
>>>>>     private final boolean approximate;
>>>>>     private final boolean ignoreNulls;
>>>>> }
>>>>> ```
>>>>>
>>>>> And we really only need one GroupingSets parameter for grouping. I
>>>>> have updated the related interface in the proposal.
>>>>> Appreciate the continued feedback and help.
>>>>>
>>>>> Jark Wu <imj...@gmail.com> wrote on Wed, Jan 6, 2021 at 9:34 PM:
>>>>>
>>>>>> Hi Liu, Jingsong,
>>>>>>
>>>>>> Regarding the agg with filter, I think in theory we can support
>>>>>> pushing such a pattern into source.
>>>>>> We don't need to support it in the first version, but in the long
>>>>>> term, we can support it.
>>>>>> The designed interface should be future proof.
>>>>>>
>>>>>> Consider that the filter arg and distinct flag should be part of the
>>>>>> aggregate expression.
>>>>>> I'm wondering if CallExpression is a good representation for it.
>>>>>> What do you think about proposing the following `AggregateExpression`
>>>>>> to replace the `CallExpression`?
>>>>>>
>>>>>> class AggregateExpression implements ResolvedExpression {
>>>>>>     private final FunctionDefinition functionDefinition;
>>>>>>     private final List<FieldReferenceExpression> args;
>>>>>>     private final @Nullable CallExpression filterExpr;
>>>>>>     private final boolean distinct;
>>>>>> }
>>>>>>
>>>>>> Besides, we don't need both groupingFields and groupingSets.
>>>>>> `groupingSets` should be a superset of groupingFields.
>>>>>> Then the interface of SupportsAggregatePushDown can be:
>>>>>>
>>>>>> interface SupportsAggregatePushDown {
>>>>>>
>>>>>>     boolean applyAggregates(
>>>>>>         List<int[]> groupingSets,
>>>>>>         List<AggregateExpression> aggregates,
>>>>>>         DataType producedDataType);
>>>>>> }
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> On Wed, 6 Jan 2021 at 19:56, Sebastian Liu <liuyang0...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Jingsong, Jark,
>>>>>>>
>>>>>>> Thx so much for our discussion. The cases mentioned above are
>>>>>>> really worthy of further discussion.
>>>>>>>
>>>>>>> 1. For aggregate with filter expressions, e.g.:
>>>>>>> select COUNT(1) FILTER(WHERE cc_call_center_sk > 3) from call_center;
>>>>>>> For the current Blink Planner, the optimized plan will be:
>>>>>>> TableSourceScan -> Calc(IS TRUE(>(cc_call_center_sk, 3))) ->
>>>>>>> LocalAgg -> Exchange -> FinalAgg.
>>>>>>> As there is a Calc above the TableSource, this pattern can't match
>>>>>>> the LocalAggPushDownRule in the current design.
>>>>>>>
>>>>>>> 2. For the grouping set or rollup use case, e.g.:
>>>>>>> select COUNT(1) from call_center group by rollup(cc_class, cc_employees);
>>>>>>> For the current Blink Planner, the optimized plan will be:
>>>>>>> TableSourceScan -> Expand -> LocalAgg -> Exchange -> FinalAgg ->
>>>>>>> Calc.
>>>>>>> It's also not covered by the current LocalAggPushDownRule design.
>>>>>>>
>>>>>>> 3. I want to add a case which we haven't discussed yet:
>>>>>>> aggregate with a HAVING clause, e.g.:
>>>>>>> select COUNT(1) from call_center group by cc_class having
>>>>>>> max(cc_tax_percentage) > 0.2;
>>>>>>> For the current Blink Planner, the optimized plan will be:
>>>>>>> TableSourceScan -> LocalAgg -> Exchange -> FinalAgg ->
>>>>>>> Calc(where=[>($f2, 0.2:DECIMAL(2, 1))]).
>>>>>>>
>>>>>>> The core discussion points are summarized as follows:
>>>>>>> a) Aggregate is a more complex scenario than predicates or limits,
>>>>>>> and also depends on the different underlying storages.
>>>>>>> b) One rule can't completely cover all aggregate scenarios, but
>>>>>>> can the SupportsAggregatePushDown interface be a bit more general?
>>>>>>> c) Could the CallExpression express the semantics of Calcite's
>>>>>>> AggregateCall?
>>>>>>>
>>>>>>> IMO: Completely pushing down an aggregate is generally hard for
>>>>>>> distributed systems. Usually we need a GROUP BY that exactly
>>>>>>> matches the partition mode in downstream storage. At the same time,
>>>>>>> the benefit of removing the final aggregate is actually limited.
>>>>>>> The LocalAggPushDown generally yields more than 80% of the CPU and
>>>>>>> IO benefits. But I also agree that the SupportsAggregatePushDown
>>>>>>> interface should be as generic as possible for future extensions,
>>>>>>> while we keep confidence in the interface we design.
>>>>>>>
>>>>>>> For core point (a): Due to the complexity of aggregates, one
>>>>>>> LogicalAggregate node may expand to "Expand / Calc / LocalXXAgg /
>>>>>>> Exchange / FinalXXAgg" in the physical phase. It seems that we can't
>>>>>>> solve all cases with only one rule. So I suggest
>>>>>>> PushLocalAggIntoTableSourceScanRule focus only on the pattern of
>>>>>>> TableSourceScan + LocalXXAggregate at present.
>>>>>>>
>>>>>>> For core points (b & c): I think we can change the interface to be:
>>>>>>> ```
>>>>>>> boolean applyAggregates(int[] groupingFields, List<CallExpression>
>>>>>>>     aggregateExpressions, DataType producedDataType, List<int[]>
>>>>>>>     groupingSets);
>>>>>>> ```
>>>>>>>
>>>>>>> Simple Group: groupingSets.size() == 1 &&
>>>>>>>     Arrays.equals(groupingSets.get(0), groupingFields)
>>>>>>> Cube Group: groupingSets.size() == IntMath.pow(2,
>>>>>>>     groupingFields.length)
>>>>>>> Rollup:
>>>>>>>     Reference org.apache.calcite.rel.core.Aggregate.Group#isRollup
>>>>>>>
>>>>>>> Then we can handle the complex grouping case. The connector
>>>>>>> developer of the downstream storage should determine whether it
>>>>>>> supports the associated grouping type. The filter and having
>>>>>>> clauses are converted to a related Calc RelNode and are no longer
>>>>>>> part of the LocalAggregate node, so the CallExpression may be
>>>>>>> sufficient to express the semantics of AggregateCall.
>>>>>>>
>>>>>>> What do you think? Looking forward to our further discussion.
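
To make the grouping classification above concrete, here is a minimal sketch in plain Java. The class name `GroupingKind`, the `Kind` enum and the prefix-based rollup check are assumptions made for illustration only; they are not Flink or Calcite APIs, and Calcite's `Aggregate.Group#isRollup` remains the reference for the real check. The cube test only compares the number of grouping sets, as in the message above, so it assumes the sets are distinct.

```java
import java.util.Arrays;
import java.util.List;

public class GroupingKind {

    enum Kind { SIMPLE, ROLLUP, CUBE, OTHER }

    /** Classifies the grouping sets relative to the full list of grouping fields. */
    static Kind classify(int[] groupingFields, List<int[]> groupingSets) {
        int n = groupingFields.length;
        // Simple GROUP BY: exactly one grouping set, equal to the grouping fields.
        if (groupingSets.size() == 1 && Arrays.equals(groupingSets.get(0), groupingFields)) {
            return Kind.SIMPLE;
        }
        if (isRollup(groupingFields, groupingSets)) {
            return Kind.ROLLUP;
        }
        // Cube heuristic from the thread: 2^n grouping sets (assumes they are distinct).
        if (n < 31 && groupingSets.size() == (1 << n)) {
            return Kind.CUBE;
        }
        return Kind.OTHER;
    }

    /**
     * Simplified rollup check (an assumption, not Calcite's implementation):
     * the i-th grouping set must be the first (n - i) grouping fields,
     * ending with the empty set.
     */
    static boolean isRollup(int[] fields, List<int[]> sets) {
        if (sets.size() != fields.length + 1) {
            return false;
        }
        for (int i = 0; i < sets.size(); i++) {
            int[] expectedPrefix = Arrays.copyOf(fields, fields.length - i);
            if (!Arrays.equals(sets.get(i), expectedPrefix)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] fields = {0, 1};
        // GROUP BY f0, f1
        System.out.println(classify(fields, Arrays.asList(new int[] {0, 1})));
        // GROUP BY ROLLUP(f0, f1)
        System.out.println(classify(fields,
                Arrays.asList(new int[] {0, 1}, new int[] {0}, new int[] {})));
        // GROUP BY CUBE(f0, f1)
        System.out.println(classify(fields,
                Arrays.asList(new int[] {0, 1}, new int[] {0}, new int[] {1}, new int[] {})));
    }
}
```

Note that `Arrays.equals` is used because `int[].equals` only compares references. A connector could call such a helper inside `applyAggregates` and return `false` for any kind it cannot translate to the underlying storage.
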
>>>>>>>
>>>>>>>
>>>>>>> Jingsong Li <jingsongl...@gmail.com> wrote on Wed, Jan 6, 2021 at 2:24 PM:
>>>>>>>
>>>>>>>> > I think filter expressions and grouping sets are semantic
>>>>>>>> > arguments instead of utilities. If we want to push them into
>>>>>>>> > sources, the connector developers should be aware of them.
>>>>>>>> > Wrapping them in a context implicitly is error-prone that the
>>>>>>>> > existing connector will produce wrong results when upgrading to
>>>>>>>> > new Flink versions.
>>>>>>>>
>>>>>>>> We can have some mechanism to check the upgrading.
>>>>>>>>
>>>>>>>> > I think for these cases, providing a new default method to
>>>>>>>> > override might be a better choice.
>>>>>>>>
>>>>>>>> Then we will have three or more methods. For the API level, I
>>>>>>>> really don't like it...
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jingsong
>>>>>>>>
>>>>>>>> On Wed, Jan 6, 2021 at 2:10 PM Jark Wu <imj...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I think filter expressions and grouping sets are semantic
>>>>>>>>> arguments instead of utilities.
>>>>>>>>> If we want to push them into sources, the connector developers
>>>>>>>>> should be aware of them.
>>>>>>>>> Wrapping them in a context implicitly is error-prone: existing
>>>>>>>>> connectors will produce wrong results when upgrading to new
>>>>>>>>> Flink versions (as we are pushing grouping_sets/filter_args, but
>>>>>>>>> the connector ignores them).
>>>>>>>>> I think for these cases, providing a new default method to
>>>>>>>>> override might be a better choice.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jark
>>>>>>>>>
>>>>>>>>> On Wed, 6 Jan 2021 at 13:56, Jingsong Li <jingsongl...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I'm also curious about aggregate with filter (COUNT(1)
>>>>>>>>>> FILTER(WHERE d > 1)). Can we push it down? I'm not sure that a
>>>>>>>>>> single call expression can express it, and how we should embody
>>>>>>>>>> it and convey it to users.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jingsong
>>>>>>>>>>
>>>>>>>>>> On Wed, Jan 6, 2021 at 1:36 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Jark,
>>>>>>>>>>>
>>>>>>>>>>> I don't want to limit this interface to LocalAgg Push down.
>>>>>>>>>>> Actually, sometimes, we can push whole aggregation to source too.
>>>>>>>>>>>
>>>>>>>>>>> So, this rule can do something more advanced. For example, we
>>>>>>>>>>> can push down group sets to source too, for the SQL: "GROUP BY
>>>>>>>>>>> GROUPING SETS (f1, f2)". Then, we need to add more information
>>>>>>>>>>> to push down.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jingsong
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jan 6, 2021 at 11:02 AM Jark Wu <imj...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I think this may be over designed. We should have confidence in
>>>>>>>>>>>> the interface we design, the interface should be stable.
>>>>>>>>>>>> Wrapping things in a big context has a cost of losing user
>>>>>>>>>>>> convenience.
>>>>>>>>>>>> Foremost, we don't see any parameters to add in the future. Do
>>>>>>>>>>>> you know any potential parameters?
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Jark
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, 6 Jan 2021 at 10:28, Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Sebastian,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Well, I mean:
>>>>>>>>>>>>>
>>>>>>>>>>>>> `boolean applyAggregates(int[] groupingFields,
>>>>>>>>>>>>> List<CallExpression> aggregateExpressions, DataType
>>>>>>>>>>>>> producedDataType);`
>>>>>>>>>>>>> VS
>>>>>>>>>>>>> ```
>>>>>>>>>>>>> boolean applyAggregates(Aggregation agg);
>>>>>>>>>>>>>
>>>>>>>>>>>>> interface Aggregation {
>>>>>>>>>>>>>     int[] groupingFields();
>>>>>>>>>>>>>     List<CallExpression> aggregateExpressions();
>>>>>>>>>>>>>     DataType producedDataType();
>>>>>>>>>>>>> }
>>>>>>>>>>>>> ```
>>>>>>>>>>>>>
>>>>>>>>>>>>> Maybe I've overthought it, but I think Aggregation is a
>>>>>>>>>>>>> complicated thing. Maybe we need to extend its parameters in
>>>>>>>>>>>>> the future, so making the parameters an interface is conducive
>>>>>>>>>>>>> to future expansion without breaking the compatibility of user
>>>>>>>>>>>>> implementations. With the former way, users would need to
>>>>>>>>>>>>> modify their code.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Jingsong
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu <liuyang0...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Jingsong,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thx a lot for your suggestion. These points really need to be
>>>>>>>>>>>>>> clear in the proposal.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For the semantic problem, I think the main point is the
>>>>>>>>>>>>>> different returned data types for the target aggregate
>>>>>>>>>>>>>> function and the row format returned by the underlying storage.
>>>>>>>>>>>>>> That's why we provide the producedDataType in the
>>>>>>>>>>>>>> SupportsAggregatePushDown interface.
>>>>>>>>>>>>>> We need to let developers know that the semantic differences
>>>>>>>>>>>>>> between the underlying storage system and Flink must be
>>>>>>>>>>>>>> handled in the related connectors. [Supplemented in proposal]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The phase of the new PushLocalAggIntoTableSourceScanRule
>>>>>>>>>>>>>> is also a key point. As you suggested, we should put it into
>>>>>>>>>>>>>> the PHYSICAL_REWRITE rule set, and it is better to put it
>>>>>>>>>>>>>> behind the EnforceLocalXXAggRule. [Supplemented in proposal]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For the scalability of the interface, actually I don't
>>>>>>>>>>>>>> exactly understand your suggestion. Is it to add an abstract
>>>>>>>>>>>>>> class that implements the SupportsAggregatePushDown interface
>>>>>>>>>>>>>> and holds the `List<CallExpression> aggregateExpressions,
>>>>>>>>>>>>>> int[] groupingFields, DataType producedDataType` fields?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Looking forward to your further feedback or guidance.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Jingsong Li <jingsongl...@gmail.com> wrote on Tue, Jan 5, 2021 at 2:44 PM:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for your proposal! Sebastian.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> +1 for SupportsAggregatePushDown. The above wonderful
>>>>>>>>>>>>>>> discussion has solved many of my concerns.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ## Semantic problems
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We may need to add some mechanisms or comments, because as
>>>>>>>>>>>>>>> far as I know, the semantics of each database is actually
>>>>>>>>>>>>>>> different, which may need to be reflected in your specific
>>>>>>>>>>>>>>> implementation.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For example, the AVG output types of various databases may
>>>>>>>>>>>>>>> be different. MySQL outputs double, which is different from
>>>>>>>>>>>>>>> Flink. What should we do? (Luckily, avg will be split into
>>>>>>>>>>>>>>> sum and count, but we also need to care about decimal and
>>>>>>>>>>>>>>> others.)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ## The phase of push-down rule
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I strongly recommend that you do not put it in the Volcano
>>>>>>>>>>>>>>> phase, which may make the cost calculation very troublesome.
>>>>>>>>>>>>>>> So in PHYSICAL_REWRITE?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ## About interface
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For scalability, I slightly recommend that we introduce an
>>>>>>>>>>>>>>> `Aggregate` interface that contains the `List<CallExpression>
>>>>>>>>>>>>>>> aggregateExpressions, int[] groupingFields, DataType
>>>>>>>>>>>>>>> producedDataType` fields. In this way, we can add fields
>>>>>>>>>>>>>>> easily without breaking compatibility.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think the current design is very good, just put forward
>>>>>>>>>>>>>>> some ideas.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Jingsong
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu <liuyang0...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > Hi Jark,
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > Thx for your further feedback and help. The interface of
>>>>>>>>>>>>>>> > SupportsAggregatePushDown may indeed need some adjustments.
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > For (1) Agree: Yeah, the upstream only needs to know if the
>>>>>>>>>>>>>>> > TableSource can handle all of the aggregates.
>>>>>>>>>>>>>>> > It's better to just return a boolean to indicate whether
>>>>>>>>>>>>>>> > the push-down of all aggregates was successful or not.
>>>>>>>>>>>>>>> > [Resolved in proposal]
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > For (2) Agree: The aggOutputDataType represents the produced
>>>>>>>>>>>>>>> > data type of the new table source, to make sure that the new
>>>>>>>>>>>>>>> > table source can connect with the related exchange node. The
>>>>>>>>>>>>>>> > format of this aggOutputDataType is groupedFields's type +
>>>>>>>>>>>>>>> > agg function's return type.
>>>>>>>>>>>>>>> > The reason for adding this parameter in this function is
>>>>>>>>>>>>>>> > also to help the user build the final output type. I have
>>>>>>>>>>>>>>> > changed this parameter to be producedDataType.
>>>>>>>>>>>>>>> > [Resolved in proposal]
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > For (3) Agree: Indeed, groupSet may mislead users, I have
>>>>>>>>>>>>>>> > changed to use groupingFields. [Resolved in proposal]
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > Thx again for the suggestion, looking forward to further
>>>>>>>>>>>>>>> > discussion.
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > Jark Wu <imj...@gmail.com> wrote on Tue, Jan 5, 2021 at 12:05 PM:
>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>> > > I'm also +1 for idea#2.
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > Regarding the updated interface,
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > Result applyAggregates(List<CallExpression> aggregateExpressions,
>>>>>>>>>>>>>>> > >     int[] groupSet, DataType aggOutputDataType);
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > final class Result {
>>>>>>>>>>>>>>> > >     private final List<CallExpression> acceptedAggregates;
>>>>>>>>>>>>>>> > >     private final List<CallExpression> remainingAggregates;
>>>>>>>>>>>>>>> > > }
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > I have following comments:
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > 1) Do we need the composite Result return type? Is a
>>>>>>>>>>>>>>> > > boolean return type enough?
>>>>>>>>>>>>>>> > > From my understanding, all of the aggregates should be
>>>>>>>>>>>>>>> > > accepted, otherwise the pushdown should fail.
>>>>>>>>>>>>>>> > > Therefore, users don't need to distinguish which
>>>>>>>>>>>>>>> > > aggregates are "accepted".
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > 2) Does the `aggOutputDataType` represent the produced
>>>>>>>>>>>>>>> > > data type of the new source, or just the return type of
>>>>>>>>>>>>>>> > > all the agg functions?
>>>>>>>>>>>>>>> > > I would prefer `producedDataType`, just like in
>>>>>>>>>>>>>>> > > `SupportsReadingMetadata`, to reduce the effort for users
>>>>>>>>>>>>>>> > > to concat a final output type.
>>>>>>>>>>>>>>> > > The return type of each agg function can be obtained
>>>>>>>>>>>>>>> > > from the `CallExpression`.
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > 3) What do you think about renaming `groupSet` to
>>>>>>>>>>>>>>> > > `grouping` or `groupedFields`?
>>>>>>>>>>>>>>> > > The `groupSet` may confuse users that it relates to
>>>>>>>>>>>>>>> > > "grouping sets".
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > What do you think?
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > Best,
>>>>>>>>>>>>>>> > > Jark
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > > On Tue, 5 Jan 2021 at 11:04, Kurt Young <ykt...@gmail.com> wrote:
>>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>>> > >> Sorry for the typo -_-!
>>>>>>>>>>>>>>> > >> I meant idea #2.
>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>> > >> Best,
>>>>>>>>>>>>>>> > >> Kurt
>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>> > >> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu <liuyang0...@gmail.com>
>>>>>>>>>>>>>>> > >> wrote:
>>>>>>>>>>>>>>> > >>
>>>>>>>>>>>>>>> > >>> Hi Kurt,
>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>> > >>> Thx a lot for your feedback. If local aggregation is
>>>>>>>>>>>>>>> > >>> more like a physical operator rather than a logical
>>>>>>>>>>>>>>> > >>> operator, I think your suggestion should be idea #2,
>>>>>>>>>>>>>>> > >>> which handles everything in the physical optimization
>>>>>>>>>>>>>>> > >>> phase?
>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>> > >>> Looking forward to the further discussion.
>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>> > >>> Kurt Young <ykt...@gmail.com> wrote on Tue, Jan 5, 2021 at 9:52 AM:
>>>>>>>>>>>>>>> > >>>
>>>>>>>>>>>>>>> > >>>> Local aggregation is more like a physical operator
>>>>>>>>>>>>>>> > >>>> rather than logical operator. I would suggest going
>>>>>>>>>>>>>>> > >>>> with idea #1.
>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>> > >>>> Best,
>>>>>>>>>>>>>>> > >>>> Kurt
>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>> > >>>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu <liuyang0...@gmail.com>
>>>>>>>>>>>>>>> > >>>> wrote:
>>>>>>>>>>>>>>> > >>>>
>>>>>>>>>>>>>>> > >>>> > Hi Jark, Thx a lot for your quick reply and valuable
>>>>>>>>>>>>>>> > >>>> > suggestions.
>>>>>>>>>>>>>>> > >>>> > For (1): Agree: Since we are in the period of
>>>>>>>>>>>>>>> > >>>> > upgrading to the new table source api, we really
>>>>>>>>>>>>>>> > >>>> > should consider the new interface for the new
>>>>>>>>>>>>>>> > >>>> > optimize rule. If the new rule doesn't use the new
>>>>>>>>>>>>>>> > >>>> > api, we'll have to upgrade it sooner or later. I have
>>>>>>>>>>>>>>> > >>>> > changed to use the ability interface for the
>>>>>>>>>>>>>>> > >>>> > SupportsAggregatePushDown definition in the above
>>>>>>>>>>>>>>> > >>>> > proposal.
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>> > For (2): Agree: Changing to use CallExpression is a
>>>>>>>>>>>>>>> > >>>> > better choice, and I have resolved this comment in
>>>>>>>>>>>>>>> > >>>> > the proposal.
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>> > For (3): I suggest we first support the JDBC
>>>>>>>>>>>>>>> > >>>> > connector, as we don't have a Druid connector and the
>>>>>>>>>>>>>>> > >>>> > ES connector just has a sink api at present.
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>> > But perhaps the biggest question may be whether we
>>>>>>>>>>>>>>> > >>>> > should use idea 1 or idea 2 in the proposal.
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>> > What do you think? After we reach agreement on the
>>>>>>>>>>>>>>> > >>>> > proposal, our team can drive to complete this feature.
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>> > Jark Wu <imj...@gmail.com> wrote on Tue, Dec 29, 2020 at 2:58 PM:
>>>>>>>>>>>>>>> > >>>> >
>>>>>>>>>>>>>>> > >>>> > > Hi Sebastian,
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > > Thanks for the proposal. I think this is a great
>>>>>>>>>>>>>>> > >>>> > > improvement for Flink SQL.
>>>>>>>>>>>>>>> > >>>> > > I went through the design doc and have following
>>>>>>>>>>>>>>> > >>>> > > thoughts:
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > > 1) Flink has deprecated the legacy TableSource in
>>>>>>>>>>>>>>> > >>>> > > 1.11 and proposed a new set of DynamicTableSource
>>>>>>>>>>>>>>> > >>>> > > interfaces. Could you update your proposal to use
>>>>>>>>>>>>>>> > >>>> > > the new interfaces?
>>>>>>>>>>>>>>> > >>>> > > Follow the existing ability interfaces, e.g.
>>>>>>>>>>>>>>> > >>>> > > SupportsFilterPushDown, SupportsProjectionPushDown.
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > > 2) Personally, I think CallExpression would be a
>>>>>>>>>>>>>>> > >>>> > > better representation than separate
>>>>>>>>>>>>>>> > >>>> > > `FunctionDefinition` and args, because it would be
>>>>>>>>>>>>>>> > >>>> > > easier to know the index and type of the arguments.
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > > 3) It would be better to list which connectors
>>>>>>>>>>>>>>> > >>>> > > will be supported in the plan?
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > > Best,
>>>>>>>>>>>>>>> > >>>> > > Jark
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu <liuyang0...@gmail.com>
>>>>>>>>>>>>>>> > >>>> > > wrote:
>>>>>>>>>>>>>>> > >>>> > >
>>>>>>>>>>>>>>> > >>>> > > > Hi all,
>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>> > >>>> > > > I'd like to discuss a new feature for the Blink
>>>>>>>>>>>>>>> > >>>> > > > Planner.
>>>>>>>>>>>>>>> > >>>> > > > The aggregate operator of Flink SQL is currently
>>>>>>>>>>>>>>> > >>>> > > > executed entirely in the Flink layer.
>>>>>>>>>>>>>>> > >>>> > > > With the development of storage systems, many
>>>>>>>>>>>>>>> > >>>> > > > downstream storage systems of Flink SQL have the
>>>>>>>>>>>>>>> > >>>> > > > ability to handle aggregation themselves.
>>>>>>>>>>>>>>> > >>>> > > > Pushing the aggregate down to the data source
>>>>>>>>>>>>>>> > >>>> > > > layer will improve performance in terms of
>>>>>>>>>>>>>>> > >>>> > > > network IO and computation overhead.
>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>> > >>>> > > > I have drafted a design doc for this new feature.
>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>> > >>>> > > > https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>> > >>>> > > > Any comment or discussion is welcome.
>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>> > >>>> > > > --
>>>>>>>>>>>>>>> > >>>> > > >
>>>>>>>>>>>>>>> > >>>> > > > *With kind regards
>>>>>>>>>>>>>>> > >>>> > > > ------------------------------------------------------------
>>>>>>>>>>>>>>> > >>>> > > > Sebastian Liu 刘洋
>>>>>>>>>>>>>>> > >>>> > > > Institute of Computing Technology, Chinese Academy of Science
>>>>>>>>>>>>>>> > >>>> > > > Mobile\WeChat: +86—15201613655
>>>>>>>>>>>>>>> > >>>> > > > E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>>>>>>>>>>>>> > >>>> > > > QQ: 3239559*
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Best, Jingsong Lee

--

*With kind regards
------------------------------------------------------------
Sebastian Liu 刘洋
Institute of Computing Technology, Chinese Academy of Science
Mobile\WeChat: +86—15201613655
E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
QQ: 3239559*