> I think filter expressions and grouping sets are semantic arguments
instead of utilities. If we want to push them into sources, the connector
developers should be aware of them. Wrapping them in a context implicitly is
error-prone: existing connectors will produce wrong results when
upgrading to new Flink versions.

We can add some mechanism to check for this when upgrading.

> I think for these cases, providing a new default method to override might
be a better choice.

Then we will have three or more methods. At the API level, I really don't
like that...
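
To make the trade-off above concrete, here is a minimal sketch, not the actual Flink API. Everything in it is hypothetical: the names `Aggregation`, `SourceWithContext`, `SourceWithOverloads` and `filterExpression()` are made up for illustration, and plain strings stand in for `CallExpression`. It shows a connector written before filter support existed, once against a context object and once against a later-added default method.

```java
import java.util.List;
import java.util.Optional;

public class AggregatePushDownSketch {

    /** Option A (hypothetical): a context object that can grow new accessors. */
    interface Aggregation {
        int[] groupingFields();

        List<String> aggregateExpressions(); // stand-in for List<CallExpression>

        // Assumed later addition: the FILTER clause of an aggregate call.
        default Optional<String> filterExpression() {
            return Optional.empty();
        }
    }

    interface SourceWithContext {
        boolean applyAggregates(Aggregation agg);
    }

    /** Option B (hypothetical): new semantics arrive as a new default method. */
    interface SourceWithOverloads {
        boolean applyAggregates(int[] groupingFields, List<String> aggregateExpressions);

        // Added later; rejects by default, so old connectors never accept a filter.
        default boolean applyAggregates(
                int[] groupingFields, List<String> aggregateExpressions, String filterExpression) {
            return false;
        }
    }

    /** Connector written before filters existed: it only reads the fields it knows. */
    static final class LegacyContextSource implements SourceWithContext {
        @Override
        public boolean applyAggregates(Aggregation agg) {
            return agg.aggregateExpressions().size() > 0; // never looks at filterExpression()
        }
    }

    static final class LegacyOverloadSource implements SourceWithOverloads {
        @Override
        public boolean applyAggregates(int[] groupingFields, List<String> aggregateExpressions) {
            return aggregateExpressions.size() > 0;
        }
    }

    /** Models COUNT(1) FILTER (WHERE d > 1) grouped by field 0. */
    static Aggregation filteredCount() {
        return new Aggregation() {
            @Override
            public int[] groupingFields() {
                return new int[] {0};
            }

            @Override
            public List<String> aggregateExpressions() {
                return List.of("COUNT(1)");
            }

            @Override
            public Optional<String> filterExpression() {
                return Optional.of("d > 1");
            }
        };
    }

    static boolean pushWithContext() {
        return new LegacyContextSource().applyAggregates(filteredCount());
    }

    static boolean pushWithOverloads() {
        Aggregation agg = filteredCount();
        return new LegacyOverloadSource().applyAggregates(
                agg.groupingFields(), agg.aggregateExpressions(), agg.filterExpression().get());
    }

    public static void main(String[] args) {
        System.out.println("context:   " + pushWithContext());
        System.out.println("overloads: " + pushWithOverloads());
    }
}
```

With the context object, the legacy connector accepts the push-down (`true`) although it ignores the filter, which is the wrong-result risk on upgrade. With the extra default method it is rejected (`false`) and the planner would keep the aggregate in Flink, at the cost of one more method on the interface.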

Best,
Jingsong

On Wed, Jan 6, 2021 at 2:10 PM Jark Wu <imj...@gmail.com> wrote:

> I think filter expressions and grouping sets are semantic arguments
> instead of utilities.
> If we want to push them into sources, the connector developers should be
> aware of them.
> Wrapping them in a context implicitly is error-prone: existing
> connectors will produce wrong results
>  when upgrading to new Flink versions (as we are pushing
> grouping_sets/filter_args, but the connector ignores them).
> I think for these cases, providing a new default method to override might
> be a better choice.
>
> Best,
> Jark
>
> On Wed, 6 Jan 2021 at 13:56, Jingsong Li <jingsongl...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm also curious about aggregate with filter (COUNT(1) FILTER(WHERE d >
>> 1)). Can we push it down? I'm not sure that a single call expression can
>> express it, and how we should embody it and convey it to users.
>>
>> Best,
>> Jingsong
>>
>> On Wed, Jan 6, 2021 at 1:36 PM Jingsong Li <jingsongl...@gmail.com>
>> wrote:
>>
>>> Hi Jark,
>>>
>>> I don't want to limit this interface to LocalAgg Push down. Actually,
>>> sometimes, we can push whole aggregation to source too.
>>>
>>> So, this rule can do something more advanced. For example, we can push
>>> down grouping sets to the source too, for the SQL: "GROUP BY GROUPING SETS (f1,
>>> f2)". Then, we need to add more information to push down.
>>>
>>> Best,
>>> Jingsong
>>>
>>> On Wed, Jan 6, 2021 at 11:02 AM Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> I think this may be over designed. We should have confidence in the
>>>> interface we design, the interface should be stable.
>>>> Wrapping things in a big context has a cost of losing user convenience.
>>>> Foremost, we don't see any parameters to add in the future. Do you know
>>>> any potential parameters?
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Wed, 6 Jan 2021 at 10:28, Jingsong Li <jingsongl...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Sebastian,
>>>>>
>>>>> Well, I mean:
>>>>>
>>>>> `boolean applyAggregates(int[] groupingFields, List<CallExpression>
>>>>> aggregateExpressions, DataType producedDataType);`
>>>>> VS
>>>>> ```
>>>>> boolean applyAggregates(Aggregation agg);
>>>>>
>>>>> interface Aggregation {
>>>>>   int[] groupingFields();
>>>>>   List<CallExpression> aggregateExpressions();
>>>>>   DataType producedDataType();
>>>>> }
>>>>> ```
>>>>>
>>>>> Maybe I'm overthinking it, but I think aggregation is a
>>>>> complicated thing. We may need to extend its parameters in the future,
>>>>> so making the parameters an interface allows future expansion
>>>>> without breaking the compatibility of user implementations. With the
>>>>> former approach, users would need to modify their code.
>>>>>
>>>>> Best,
>>>>> Jingsong
>>>>>
>>>>> On Wed, Jan 6, 2021 at 12:52 AM Sebastian Liu <liuyang0...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Jingsong,
>>>>>>
>>>>>> Thx a lot for your suggestion. These points really need to be clear
>>>>>> in the proposal.
>>>>>>
>>>>>> For the semantic problem, I think the main point is the different
>>>>>> returned data types
>>>>>> for the target aggregate function and the row format returned by the
>>>>>> underlying storage.
>>>>>> That's why we provide the producedDataType in the
>>>>>> SupportsAggregatePushDown interface.
>>>>>> Need to let developers know that we need to handle the semantic
>>>>>> differences between
>>>>>> the underlying storage system and Flink in related connectors.
>>>>>> [Supplemented in proposal]
>>>>>>
>>>>>> For the phase of the new PushLocalAggIntoTableSourceScanRule rule,
>>>>>> it's also a key point.
>>>>>> As you suggested, we should put it into the PHYSICAL_REWRITE rule
>>>>>> set, and better to put it
>>>>>> behind the EnforceLocalXXAggRule. [Supplemented in proposal]
>>>>>>
>>>>>> For the scalability of the interface, actually I don't exactly
>>>>>> understand your suggestion. Is it to add
>>>>>> an abstract class, to implement the SupportsAggregatePushDown
>>>>>> interface, and holds the
>>>>>> `List < CallExpression > aggregateExpressions, int[] GroupingFields,
>>>>>> DataType producedDataType`
>>>>>> fields?
>>>>>>
>>>>>> Looking forward to your further feedback or guidance.
>>>>>>
>>>>>> Jingsong Li <jingsongl...@gmail.com> wrote on Tue, Jan 5, 2021 at 2:44 PM:
>>>>>>
>>>>>>> Thanks for your proposal! Sebastian.
>>>>>>>
>>>>>>> +1 for SupportsAggregatePushDown. The above wonderful discussion has
>>>>>>> solved
>>>>>>> many of my concerns.
>>>>>>>
>>>>>>> ## Semantic problems
>>>>>>>
>>>>>>> We may need to add some mechanisms or comments, because as far as I
>>>>>>> know,
>>>>>>> the semantics of each database is actually different, which may need
>>>>>>> to be
>>>>>>> reflected in your specific implementation.
>>>>>>>
>>>>>>> For example, the AVG output types of various databases may be
>>>>>>> different. MySQL outputs double, which is different from Flink. What
>>>>>>> should we do? (Luckily, AVG will be split into SUM and COUNT, but
>>>>>>> we also need to take care of decimal and other types.)
>>>>>>>
>>>>>>> ## The phase of push-down rule
>>>>>>>
>>>>>>> I strongly recommend that you do not put it in the Volcano phase,
>>>>>>> which may
>>>>>>> make the cost calculation very troublesome.
>>>>>>> So in PHYSICAL_REWRITE?
>>>>>>>
>>>>>>> ## About interface
>>>>>>>
>>>>>>> For scalability, I slightly recommend that we introduce an
>>>>>>> `Aggregate`
>>>>>>> interface, it contains `List<CallExpression> aggregateExpressions,
>>>>>>> int[]
>>>>>>> groupingFields, DataType producedDataType` fields. In this way, we
>>>>>>> can add
>>>>>>> fields easily without breaking compatibility.
>>>>>>>
>>>>>>> I think the current design is very good, just put forward some ideas.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jingsong
>>>>>>>
>>>>>>> On Tue, Jan 5, 2021 at 1:55 PM Sebastian Liu <liuyang0...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> > Hi Jark,
>>>>>>> >
>>>>>>> > Thx for your further feedback and help. The interface of
>>>>>>> > SupportsAggregatePushDown may indeed need some adjustments.
>>>>>>> >
>>>>>>> > For (1) Agree: Yeah, the upstream only need to know if the
>>>>>>> TableSource can
>>>>>>> > handle all of the aggregates.
>>>>>>> > It's better to just return a boolean type to indicate whether all
>>>>>>> of
>>>>>>> > aggregates push down was successful or not. [Resolved in proposal]
>>>>>>> >
>>>>>>> > For (2) Agree: The aggOutputDataType represent the produced data
>>>>>>> type of
>>>>>>> > the new table source to make sure that the new table source can
>>>>>>> > connect with the related exchange node. The format of this
>>>>>>> > aggOutputDataType is groupedFields's type + agg function's return
>>>>>>> type.
>>>>>>> > The reason for adding this parameter in this function is also to
>>>>>>> facilitate
>>>>>>> > the user to build the final output type. I have changed this
>>>>>>> parameter
>>>>>>> > to be producedDataType. [Resolved in proposal]
>>>>>>> >
>>>>>>> > For (3) Agree: Indeed, groupSet may mislead users, I have changed
>>>>>>> to use
>>>>>>> > groupingFields. [Resolved in proposal]
>>>>>>> >
>>>>>>> > Thx again for the suggestion, looking forward to further discussion.
>>>>>>> >
>>>>>>> > Jark Wu <imj...@gmail.com> wrote on Tue, Jan 5, 2021 at 12:05 PM:
>>>>>>> >
>>>>>>> > > I'm also +1 for idea#2.
>>>>>>> > >
>>>>>>> > > Regarding the updated interface,
>>>>>>> > >
>>>>>>> > > Result applyAggregates(List<CallExpression> aggregateExpressions,
>>>>>>> > >      int[] groupSet, DataType aggOutputDataType);
>>>>>>> > >
>>>>>>> > > final class Result {
>>>>>>> > >        private final List<CallExpression> acceptedAggregates;
>>>>>>> > >        private final List<CallExpression> remainingAggregates;
>>>>>>> > > }
>>>>>>> > >
>>>>>>> > > I have following comments:
>>>>>>> > >
>>>>>>> > > 1) Do we need the composite Result return type? Is a boolean
>>>>>>> return type
>>>>>>> > > enough?
>>>>>>> > >     From my understanding, all of the aggregates should be
>>>>>>> accepted,
>>>>>>> > > otherwise the pushdown should fail.
>>>>>>> > >     Therefore, users don't need to distinguish which aggregates
>>>>>>> are
>>>>>>> > > "accepted".
>>>>>>> > >
>>>>>>> > > 2) Does the `aggOutputDataType` represent the produced data type
>>>>>>> of the
>>>>>>> > > new source, or just the return type of all the agg functions?
>>>>>>> > >     I would prefer to `producedDataType` just like
>>>>>>> > > `SupportsReadingMetadata` to reduce the effort for users to
>>>>>>> concat a
>>>>>>> > final
>>>>>>> > > output type.
>>>>>>> > >     The return type of each agg function can be obtained from the
>>>>>>> > > `CallExpression`.
>>>>>>> > >
>>>>>>> > > 3) What do you think about renaming `groupSet` to `grouping` or
>>>>>>> > > `groupedFields` ?
>>>>>>> > >     The `groupSet` may confuse users that it relates to
>>>>>>> "grouping sets".
>>>>>>> > >
>>>>>>> > >
>>>>>>> > > What do you think?
>>>>>>> > >
>>>>>>> > > Best,
>>>>>>> > > Jark
>>>>>>> > >
>>>>>>> > >
>>>>>>> > >
>>>>>>> > > On Tue, 5 Jan 2021 at 11:04, Kurt Young <ykt...@gmail.com>
>>>>>>> wrote:
>>>>>>> > >
>>>>>>> > >> Sorry for the typo -_-!
>>>>>>> > >> I meant idea #2.
>>>>>>> > >>
>>>>>>> > >> Best,
>>>>>>> > >> Kurt
>>>>>>> > >>
>>>>>>> > >>
>>>>>>> > >> On Tue, Jan 5, 2021 at 10:59 AM Sebastian Liu <
>>>>>>> liuyang0...@gmail.com>
>>>>>>> > >> wrote:
>>>>>>> > >>
>>>>>>> > >>> Hi Kurt,
>>>>>>> > >>>
>>>>>>> > >>> Thx a lot for your feedback. If local aggregation is more like
>>>>>>> a
>>>>>>> > >>> physical operator rather than logical
>>>>>>> > >>> operator, I think your suggestion should be idea #2 which
>>>>>>> handle all in
>>>>>>> > >>> the physical optimization phase?
>>>>>>> > >>>
>>>>>>> > >>> Looking forward to further discussion.
>>>>>>> > >>>
>>>>>>> > >>>
>>>>>>> > >>> Kurt Young <ykt...@gmail.com> wrote on Tue, Jan 5, 2021 at 9:52 AM:
>>>>>>> > >>>
>>>>>>> > >>>> Local aggregation is more like a physical operator rather
>>>>>>> than logical
>>>>>>> > >>>> operator. I would suggest going with idea #1.
>>>>>>> > >>>>
>>>>>>> > >>>> Best,
>>>>>>> > >>>> Kurt
>>>>>>> > >>>>
>>>>>>> > >>>>
>>>>>>> > >>>> On Wed, Dec 30, 2020 at 8:31 PM Sebastian Liu <
>>>>>>> liuyang0...@gmail.com>
>>>>>>> > >>>> wrote:
>>>>>>> > >>>>
>>>>>>> > >>>> > Hi Jark, Thx a lot for your quick reply and valuable
>>>>>>> suggestions.
>>>>>>> > >>>> > For (1): Agree: Since we are in the period of upgrading the
>>>>>>> new
>>>>>>> > table
>>>>>>> > >>>> > source api,
>>>>>>> > >>>> > we really should consider the new interface for the new
>>>>>>> optimize
>>>>>>> > >>>> rule. If
>>>>>>> > >>>> > the new rule
>>>>>>> > >>>> > doesn't use the new api, we'll have to upgrade it sooner or
>>>>>>> > >>>> > later. I have changed it to use
>>>>>>> > >>>> > the ability interface for the SupportsAggregatePushDown
>>>>>>> definition
>>>>>>> > in
>>>>>>> > >>>> above
>>>>>>> > >>>> > proposal.
>>>>>>> > >>>> >
>>>>>>> > >>>> > For (2): Agree: Change to use CallExpression is a better
>>>>>>> choice, and
>>>>>>> > >>>> have
>>>>>>> > >>>> > resolved this
>>>>>>> > >>>> > comment in the proposal.
>>>>>>> > >>>> >
>>>>>>> > >>>> > For (3): I suggest we first support the JDBC connector, as
>>>>>>> we don't
>>>>>>> > >>>> have
>>>>>>> > >>>> > Druid connector
>>>>>>> > >>>> > and ES connector just has sink api at present.
>>>>>>> > >>>> >
>>>>>>> > >>>> > But perhaps the biggest question may be whether we should
>>>>>>> use idea 1
>>>>>>> > >>>> or
>>>>>>> > >>>> > idea 2 in proposal.
>>>>>>> > >>>> >
>>>>>>> > >>>> > What do you think?  After we reach the agreement on the
>>>>>>> proposal,
>>>>>>> > our
>>>>>>> > >>>> team
>>>>>>> > >>>> > can drive to
>>>>>>> > >>>> > complete this feature.
>>>>>>> > >>>> >
>>>>>>> > >>>> > Jark Wu <imj...@gmail.com> wrote on Tue, Dec 29, 2020 at 2:58 PM:
>>>>>>> > >>>> >
>>>>>>> > >>>> > > Hi Sebastian,
>>>>>>> > >>>> > >
>>>>>>> > >>>> > > Thanks for the proposal. I think this is a great
>>>>>>> improvement for
>>>>>>> > >>>> Flink
>>>>>>> > >>>> > SQL.
>>>>>>> > >>>> > > I went through the design doc and have following thoughts:
>>>>>>> > >>>> > >
>>>>>>> > >>>> > > 1) Flink has deprecated the legacy TableSource in 1.11 and
>>>>>>> > proposed
>>>>>>> > >>>> a new
>>>>>>> > >>>> > >  set of DynamicTableSource interfaces. Could you update
>>>>>>> your
>>>>>>> > >>>> proposal to
>>>>>>> > >>>> > > use the new interfaces?
>>>>>>> > >>>> > >  Follow the existing ability interfaces, e.g.
>>>>>>> > >>>> > > SupportsFilterPushDown, SupportsProjectionPushDown.
>>>>>>> > >>>> > >
>>>>>>> > >>>> > > 2) Personally, I think CallExpression would be a better
>>>>>>> > >>>> representation
>>>>>>> > >>>> > than
>>>>>>> > >>>> > > separate `FunctionDefinition` and args. Because, it would
>>>>>>> be
>>>>>>> > easier
>>>>>>> > >>>> to
>>>>>>> > >>>> > know
>>>>>>> > >>>> > > what's the index and type of the arguments.
>>>>>>> > >>>> > >
>>>>>>> > >>>> > > 3) It would be better to list which connectors will be
>>>>>>> supported
>>>>>>> > in
>>>>>>> > >>>> the
>>>>>>> > >>>> > > plan?
>>>>>>> > >>>> > >
>>>>>>> > >>>> > > Best,
>>>>>>> > >>>> > > Jark
>>>>>>> > >>>> > >
>>>>>>> > >>>> > >
>>>>>>> > >>>> > > On Tue, 29 Dec 2020 at 00:49, Sebastian Liu <
>>>>>>> > liuyang0...@gmail.com>
>>>>>>> > >>>> > wrote:
>>>>>>> > >>>> > >
>>>>>>> > >>>> > > > Hi all,
>>>>>>> > >>>> > > >
>>>>>>> > >>>> > > > I'd like to discuss a new feature for the Blink Planner.
>>>>>>> > >>>> > > > Aggregate operator of Flink SQL is currently fully done
>>>>>>> at Flink
>>>>>>> > >>>> layer.
>>>>>>> > >>>> > > > With the development of storage systems, many downstream
>>>>>>> > >>>> > > > storage systems of Flink SQL can now handle the
>>>>>>> > >>>> > > > aggregation operator themselves.
>>>>>>> > >>>> > > > Pushing down Aggregate to data source layer will improve
>>>>>>> > >>>> performance
>>>>>>> > >>>> > from
>>>>>>> > >>>> > > > the perspective of the network IO and computation
>>>>>>> overhead.
>>>>>>> > >>>> > > >
>>>>>>> > >>>> > > > I have drafted a design doc for this new feature.
>>>>>>> > >>>> > > >
>>>>>>> > >>>> > > >
>>>>>>> > >>>> > >
>>>>>>> > >>>> >
>>>>>>> > >>>>
>>>>>>> >
>>>>>>> https://docs.google.com/document/d/1kGwC_h4qBNxF2eMEz6T6arByOB8yilrPLqDN0QBQXW4/edit?usp=sharing
>>>>>>> > >>>> > > >
>>>>>>> > >>>> > > > Any comment or discussion is welcome.
>>>>>>> > >>>> > > >
>>>>>>> > >>>> > > > --
>>>>>>> > >>>> > > >
>>>>>>> > >>>> > > > *With kind regards
>>>>>>> > >>>> > > >
>>>>>>> ------------------------------------------------------------
>>>>>>> > >>>> > > > Sebastian Liu 刘洋
>>>>>>> > >>>> > > > Institute of Computing Technology, Chinese Academy of
>>>>>>> Science
>>>>>>> > >>>> > > > Mobile\WeChat: +86—15201613655
>>>>>>> > >>>> > > > E-mail: liuyang0...@gmail.com <liuyang0...@gmail.com>
>>>>>>> > >>>> > > > QQ: 3239559*
>>>>>>> > >>>> > > >
>>>>>>> > >>>> > >
>>>>>>> > >>>> >
>>>>>>> > >>>> >
>>>>>>> > >>>> >
>>>>>>> > >>>>
>>>>>>> > >>>
>>>>>>> > >>>
>>>>>>> > >>>
>>>>>>> > >>>
>>>>>>> >
>>>>>>> >
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best, Jingsong Lee
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best, Jingsong Lee
>>>>>
>>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>>
>> --
>> Best, Jingsong Lee
>>
>

-- 
Best, Jingsong Lee
