> I think filter expressions and grouping sets are semantic arguments instead of utilities. If we want to push them into sources, the connector developers should be aware of them. Wrapping them in a context implicitly is error-prone, because the existing connector will produce wrong results when upgrading to new Flink versions.
We can have some mechanism to check the upgrading.

> I think for these cases, providing a new default method to override might be a better choice.

Then we will have three or more methods. For the API level, I really don't like it...

Best,
Jingsong

On Wed, Jan 6, 2021 at 2:10 PM Jark Wu <imj...@gmail.com> wrote:

> I think filter expressions and grouping sets are semantic arguments instead of utilities.
> If we want to push them into sources, the connector developers should be aware of them.
> Wrapping them in a context implicitly is error-prone that the existing connector will produce wrong results when upgrading to new Flink versions (as we are pushing grouping_sets/filter_args, but connector ignores it).
> I think for these cases, providing a new default method to override might be a better choice.
>
> Best,
> Jark
>
> On Wed, 6 Jan 2021 at 13:56, Jingsong Li <jingsongl...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm also curious about aggregate with filter (COUNT(1) FILTER(WHERE d > 1)). Can we push it down? I'm not sure that a single call expression can express it, and how we should embody it and convey it to users.
>>
>> Best,
>> Jingsong
>>
>> On Wed, Jan 6, 2021 at 1:36 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>>
>>> Hi Jark,
>>>
>>> I don't want to limit this interface to LocalAgg Push down. Actually, sometimes, we can push whole aggregation to source too.
>>>
>>> So, this rule can do something more advanced. For example, we can push down group sets to source too, for the SQL: "GROUP BY GROUPING SETS (f1, f2)". Then, we need to add more information to push down.
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Wed, Jan 6, 2021 at 11:02 AM Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> I think this may be over designed. We should have confidence in the interface we design, the interface should be stable.
>>>> Wrapping things in a big context has a cost of losing user convenience.
>>>> Foremost, we don't see any parameters to add in the future. Do you know any potential parameters?
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Wed, 6 Jan 2021 at 10:28, Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>
>>>>> Hi Sebastian,
>>>>>
>>>>> Well, I mean:
>>>>>
>>>>> `boolean applyAggregates(int[] groupingFields, List<CallExpression> aggregateExpressions, DataType producedDataType);`
>>>>> VS
>>>>> ```
>>>>> boolean applyAggregates(Aggregation agg);
>>>>>
>>>>> interface Aggregation {
>>>>>     int[] groupingFields();
>>>>>     List<CallExpression> aggregateExpressions();
>>>>>     DataType producedDataType();
>>>>> }
>>>>> ```
>>>>>
>>>>> Maybe I've overthought it, but I think Aggregation is a complicated thing. We may need to extend its parameters in the future, so making the parameters an interface allows future expansion without breaking the compatibility of user implementations. With the former signature, users would need to modify their code.
>>>>>
>>>>> Best,
>>>>> Jingsong
>>>>>
>>>>> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu <liuyang0...@gmail.com> wrote:
>>>>>
>>>>>> Hi Jingsong,
>>>>>>
>>>>>> Thx a lot for your suggestion. These points really need to be clear in the proposal.
>>>>>>
>>>>>> For the semantic problem, I think the main point is the difference between the returned data types of the target aggregate function and the row format returned by the underlying storage. That's why we provide the producedDataType in the SupportsAggregatePushDown interface. We need to let developers know that the semantic differences between the underlying storage system and Flink must be handled in the related connectors. [Supplemented in proposal]
>>>>>>
>>>>>> For the phase of the new PushLocalAggIntoTableSourceScanRule rule, it's also a key point.
>>>>>> As you suggested, we should put it into the PHYSICAL_REWRITE rule set, and better to put it behind the EnforceLocalXXAggRule. [Supplemented in proposal]
>>>>>>
>>>>>> For the scalability of the interface, actually I don't exactly understand your suggestion. Is it to add an abstract class that implements the SupportsAggregatePushDown interface and holds the `List<CallExpression> aggregateExpressions, int[] groupingFields, DataType producedDataType` fields?
>>>>>>
>>>>>> Looking forward to your further feedback or guidance.
>>>>>>
>>>>>> Jingsong Li <jingsongl...@gmail.com> wrote on Tue, Jan 5, 2021 at 2:44 PM:
>>>>>>
>>>>>>> Thanks for your proposal, Sebastian!
>>>>>>>
>>>>>>> +1 for SupportsAggregatePushDown. The above wonderful discussion has solved many of my concerns.
>>>>>>>
>>>>>>> ## Semantic problems
>>>>>>>
>>>>>>> We may need to add some mechanisms or comments, because as far as I know, the semantics of each database are actually different, which may need to be reflected in your specific implementation.
>>>>>>>
>>>>>>> For example, the AVG output types of various databases may be different. MySQL outputs double, which is different from Flink. What should we do? (Luckily, AVG will be split into SUM and COUNT, but we also need to care about decimal and others.)
>>>>>>>
>>>>>>> ## The phase of push-down rule
>>>>>>>
>>>>>>> I strongly recommend that you do not put it in the Volcano phase, which may make the cost calculation very troublesome.
>>>>>>> So in PHYSICAL_REWRITE?
>>>>>>>
>>>>>>> ## About interface
>>>>>>>
>>>>>>> For scalability, I slightly recommend that we introduce an `Aggregate` interface that contains the `List<CallExpression> aggregateExpressions, int[] groupingFields, DataType producedDataType` fields.
>>>>>>> In this way, we can add fields easily without breaking compatibility.
>>>>>>>
>>>>>>> I think the current design is very good, just put forward some ideas.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jingsong
>>>>>>>
>>>>>>> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu <liuyang0...@gmail.com> wrote:
>>>>>>>
>>>>>>> > Hi Jark,
>>>>>>> >
>>>>>>> > Thx for your further feedback and help. The interface of SupportsAggregatePushDown may indeed need some adjustments.
>>>>>>> >
>>>>>>> > For (1) Agree: Yeah, the upstream only needs to know if the TableSource can handle all of the aggregates. It's better to just return a boolean to indicate whether the push-down of all aggregates was successful or not. [Resolved in proposal]
>>>>>>> >
>>>>>>> > For (2) Agree: The aggOutputDataType represents the produced data type of the new table source, to make sure that the new table source can connect with the related exchange node. The format of this aggOutputDataType is groupedFields's type + agg function's return type. The reason for adding this parameter to this function is also to make it easier for the user to build the final output type. I have renamed this parameter to producedDataType. [Resolved in proposal]
>>>>>>> >
>>>>>>> > For (3) Agree: Indeed, groupSet may mislead users, so I have changed it to groupingFields. [Resolved in proposal]
>>>>>>> >
>>>>>>> > Thx again for the suggestion, looking forward to the further discussion.
>>>>>>> >
>>>>>>> > Jark Wu <imj...@gmail.com> wrote on Tue, Jan 5, 2021 at 12:05 PM:
>>>>>>> >
>>>>>>> > > I'm also +1 for idea#2.
>>>>>>> > >
>>>>>>> > > Regarding to the updated interface,
>>>>>>> > >
>>>>>>> > > Result applyAggregates(List<CallExpression> aggregateExpressions,
>>>>>>> > >     int[] groupSet, DataType aggOutputDataType);
>>>>>>> > >
>>>>>>> > > final class Result {
>>>>>>> > >     private final List<CallExpression> acceptedAggregates;
>>>>>>> > >     private final List<CallExpression> remainingAggregates;
>>>>>>> > > }
>>>>>>> > >
>>>>>>> > > I have following comments:
>>>>>>> > >
>>>>>>> > > 1) Do we need the composite Result return type? Is a boolean return type enough?
>>>>>>> > >    From my understanding, all of the aggregates should be accepted, otherwise the pushdown should fail.
>>>>>>> > >    Therefore, users don't need to distinguish which aggregates are "accepted".
>>>>>>> > >
>>>>>>> > > 2) Does the `aggOutputDataType` represent the produced data type of the new source, or just the return type of all the agg functions?
>>>>>>> > >    I would prefer to `producedDataType` just like `SupportsReadingMetadata` to reduce the effort for users to concat a final output type.
>>>>>>> > >    The return type of each agg function can be obtained from the `CallExpression`.
>>>>>>> > >
>>>>>>> > > 3) What do you think about renaming `groupSet` to `grouping` or `groupedFields`?
>>>>>>> > >    The `groupSet` may confuse users that it relates to "grouping sets".
>>>>>>> > >
>>>>>>> > > What do you think?
>>>>>>> > >
>>>>>>> > > Best,
>>>>>>> > > Jark
>>>>>>> > >
>>>>>>> > > On Tue, 5 Jan 2021 at 11:04, Kurt Young <ykt...@gmail.com> wrote:
>>>>>>> > >
>>>>>>> > >> Sorry for the typo -_-!
>>>>>>> > >> I meant idea #2.
>>>>>>> > >>
>>>>>>> > >> Best,
>>>>>>> > >> Kurt
>>>>>>> > >>
>>>>>>> > >> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu <liuyang0...@gmail.com> wrote:
>>>>>>> > >>
>>>>>>> > >>> Hi Kurt,
>>>>>>> > >>>
>>>>>>> > >>> Thx a lot for your feedback. If local aggregation is more like a physical operator than a logical operator, I think your suggestion should be idea #2, which handles everything in the physical optimization phase?
>>>>>>> > >>>
>>>>>>> > >>> Looking forward to the further discussion.
>>>>>>> > >>>
>>>>>>> > >>> Kurt Young <ykt...@gmail.com> wrote on Tue, Jan 5, 2021 at 9:52 AM:
>>>>>>> > >>>
>>>>>>> > >>>> Local aggregation is more like a physical operator rather than logical operator. I would suggest going with idea #1.
>>>>>>> > >>>>
>>>>>>> > >>>> Best,
>>>>>>> > >>>> Kurt
>>>>>>> > >>>>
>>>>>>> > >>>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu <liuyang0...@gmail.com> wrote:
>>>>>>> > >>>>
>>>>>>> > >>>> > Hi Jark, Thx a lot for your quick reply and valuable suggestions.
>>>>>>> > >>>> > For (1): Agree: Since we are in the period of upgrading to the new table source API, we really should consider the new interface for the new optimizer rule. If the new rule doesn't use the new API, we'll have to upgrade it sooner or later. I have changed the SupportsAggregatePushDown definition in the above proposal to use the ability interface.
>>>>>>> > >>>> >
>>>>>>> > >>>> > For (2): Agree: Changing to CallExpression is a better choice, and I have resolved this comment in the proposal.
>>>>>>> > >>>> >
>>>>>>> > >>>> > For (3): I suggest we first support the JDBC connector, as we don't have a Druid connector and the ES connector only has a sink API at present.
>>>>>>> > >>>> >
>>>>>>> > >>>> > But perhaps the biggest question may be whether we should use idea 1 or idea 2 in the proposal.
>>>>>>> > >>>> >
>>>>>>> > >>>> > What do you think? After we reach agreement on the proposal, our team can drive this feature to completion.
>>>>>>> > >>>> >
>>>>>>> > >>>> > Jark Wu <imj...@gmail.com> wrote on Tue, Dec 29, 2020 at 2:58 PM:
>>>>>>> > >>>> >
>>>>>>> > >>>> > > Hi Sebastian,
>>>>>>> > >>>> > >
>>>>>>> > >>>> > > Thanks for the proposal. I think this is a great improvement for Flink SQL.
>>>>>>> > >>>> > > I went through the design doc and have following thoughts:
>>>>>>> > >>>> > >
>>>>>>> > >>>> > > 1) Flink has deprecated the legacy TableSource in 1.11 and proposed a new set of DynamicTableSource interfaces. Could you update your proposal to use the new interfaces?
>>>>>>> > >>>> > >    Follow the existing ability interfaces, e.g. SupportsFilterPushDown, SupportsProjectionPushDown.
>>>>>>> > >>>> > >
>>>>>>> > >>>> > > 2) Personally, I think CallExpression would be a better representation than separate `FunctionDefinition` and args, because it would be easier to know the index and type of the arguments.
>>>>>>> > >>>> > >
>>>>>>> > >>>> > > 3) It would be better to list which connectors will be supported in the plan?
>>>>>>> > >>>> > >
>>>>>>> > >>>> > > Best,
>>>>>>> > >>>> > > Jark
>>>>>>> > >>>> > >
>>>>>>> > >>>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu <liuyang0...@gmail.com> wrote:
>>>>>>> > >>>> > >
>>>>>>> > >>>> > > > Hi all,
>>>>>>> > >>>> > > >
>>>>>>> > >>>> > > > I'd like to discuss a new feature for the Blink Planner.
>>>>>>> > >>>> > > > The Aggregate operator of Flink SQL is currently executed entirely in the Flink layer.
>>>>>>> > >>>> > > > With the development of storage systems, many of the storage systems used with Flink SQL are able to handle the Aggregation operator themselves.
>>>>>>> > >>>> > > > Pushing the Aggregate down to the data source layer will improve performance by reducing network IO and computation overhead.
>>>>>>> > >>>> > > >
>>>>>>> > >>>> > > > I have drafted a design doc for this new feature.
>>>>>>> > >>>> > > >
>>>>>>> > >>>> > > > https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>>>>>>> > >>>> > > >
>>>>>>> > >>>> > > > Any comment or discussion is welcome.
>>>>>>> > >>>> > > >
>>>>>>> > >>>> > > > --
>>>>>>> > >>>> > > >
>>>>>>> > >>>> > > > *With kind regards
>>>>>>> > >>>> > > > ------------------------------------------------------------
>>>>>>> > >>>> > > > Sebastian Liu 刘洋
>>>>>>> > >>>> > > > Institute of Computing Technology, Chinese Academy of Science
>>>>>>> > >>>> > > > Mobile\WeChat: +86—15201613655
>>>>>>> > >>>> > > > E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>>>>> > >>>> > > > QQ: 3239559*

--
Best, Jingsong Lee
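
The compatibility concern debated at the top of the thread (a context object versus a new default method) can be sketched roughly as below. This is only a minimal illustration, not Flink's actual API: `String` stands in for `CallExpression` and `DataType`, and `groupingSets()`, `applyGroupingSets`, `ContextStyleSource`, and `MethodStyleSource` are hypothetical names invented for the example. It assumes a connector that was written before grouping sets were pushed down and is then run against a newer planner.

```java
import java.util.List;

/** Illustrative only: String stands in for Flink's CallExpression and DataType. */
final class AggregatePushDownSketch {

    /** Context object in the shape Jingsong proposed. */
    interface Aggregation {
        int[] groupingFields();
        List<String> aggregateExpressions();
        String producedDataType();

        /** Hypothetical field added in a later release; empty means a plain GROUP BY. */
        default List<int[]> groupingSets() {
            return List.of();
        }
    }

    /** Variant 1: a single method that takes the context object. */
    interface ContextStyleSource {
        boolean applyAggregates(Aggregation agg);
    }

    /** Variant 2: explicit parameters; new semantics arrive as a new default method. */
    interface MethodStyleSource {
        boolean applyAggregates(
                int[] groupingFields, List<String> aggregateExpressions, String producedDataType);

        /** Hypothetical later addition; connectors that do not override it decline. */
        default boolean applyGroupingSets(
                List<int[]> groupingSets, List<String> aggregateExpressions, String producedDataType) {
            return false;
        }
    }

    /** A GROUP BY GROUPING SETS (f0, f1) request expressed as a context object. */
    static Aggregation groupingSetsQuery() {
        return new Aggregation() {
            @Override public int[] groupingFields() { return new int[0]; }
            @Override public List<String> aggregateExpressions() { return List.of("COUNT(*)"); }
            @Override public String producedDataType() { return "ROW<f0 INT, f1 INT, cnt BIGINT>"; }
            @Override public List<int[]> groupingSets() {
                return List.of(new int[] {0}, new int[] {1});
            }
        };
    }

    /** Connectors written before grouping sets existed: they accept whatever they are handed. */
    static final ContextStyleSource OLD_CONTEXT_CONNECTOR = agg -> true;
    static final MethodStyleSource OLD_METHOD_CONNECTOR = (fields, aggs, type) -> true;

    public static void main(String[] args) {
        Aggregation query = groupingSetsQuery();
        // Accepts the push-down although it never reads groupingSets().
        System.out.println(OLD_CONTEXT_CONNECTOR.applyAggregates(query));
        // Inherits the default and declines, so the aggregation would stay in Flink.
        System.out.println(OLD_METHOD_CONNECTOR.applyGroupingSets(
                query.groupingSets(), query.aggregateExpressions(), query.producedDataType()));
    }
}
```

Run as written, this prints `true` and then `false`. The old context-style connector accepts a request whose grouping sets it never inspects, which is the silent wrong-result case Jark describes. The old method-style connector falls back to the default and declines. The cost Jingsong points out is also visible: every new kind of argument adds another method to the interface.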