Thanks Jincheng,

I think approach 3 leaves no ambiguity in the semantics and makes the
deterministic result easier to explain, although it is not so easy to use.
So I'm +1 to approach 3.
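
For what it's worth, here is a small sketch of why the key choice matters in
the Top1 example quoted below. It is plain Java, not Flink API: the `Row`
class, the `upsert` helper and the sample data are all made up for
illustration. It materializes the same sequence of Top1 outputs the way an
upsert sink would, once keyed by rankid and once keyed by rankid+seller_name:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class UpsertKeyDemo {
    // Hypothetical Top1 output row: (rankid, value, seller_name).
    static final class Row {
        final int rankId;
        final long value;
        final String sellerName;

        Row(int rankId, long value, String sellerName) {
            this.rankId = rankId;
            this.value = value;
            this.sellerName = sellerName;
        }

        @Override
        public String toString() {
            return "(" + rankId + ", " + value + ", " + sellerName + ")";
        }
    }

    // Materialize a stream of rows with upsert semantics:
    // a later row replaces an earlier row that has the same key.
    static <K> Map<K, Row> upsert(List<Row> stream, Function<Row, K> keyOf) {
        Map<K, Row> table = new LinkedHashMap<>();
        for (Row r : stream) {
            table.put(keyOf.apply(r), r);
        }
        return table;
    }

    // Made-up sequence of Top1 results emitted over time.
    static List<Row> sampleStream() {
        return List.of(
            new Row(1, 10L, "alice"),
            new Row(1, 20L, "bob"),
            new Row(1, 30L, "alice"));
    }

    public static void main(String[] args) {
        List<Row> stream = sampleStream();
        // Key = rankid: only the latest top 1 record remains.
        Map<Integer, Row> byRank = upsert(stream, r -> r.rankId);
        // Key = rankid+seller_name: one row per seller that was ever top 1.
        Map<String, Row> byRankAndSeller =
            upsert(stream, r -> r.rankId + "|" + r.sellerName);
        System.out.println(byRank.values());
        System.out.println(byRankAndSeller.values());
    }
}
```

The function output is identical in both runs; only the declared key differs,
and the materialized table changes with it.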

Best,
Jark

On Fri, 5 Jul 2019 at 18:13, jincheng sun <sunjincheng...@gmail.com> wrote:

> Hi,
>
> @Kurt @Jark thanks again for your comments.
>
> @Kurt
> Separating key and non-key for UDTAGG can definitely provide more
> information for the system. However, it would also put more burden on our
> users and cause some code reuse problems. BTW, approach 3 can also be used
> to separate UDTAGGs into keyed and non-keyed ones, as we can check whether
> the key list is empty. So from this point of view, we can use approach 3 to
> address your concern.
>
> @Jark
> It's great that the TopN in Blink can decide the key automatically. But
> I'd like to point out another case in which the keys cannot be decided by
> the system, i.e., they can only be decided by the user. Take TopN as an
> example, or top1 to keep it simple. Suppose the Top1 outputs three columns
> (rankid, value, seller_name), and the user wants to upsert the result
> either with the key rankid or with the key rankid+seller_name.
> 1. With the key rankid: In this case, the user just wants to get the
> top 1 record.
> 2. With the key rankid+seller_name: In this case, the user wants to get
> all seller_names that have ever belonged to top1. This cannot be solved by
> approach 3 with only one function. However, it is very easy to
> implement with the `withKeys` approach.
>
> Even so, I have thought these things through more carefully and found some
> more interesting points that I want to share with you all. The TopN example
> I listed above may also lead to a problem in which batch and streaming are
> not unified.
>
> To make it worse, the upsert sink is not supported in batch, and we don't
> even have a clear implementation plan for how to support upsert in batch,
> so the unification of the `withKeys` approach remains in doubt.
>
> So, to avoid the unification problem, I think we can also go with approach
> 3. It is more rigorous, although less flexible, compared to the `withKeys`
> approach.
>
> Meanwhile, I will think more about the unification problem later. Maybe
> some new ideas about it will come up. :)
>
> Best,
> Jincheng
>
> On Fri, 5 Jul 2019 at 10:48, Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Hequn,
>>
>> > If the TopN table aggregate function
>> > outputs three columns (rankid, time, value), either rankid or
>> > rankid+time could be used as the key. Which one is chosen is more
>> > likely to be decided by the user according to his business.
>> In this case, the TopN table aggregate function should return two sets of
>> unique keys, one is "rankid", the other is "rankid, time".
>> This would be more aligned with the current TopN node in the Blink planner
>> and lets the optimizer decide which key to use based on the downstream
>> information (column selection, sink's primary key).
>>
>>
>> Best,
>> Jark
>>
>> On Fri, 5 Jul 2019 at 00:05, Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>>> Hi Kurt and Jark,
>>>
>>> Thanks a lot for your great inputs!
>>>
>>> The keys of the query may not be strongly related to the UDTAGG.
>>> They may also be related to the scenario that a user wants to achieve.
>>>
>>> Take TopN again as an example. If the TopN table aggregate function
>>> outputs three columns (rankid, time, value), either rankid or rankid+time
>>> could be used as the key. Which one is chosen is more likely to be
>>> decided by the user according to his business.
>>>
>>> Best, Hequn
>>>
>>> On Thu, Jul 4, 2019 at 8:11 PM Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> Hi Jincheng,
>>>>
>>>> I agree with Kurt's point. As you said "the user must know the keys of
>>>> the output of UDTAGG clearly".
>>>> If I understand correctly, the key information is strongly related to
>>>> the UDTAGG implementation.
>>>> Users may call `flatAggregate` on a UDTAGG instance with different keys,
>>>> which may lead to wrong results.
>>>> So I think it would be better to couple the key information with the
>>>> UDTAGG interface (i.e. "Approach 3" in your design doc).
>>>>
>>>> Regards,
>>>> Jark
>>>>
>>>> On Thu, 4 Jul 2019 at 18:06, Kurt Young <ykt...@gmail.com> wrote:
>>>>
>>>>> Hi Jincheng,
>>>>>
>>>>> Thanks for the clarification. I think you just pointed out my concern
>>>>> yourself:
>>>>>
>>>>> > When a user uses a User-defined table aggregate function (UDTAGG), he
>>>>> > must understand the behavior of the UDTAGG, including the return type
>>>>> > and the characteristics of the returned data, such as the key fields.
>>>>>
>>>>> This indicates that UDTAGGs are in effect classified into different
>>>>> types: one without keys and one with key information. So the developer
>>>>> of a UDTAGG has to choose which type the function should be. In this
>>>>> case, my question would be: why don't we make the key information
>>>>> explicit, e.g. by splitting UDTAGG into keyed UDTAGG and non-keyed
>>>>> UDTAGG? Then the user and the framework would have a better
>>>>> understanding of the UDTAGG. The `withKeys` solution lets the user
>>>>> choose the key, and it seems it will only work correctly if the user
>>>>> chooses the *right* key that the UDTAGG has.
>>>>>
>>>>> Let me know if this makes sense to you.
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Thu, Jul 4, 2019 at 4:32 PM jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>>
>>>>> > Hi All,
>>>>> >
>>>>> > @Kurt Young <ykt...@gmail.com> one user-defined table aggregate
>>>>> > function can be used both with and without keys, and we do not
>>>>> > introduce any other aggregations, just as @Hequn explained.
>>>>> >
>>>>> > @Hequn Cheng <chenghe...@gmail.com> thanks for your explanation!
>>>>> >
>>>>> > One thing should be mentioned here:
>>>>> >
>>>>> > When a user uses a User-defined table aggregate function (UDTAGG), he
>>>>> > must understand the behavior of the UDTAGG, including the return type
>>>>> > and the characteristics of the returned data, such as the key fields.
>>>>> > So although the `withKeys` approach is not fully rigorous (which we do
>>>>> > not need), it is intuitive enough: if `flatAggregate` is followed by
>>>>> > an `upsertSink`, the user must clearly know the keys of the UDTAGG
>>>>> > output, otherwise the keys of the `upsertSink` cannot be defined. So I
>>>>> > still prefer the `withKeys` solution for now.
>>>>> >
>>>>> > Looking forward to any feedback from all of you!
>>>>> >
>>>>> > Best,
>>>>> > Jincheng
>>>>> >
>>>>> >
>>>>> >
>>>>> > On Mon, 1 Jul 2019 at 17:35, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>>> >
>>>>> >> Hi Kurt,
>>>>> >>
>>>>> >> Thanks for your questions. Here are my thoughts.
>>>>> >>
>>>>> >> > if I want to write such a function, should I make sure that it is
>>>>> >> > used with some keys?
>>>>> >> The key information does not have to be used. We can also use a
>>>>> >> RetractSink to emit the table directly.
>>>>> >>
>>>>> >> > If I have a use case that calculates topn without a key, should I
>>>>> >> > write another function, or can I reuse the previous one?
>>>>> >> For the TopN example, you can reuse the previous function if you
>>>>> >> don't care about the key information.
>>>>> >>
>>>>> >> So, the key information is only an indicator (or a description), not
>>>>> >> an operator, as Jincheng mentioned above.
>>>>> >> We do not need to change the function logic and it will not add any
>>>>> >> other aggregations.
>>>>> >>
>>>>> >> BTW, we have three approaches in the document. Approach 1 defines
>>>>> >> keys on the API level, as we think it's more common to define keys on
>>>>> >> a Table, while approach 3 defines keys in the TableAggregateFunction,
>>>>> >> which is more precise but not very clear for Table users. So we should
>>>>> >> take all of this into consideration and make the decision in this
>>>>> >> discussion thread.
>>>>> >>
>>>>> >> Please take a look at the document; any suggestions or better
>>>>> >> solutions are welcome.
>>>>> >>
>>>>> >> Best, Hequn
>>>>> >>
>>>>> >>
>>>>> >> On Mon, Jul 1, 2019 at 12:13 PM Kurt Young <ykt...@gmail.com> wrote:
>>>>> >>
>>>>> >> > Hi Jincheng,
>>>>> >> >
>>>>> >> > Thanks for the clarification. Take 'TopN' for example: if I want to
>>>>> >> > write such a function, should I make sure that it is used with some
>>>>> >> > keys? If I have a use case that calculates topn without a key,
>>>>> >> > should I write another function, or can I reuse the previous one?
>>>>> >> >
>>>>> >> > I'm not sure about the idea that this does not involve semantic
>>>>> >> > changes. To me, it sounds like we are doing another nested
>>>>> >> > aggregation inside the table which the TableAggregateFunction emits.
>>>>> >> >
>>>>> >> > Maybe I'm not familiar enough with this function; I hope you can
>>>>> >> > help me understand.
>>>>> >> >
>>>>> >> > Best,
>>>>> >> > Kurt
>>>>> >> >
>>>>> >> >
>>>>> >> > On Mon, Jul 1, 2019 at 11:59 AM jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>> >> >
>>>>> >> > > Hi Kurt,
>>>>> >> > >
>>>>> >> > > Thanks for your questions, I am glad to share my thoughts here:
>>>>> >> > >
>>>>> >> > > > My question is, will that affect the logic of the
>>>>> >> > > > TableAggregateFunction the user wrote? Should the user know that
>>>>> >> > > > there will be a key and make some changes to this function?
>>>>> >> > >
>>>>> >> > > No, the key information depends on the implementation of the
>>>>> >> > > TableAggregateFunction.
>>>>> >> > > For example, for a `topN` user-defined TableAggregateFunction, we
>>>>> >> > > can only use the `keys` if the `topN` contains `rankid` in the
>>>>> >> > > output. You can treat the `keys` like an indicator.
>>>>> >> > >
>>>>> >> > > > If not, how will the framework deal with the output of the
>>>>> >> > > > user's TableAggregateFunction? If the user outputs multiple rows
>>>>> >> > > > with the same key, should the latter one replace the previous
>>>>> >> > > > ones?
>>>>> >> > >
>>>>> >> > > If a TableAggregateFunction outputs multiple rows with the same
>>>>> >> > > key, the latter one should replace the previous one, in either
>>>>> >> > > upsert mode or retract mode. I.e., whether or not the user defines
>>>>> >> > > the key, the Flink framework should ensure the correctness of the
>>>>> >> > > semantics.
>>>>> >> > >
>>>>> >> > > At present, the problem we are discussing does not involve
>>>>> >> > > semantic changes. The definition of keys is there to support
>>>>> >> > > non-window flatAggregate in upsert mode. (Upsert mode is already
>>>>> >> > > supported in the Flink framework. The current discussion is only
>>>>> >> > > about how to inform the framework of the key information, which
>>>>> >> > > is the `withKeys` API we are discussing.)
>>>>> >> > >
>>>>> >> > > Any other feedback is welcome :)
>>>>> >> > >
>>>>> >> > > Best,
>>>>> >> > > Jincheng
>>>>> >> > >
>>>>> >> > > On Mon, 1 Jul 2019 at 09:23, Kurt Young <ykt...@gmail.com> wrote:
>>>>> >> > >
>>>>> >> > > > Hi,
>>>>> >> > > >
>>>>> >> > > > I have a question about the key information of
>>>>> >> > > > TableAggregateFunction. IIUC, you need to define something like
>>>>> >> > > > a primary key or unique key in the result table of the
>>>>> >> > > > TableAggregateFunction, and also need a way to let the user
>>>>> >> > > > configure this through the API. My question is, will that affect
>>>>> >> > > > the logic of the TableAggregateFunction the user wrote? Should
>>>>> >> > > > the user know that there will be a key and make some changes to
>>>>> >> > > > this function?
>>>>> >> > > >
>>>>> >> > > > If so, what semantics should the user learn? If not, how will
>>>>> >> > > > the framework deal with the output of the user's
>>>>> >> > > > TableAggregateFunction? For example, if the user outputs
>>>>> >> > > > multiple rows with the same key, should the latter one replace
>>>>> >> > > > the previous ones?
>>>>> >> > > >
>>>>> >> > > > Best,
>>>>> >> > > > Kurt
>>>>> >> > > >
>>>>> >> > > >
>>>>> >> > > > On Mon, Jul 1, 2019 at 7:19 AM jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>> >> > > >
>>>>> >> > > > > Hi Hequn, thanks for the reply! I think the `withKeys` solution
>>>>> >> > > > > is our better choice!
>>>>> >> > > > >
>>>>> >> > > > >
>>>>> >> > > > > On Wed, 26 Jun 2019 at 17:11, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>>> >> > > > >
>>>>> >> > > > > > Hi Jincheng,
>>>>> >> > > > > >
>>>>> >> > > > > > Thanks for raising the discussion!
>>>>> >> > > > > > The key information is very important for query
>>>>> >> > > > > > optimizations. It would be nice if we can use upsert mode to
>>>>> >> > > > > > achieve better performance.
>>>>> >> > > > > >
>>>>> >> > > > > > +1 for the `withKeys` proposal. :)
>>>>> >> > > > > >
>>>>> >> > > > > > Best, Hequn
>>>>> >> > > > > >
>>>>> >> > > > > >
>>>>> >> > > > > > On Wed, Jun 26, 2019 at 4:37 PM jincheng sun <sunjincheng...@gmail.com> wrote:
>>>>> >> > > > > >
>>>>> >> > > > > > > Hi all,
>>>>> >> > > > > > >
>>>>> >> > > > > > > With the continuous efforts from the community, we already
>>>>> >> > > > > > > support `flatAggregate`[1] on the Table API in retract mode.
>>>>> >> > > > > > > I think it would be better to also add upsert mode for
>>>>> >> > > > > > > `flatAggregate`.
>>>>> >> > > > > > >
>>>>> >> > > > > > > The result table of a streaming non-window `flatAggregate`
>>>>> >> > > > > > > is a table that contains updates. We can, of course, use a
>>>>> >> > > > > > > RetractStreamTableSink[2] to emit the table, but we can get
>>>>> >> > > > > > > better performance in upsert mode. However, due to the lack
>>>>> >> > > > > > > of keys, we can’t use an UpsertStreamTableSink to emit the
>>>>> >> > > > > > > table. We don’t have this problem for a normal aggregate, as
>>>>> >> > > > > > > it emits a single row for each group, so the unique keys are
>>>>> >> > > > > > > exactly the same as the group keys. A `flatAggregate` is
>>>>> >> > > > > > > quite different because it emits multiple rows (a
>>>>> >> > > > > > > “sub-table”) for a single group. To solve this problem, we
>>>>> >> > > > > > > need to find a way to define keys on flatAggregate, so that
>>>>> >> > > > > > > we can also use an upsert sink to emit the result table
>>>>> >> > > > > > > after flatAggregate.
>>>>> >> > > > > > >
>>>>> >> > > > > > > So Aljoscha, Hequn and I prepared a design document on how
>>>>> >> > > > > > > to define the update keys for `flatAggregate` in upsert
>>>>> >> > > > > > > mode. The details can be found here:
>>>>> >> > > > > > >
>>>>> >> > > > > > > https://docs.google.com/document/d/183qHo8PDG-xserDi_AbGP6YX9aPY0rVr80p3O3Gyz6U/edit?usp=sharing
>>>>> >> > > > > > >
>>>>> >> > > > > > > I would appreciate it if you could have a look at the
>>>>> >> > > > > > > document; any comments are welcome!
>>>>> >> > > > > > >
>>>>> >> > > > > > >
>>>>> >> > > > > > > Best,
>>>>> >> > > > > > >
>>>>> >> > > > > > > Jincheng
>>>>> >> > > > > > >
>>>>> >> > > > > > >
>>>>> >> > > > > > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739
>>>>> >> > > > > > >
>>>>> >> > > > > > > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sourceSinks.html#defining-a-streamtablesource
>>>>> >> > > > > > >
>>>>> >> > > > > >
>>>>> >> > > > >
>>>>> >> > > >
>>>>> >> > >
>>>>> >> >
>>>>> >>
>>>>> >
>>>>>
>>>>
