Thanks Jincheng, I think approach 3 there is no ambiguity in the semantics and can better explain a deterministic result, although it is not so easy to use. So I'm +1 to approach 3.
Best, Jark On Fri, 5 Jul 2019 at 18:13, jincheng sun <sunjincheng...@gmail.com> wrote: > Hi, > > @Kurt @Jark thanks again for your comments. > > @Kurt > Separating key and non-key for UDTAGG can definitely provide more > information for the system, however, it will also add more burden to our > users and bring some code reuse problems. BTW, approach 3 can also be used > to separate UDTAGG into keyed or non-keyed as we can check whether the key > list is empty. So from this point of view, we can use approach 3 to solve > your problem. > > @Jark > It's great that the TopN in Blink can decide the key automatically. But, > I'd like to point out another case that the keys cannot be decided by the > system, i.e., can only be decided by the user. For example, for the TopN, > let's say top1 for better understanding. Support the Top1 outputs three > columns(rankid, value, seller_name), and the user wants to upsert the > result either with key of rankid or with the key of rankid+seller_name. > 1. With the key of rankid: In this case, the user just wants to get the > top 1 record. > 2. With the key of rankid+seller_name: In this case, the user wants to get > all seller_names that have ever belong to top1. This can not be solved by > the approach 3 if using only one function. However, it is very easy to > implement with the withKey approach. > > Even though, I have thought more clearly about these things and find more > interesting things that I want to share with you all. For the TopN example > which I listed above, it may also lead to a problem in which batch and > streaming are not unified. > > To make it worse, the upsert sink is not supported in batch and we even > don't have any clear implementation plan about how to support upsert on the > batch, the unification problem for `withKeys` approach becomes hang in > doubt. > > So, to avoid the unification problem, I think we can also use the approach > 3. It is more rigorous although less flexible compared to the `withKeys` > approach. > > Meanwhile, I will think more about the unification problem later. Maybe > new ideas about it may come through. :) > > Best, > Jincheng > > Jark Wu <imj...@gmail.com> 于2019年7月5日周五 上午10:48写道: > >> Hi Hequn, >> >> > If the TopN table aggregate function >> > outputs three columns(rankid, time, value), either rankid or >> rankid+time could be >> > used as the key. Which one to be chosen is more likely to be decided by >> the user >> > according to his business. >> In this case, the TopN table aggregate function should return two sets of >> unique key, one is "rankid", another is "rankid, time". >> This will be more align with current TopN node in blink planner and let >> optimizer to decide which key based on the downstream information (column >> selection, sink's primary key). >> >> >> Best, >> Jark >> >> On Fri, 5 Jul 2019 at 00:05, Hequn Cheng <chenghe...@gmail.com> wrote: >> >>> Hi Kurt and Jark, >>> >>> Thanks a lot for your great inputs! >>> >>> The keys of the query may not strongly be related to the UDTAGG. >>> It may also be related to the corresponding scenarios that a user wants >>> to achieve. >>> >>> For example, take TopN again as an example. If the TopN table aggregate >>> function >>> outputs three columns(rankid, time, value), either rankid or rankid+time >>> could be >>> used as the key. Which one to be chosen is more likely to be decided by >>> the user >>> according to his business. >>> >>> Best, Hequn >>> >>> On Thu, Jul 4, 2019 at 8:11 PM Jark Wu <imj...@gmail.com> wrote: >>> >>>> Hi jingcheng, >>>> >>>> I agree with Kurt's point. As you said "the user must know the keys of >>>> the output of UDTAGG clearly". >>>> If I understand correctly, the key information is strongly relative to >>>> the UDTAGG implementation. >>>> Users may call `flatAggregate` on a UDTAGG instance with different keys >>>> which may result in a wrong result. >>>> So I think it would be better to couple key information with UDTAGG >>>> interface (i.e. "Approach 3" in your design doc). >>>> >>>> Regards, >>>> Jark >>>> >>>> On Thu, 4 Jul 2019 at 18:06, Kurt Young <ykt...@gmail.com> wrote: >>>> >>>>> Hi Jincheng, >>>>> >>>>> Thanks for the clarification. I think you just pointed out my concern >>>>> by >>>>> yourself: >>>>> >>>>> > When a user uses a User-defined table aggregate function (UDTAGG), he >>>>> must understand the behavior of the UDTAGG, including the return type >>>>> and >>>>> the characteristics of the returned data. such as the key fields. >>>>> >>>>> This indicates that the UDTAGG is somehow be classified to different >>>>> types, >>>>> one will no key, one with key information. So the developer of the >>>>> UDTAGG >>>>> should choose which type of this function should be. In this case, >>>>> my question would be, why don't we have explicit information about keys >>>>> such as we split UDTAGG to keyed UDTAGG and non-keyed UDTAGG. So the >>>>> user >>>>> and the framework will have a better understanding of >>>>> this UDTAGG. `withKeys` solution is letting user to choose the key and >>>>> it >>>>> seems it will only work correctly only if the user choose the *right* >>>>> key >>>>> this UDTAGG has. >>>>> >>>>> Let me know if this makes sense to you. >>>>> >>>>> Best, >>>>> Kurt >>>>> >>>>> >>>>> On Thu, Jul 4, 2019 at 4:32 PM jincheng sun <sunjincheng...@gmail.com> >>>>> wrote: >>>>> >>>>> > Hi All, >>>>> > >>>>> > @Kurt Young <ykt...@gmail.com> one user-defined table aggregate >>>>> function >>>>> > can be used in both with(out) keys case, and we do not introduce any >>>>> other >>>>> > aggregations. just like the explanation from @Hequn. >>>>> > >>>>> > @Hequn Cheng <chenghe...@gmail.com> thanks for your explanation! >>>>> > >>>>> > One thing should be mentioned here: >>>>> > >>>>> > When a user uses a User-defined table aggregate function (UDTAGG), >>>>> he must >>>>> > understand the behavior of the UDTAGG, including the return type and >>>>> the >>>>> > characteristics of the returned data. such as the key fields. So >>>>> although >>>>> > using `withKeys` approach it is not rigorous enough(we do not need) >>>>> but >>>>> > intuitive enough, considering that if `flatAggregate` is followed by >>>>> an >>>>> > `upsertSink`, then the user must know the keys of the output of >>>>> UDTAGG >>>>> > clearly, otherwise the keys of `upsertSink` cannot be defined. So I >>>>> still >>>>> > prefer the `withKeys` solution by now. >>>>> > >>>>> > Looking forward to any feedback from all of you! >>>>> > >>>>> > Best, >>>>> > Jincheng >>>>> > >>>>> > >>>>> > >>>>> > Hequn Cheng <chenghe...@gmail.com> 于2019年7月1日周一 下午5:35写道: >>>>> > >>>>> >> Hi Kurt, >>>>> >> >>>>> >> Thanks for your questions. Here are my thoughts. >>>>> >> >>>>> >> > if I want to write such kind function, should I make sure that >>>>> this >>>>> >> function is used with some keys? >>>>> >> The key information may not be used. We can also use RetractSink to >>>>> emit >>>>> >> the table directly. >>>>> >> >>>>> >> > If I need a use case to calculate topn without key, should I >>>>> write >>>>> >> another function or I can reuse previous one. >>>>> >> For the TopN example, you can reuse the previous function if you >>>>> don't >>>>> >> care >>>>> >> about the key information. >>>>> >> >>>>> >> So, the key information is only an indicator(or a description), not >>>>> an >>>>> >> operator, as Jincheng mentioned above. >>>>> >> We do not need to change the function logic and it will not add any >>>>> other >>>>> >> aggregations. >>>>> >> >>>>> >> BTW, we have three approaches in the document. Approach 1 defines >>>>> keys on >>>>> >> API level as we think it's more common to define keys on Table. >>>>> >> While approach 3 defines keys in the TableAggregateFunction which >>>>> is more >>>>> >> precise but it is not very clear for Table users. So, we should >>>>> take all >>>>> >> these into consideration, and make the decision in this discussion >>>>> thread. >>>>> >> >>>>> >> You can take a look at the document and welcome any suggestions or >>>>> other >>>>> >> better solutions. >>>>> >> >>>>> >> Best, Hequn >>>>> >> >>>>> >> >>>>> >> On Mon, Jul 1, 2019 at 12:13 PM Kurt Young <ykt...@gmail.com> >>>>> wrote: >>>>> >> >>>>> >> > Hi Jincheng, >>>>> >> > >>>>> >> > Thanks for the clarification. Take 'TopN' for example, if I want >>>>> to >>>>> >> write >>>>> >> > such kind function, >>>>> >> > should I make sure that this function is used with some keys? If >>>>> I need >>>>> >> a >>>>> >> > use case to calculate >>>>> >> > topn without key, should I write another function or I can reuse >>>>> >> previous >>>>> >> > one. >>>>> >> > >>>>> >> > I'm not sure about the idea of this does not involve semantic >>>>> changes. >>>>> >> To >>>>> >> > me, it sounds like >>>>> >> > we are doing another nested aggregation inside the table >>>>> >> > which TableAggregateFunction emits. >>>>> >> > >>>>> >> > Maybe I'm not familiar with this function enough, hope you can >>>>> help me >>>>> >> to >>>>> >> > understand. >>>>> >> > >>>>> >> > Best, >>>>> >> > Kurt >>>>> >> > >>>>> >> > >>>>> >> > On Mon, Jul 1, 2019 at 11:59 AM jincheng sun < >>>>> sunjincheng...@gmail.com> >>>>> >> > wrote: >>>>> >> > >>>>> >> > > Hi Kurt, >>>>> >> > > >>>>> >> > > Thanks for your questions, I am glad to share my thoughts here: >>>>> >> > > >>>>> >> > > My question is, will that effect the logic >>>>> ofTableAggregateFunction >>>>> >> user >>>>> >> > > > wrote? Should the user know that there will a key and make >>>>> some >>>>> >> changes >>>>> >> > > to >>>>> >> > > > this function? >>>>> >> > > >>>>> >> > > >>>>> >> > > No, the keys information depends on the implementation of the >>>>> >> > > TableAggregateFunction. >>>>> >> > > For example, for a `topN` user defined TableAggregateFunction, >>>>> we can >>>>> >> > only >>>>> >> > > use the `keys` if the `topN` contains `rankid` in the output. >>>>> You can >>>>> >> > > treat the >>>>> >> > > `keys` like an indicator. >>>>> >> > > >>>>> >> > > If not, how will framework deal with the output of user's >>>>> >> > > > TableAggregateFunction. if user output multiple rows with >>>>> the same >>>>> >> > key, >>>>> >> > > > should be latter one replace previous ones? >>>>> >> > > >>>>> >> > > >>>>> >> > > If a TableAggregateFunction outputs multiple rows with the same >>>>> key, >>>>> >> the >>>>> >> > > latter one should replace the previous one, either with upsert >>>>> mode or >>>>> >> > > retract mode. i.e., Whether the user defines the Key or not, >>>>> the Flink >>>>> >> > > framework should ensure the correctness of the semantics. >>>>> >> > > >>>>> >> > > At present, the problem we are discussing does not involve >>>>> semantic >>>>> >> > > changes. The definition of keys is to support non-window >>>>> >> flatAggregate on >>>>> >> > > upsert mode. (The upsert mode is already supported in the flink >>>>> >> > framework. >>>>> >> > > The current discussion only needs to inform the framework that >>>>> the >>>>> >> keys >>>>> >> > > information, which is the `withKeys` API we discussing.) >>>>> >> > > >>>>> >> > > Welcome any other feedbacks :) >>>>> >> > > >>>>> >> > > Best, >>>>> >> > > Jincheng >>>>> >> > > >>>>> >> > > Kurt Young <ykt...@gmail.com> 于2019年7月1日周一 上午9:23写道: >>>>> >> > > >>>>> >> > > > Hi, >>>>> >> > > > >>>>> >> > > > I have a question about the key information of >>>>> >> TableAggregateFunction. >>>>> >> > > > IIUC, you need to define >>>>> >> > > > something like primary key or unique key in the result table >>>>> of >>>>> >> > > > TableAggregateFunction, and also >>>>> >> > > > need a way to let user configure this through the API. My >>>>> question >>>>> >> is, >>>>> >> > > will >>>>> >> > > > that effect the logic of >>>>> >> > > > TableAggregateFunction user wrote? Should the user know that >>>>> there >>>>> >> > will a >>>>> >> > > > key and make some changes >>>>> >> > > > to this function? >>>>> >> > > > >>>>> >> > > > If so, what's the semantic the user should learned. If not, >>>>> how will >>>>> >> > > > framework deal with the output of user's >>>>> >> > > > TableAggregateFunction. For example, if user output multiple >>>>> rows >>>>> >> with >>>>> >> > > the >>>>> >> > > > same key, should be latter one >>>>> >> > > > replace previous ones? >>>>> >> > > > >>>>> >> > > > Best, >>>>> >> > > > Kurt >>>>> >> > > > >>>>> >> > > > >>>>> >> > > > On Mon, Jul 1, 2019 at 7:19 AM jincheng sun < >>>>> >> sunjincheng...@gmail.com> >>>>> >> > > > wrote: >>>>> >> > > > >>>>> >> > > > > Hi hequn, Thanks for the reply! I think `withKeys` solution >>>>> is our >>>>> >> > > better >>>>> >> > > > > choice! >>>>> >> > > > > >>>>> >> > > > > >>>>> >> > > > > Hequn Cheng <chenghe...@gmail.com> 于2019年6月26日周三 下午5:11写道: >>>>> >> > > > > >>>>> >> > > > > > Hi Jincheng, >>>>> >> > > > > > >>>>> >> > > > > > Thanks for raising the discussion! >>>>> >> > > > > > The key information is very important for query >>>>> optimizations. >>>>> >> It >>>>> >> > > would >>>>> >> > > > > be >>>>> >> > > > > > nice if we can use upsert mode to achieve better >>>>> performance. >>>>> >> > > > > > >>>>> >> > > > > > +1 for the `withKeys` proposal. :) >>>>> >> > > > > > >>>>> >> > > > > > Best, Hequn >>>>> >> > > > > > >>>>> >> > > > > > >>>>> >> > > > > > On Wed, Jun 26, 2019 at 4:37 PM jincheng sun < >>>>> >> > > sunjincheng...@gmail.com >>>>> >> > > > > >>>>> >> > > > > > wrote: >>>>> >> > > > > > >>>>> >> > > > > > > Hi all, >>>>> >> > > > > > > >>>>> >> > > > > > > With the continuous efforts from the community, we >>>>> already >>>>> >> > > supported >>>>> >> > > > > > > `flatAggregate`[1] on TableAPI in retract mode. I think >>>>> It's >>>>> >> > better >>>>> >> > > > to >>>>> >> > > > > > add >>>>> >> > > > > > > upsert mode for `flatAggregate`. >>>>> >> > > > > > > >>>>> >> > > > > > > The result table of streaming non-window >>>>> `flatAggregate` is a >>>>> >> > table >>>>> >> > > > > > > contains updates. We can, of course, use a >>>>> >> > > RetractStreamTableSink[2] >>>>> >> > > > to >>>>> >> > > > > > > emit the table, but we can get better performance in >>>>> upsert >>>>> >> mode. >>>>> >> > > > > > However, >>>>> >> > > > > > > due to the lack of keys, we can’t use an >>>>> >> UpsertStreamTableSink to >>>>> >> > > > emit >>>>> >> > > > > > the >>>>> >> > > > > > > table. We don’t have this problem for a normal >>>>> aggregate as it >>>>> >> > > emits >>>>> >> > > > a >>>>> >> > > > > > > single row for each group, so the unique keys are >>>>> exactly the >>>>> >> > same >>>>> >> > > > with >>>>> >> > > > > > the >>>>> >> > > > > > > group keys. While for a `flatAggregate`, its pretty >>>>> difference >>>>> >> > that >>>>> >> > > > due >>>>> >> > > > > > to >>>>> >> > > > > > > emits multi rows(a “sub-table”) for a single group. To >>>>> solve >>>>> >> this >>>>> >> > > > > > problem, >>>>> >> > > > > > > we need to find a way to define keys on flatAggregate, >>>>> so >>>>> >> that we >>>>> >> > > can >>>>> >> > > > > > also >>>>> >> > > > > > > use upsert sink to emit the result table after >>>>> flatAggregate. >>>>> >> > > > > > > >>>>> >> > > > > > > So, Aljoscha, Hequn and I prepared a design document >>>>> for how >>>>> >> to >>>>> >> > > > define >>>>> >> > > > > > the >>>>> >> > > > > > > update keys for `flatAggregate` in upsert mode. The >>>>> detail >>>>> >> can >>>>> >> > be >>>>> >> > > > > found >>>>> >> > > > > > > here: >>>>> >> > > > > > > >>>>> >> > > > > > > >>>>> >> > > > > > > >>>>> >> > > > > > >>>>> >> > > > > >>>>> >> > > > >>>>> >> > > >>>>> >> > >>>>> >> >>>>> https://docs.google.com/document/d/183qHo8PDG-xserDi_AbGP6YX9aPY0rVr80p3O3Gyz6U/edit?usp=sharing >>>>> >> > > > > > > >>>>> >> > > > > > > I appreciate it if you can have look at the document >>>>> and any >>>>> >> > > comments >>>>> >> > > > > are >>>>> >> > > > > > > welcome! >>>>> >> > > > > > > >>>>> >> > > > > > > >>>>> >> > > > > > > Best, >>>>> >> > > > > > > >>>>> >> > > > > > > Jincheng >>>>> >> > > > > > > >>>>> >> > > > > > > >>>>> >> > > > > > > [1] >>>>> >> > > > > > > >>>>> >> > > > > > >>>>> >> > > > > >>>>> >> > > > >>>>> >> > > >>>>> >> > >>>>> >> >>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739 >>>>> >> > > > > > > >>>>> >> > > > > > > [2] >>>>> >> > > > > > > >>>>> >> > > > > > > >>>>> >> > > > > > >>>>> >> > > > > >>>>> >> > > > >>>>> >> > > >>>>> >> > >>>>> >> >>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sourceSinks.html#defining-a-streamtablesource >>>>> >> > > > > > > >>>>> >> > > > > > >>>>> >> > > > > >>>>> >> > > > >>>>> >> > > >>>>> >> > >>>>> >> >>>>> > >>>>> >>>>