Radu,

Thanks for the feedback. ProcessFunction is a lower-level execution operator that Table API and SQL users cannot access directly. FLINK-6544 is trying to create a generic interface that lets Table API and SQL users access backend state via UDAGG. It will eventually be code-generated into the ProcessFunction, similar to what you have described.

Regards,
Shaoxuan

On Fri, May 12, 2017 at 7:46 PM, Radu Tudoran <radu.tudo...@huawei.com> wrote:

> Hi,
>
> In general I believe it is a good idea to expose the state backend to the
> functions. You can always optimize the data processing based on the data
> storage. Hence, since at the processing level (aggregation here) you would
> be able to control access to the data, you can implement this in a smart
> way. Moreover, we can also construct different data organizations, partition
> strategies, etc. based on the specific computation. I understand that this
> would be quite an effort, but at some point it is worth making it.
>
> Meanwhile, if it is not possible to have the aggregation function
> extend the rich interface, wouldn't we be able to supplement this with
> some extra logic in the process function that would provide the aggregates
> with the needed data, or at least with pointers to the required state?
>
> As far as I know it would be legal now to have something like:
>
> ProcessFunction () {
>
>   ValueState state = ...
>
>   processElement(newElement) {
>     acc.accumulate(newElement, state)
>   }
> }
>
> WeightedAvgAccum {
>
>   public void accumulate(Row newElement, ValueState state) {
>     state.value....
>   }
> }
>
> Would something like this at least partially solve the problem? ...it
> would allow you to manage the intermediate data directly in the state
> instead of in memory.
>
>
> -----Original Message-----
> From: Shaoxuan Wang [mailto:wshaox...@gmail.com]
> Sent: Friday, May 12, 2017 1:20 PM
> To: Dev
> Cc: Stephan Ewen
> Subject: Re: [DISCUSS] Expose State Backend Interface for UDAGG
>
> Fabian,
> Thanks for your quick reply.
> The goal of "FLINK-6544" is not to expose the state backend in all UDAGG
> cases. It is designed to provide an interface that lets users access the
> state backend when it is allowed (yes, right now this is only allowed by
> ProcessFunction). This interface itself does not make things better.
> Instead, it provides a generic interface for the future adoption of
> exposing backend state in all the different UDAGG cases, and the current
> OVER aggregate and unbounded group aggregate can enjoy the benefits of
> accessing the state backend.
>
> In the meantime, I am also curious why we cannot build AggregateFunction
> on RichFunction. We will lose a lot of the benefit of having a state
> backend for window aggregates if it does not provide a runtime context.
> @Stephan It would be really appreciated if you could share the concerns or
> blocking reasons for not having AggregateFunction designed on top of
> RichFunction.
>
> Regards,
> Shaoxuan
>
>
> On Fri, May 12, 2017 at 6:21 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Hi, thanks for the proposal.
> >
> > I think exposing state to UDAGGs would be very difficult and a lot of
> > work.
> >
> > UDAGGs are called from ProcessFunctions (stream, OVER window and
> > non-window aggs), AggregateFunctions (stream, group-window aggs),
> > CombineFunctions (batch) and GroupReduceFunctions (batch). The batch
> > functions do not support state backends at all, ProcessFunctions can
> > register state, and AggregateFunctions cannot.
> > Even when putting the batch case aside this is very hard.
> >
> > AggregateFunctions support merging of windows. Right now, this only
> > involves merging of accumulators. If we allow AggregateFunctions to
> > have state, we would also need to provide logic to merge the state.
> > Moreover, it is not clearly defined when AggregateFunctions are called
> > (similar to Combiners in MapReduce), which would make state handling
> > very complex.
> > Changing this would be a major effort in the DataStream API.
> >
> > An alternative would be to reimplement the group-window logic in the
> > Table API, but this will be a huge effort as well (maybe we have to do
> > it anyway at some point though).
> >
> > @Stephan knows more about the implications of allowing state in
> > AggregateFunctions.
> >
> > Best, Fabian
> >
> > 2017-05-12 11:53 GMT+02:00 Shaoxuan Wang <wshaox...@gmail.com>:
> >
> > > Hi everyone,
> > >
> > > We made some progress in the implementation of UDAGG (FLINK-5564).
> > > However, we realized that there are cases where users may want to use
> > > the state backend to store the data. For instance, the built-in
> > > MaxWithRetractAggFunction currently creates a hash map to store the
> > > historical data. This becomes a problem when the number of keys is
> > > large enough, leading to an OOM.
> > >
> > > In FLINK-6544, we have proposed an approach to expose a State Backend
> > > Interface for UDAGG. A brief design doc can be found at
> > > https://docs.google.com/document/d/1g-wHOuFj5pMaYMJg90kVIO2IHbiQiO26nWscLIOn50c/edit
> > >
> > > I am opening this discussion thread because I noticed that there are
> > > some recently opened JIRAs that aim to implement special aggregators,
> > > such as "count distinct". IMO, "count distinct" is just a UDAGG. With
> > > the newly proposed FLINK-6544, we can simply make it a built-in agg
> > > without changing the current UDAGG framework.
> > >
> > > @Fabian, @Haohui, @Radu, please take a look at FLINK-6544 and let me
> > > know what you think.
> > > Btw, in our opinion we do not need to include this change in release 1.3.
> > >
> > > Regards,
> > > Shaoxuan
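
For readers of this thread, the pattern in Radu's quoted pseudocode (the operator owns the state and hands it to accumulate) could be sketched roughly as below. This is only an illustration under stated assumptions, not the FLINK-6544 design and not Flink code: SimpleValueState is a hypothetical stand-in that models just the value() and update() calls of a ValueState, InMemoryKeyedState is a toy per-key store, and WeightedAvgTotals, WeightedAvgOperator and StateBackedAccumulatorDemo are made-up names for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a ValueState handle; only value() and update() are modeled.
interface SimpleValueState<T> {
    T value();
    void update(T newValue);
}

// Toy "backend": one slot per key, kept in a HashMap for the sake of the example.
// A real state backend could keep these slots outside the JVM heap.
class InMemoryKeyedState<T> implements SimpleValueState<T> {
    private final Map<String, T> slots = new HashMap<>();
    private String currentKey;

    void setCurrentKey(String key) {
        this.currentKey = key;
    }

    @Override
    public T value() {
        return slots.get(currentKey);
    }

    @Override
    public void update(T newValue) {
        slots.put(currentKey, newValue);
    }
}

// Immutable running totals for a weighted average.
final class WeightedAvgTotals {
    final long weightedSum;
    final long weightSum;

    WeightedAvgTotals(long weightedSum, long weightSum) {
        this.weightedSum = weightedSum;
        this.weightSum = weightSum;
    }
}

// Accumulator with no fields of its own: all intermediate data lives in the state
// handle that the caller passes in.
class WeightedAvgAccum {
    void accumulate(long value, long weight, SimpleValueState<WeightedAvgTotals> state) {
        WeightedAvgTotals totals = state.value();
        if (totals == null) {
            totals = new WeightedAvgTotals(0L, 0L);
        }
        state.update(new WeightedAvgTotals(
                totals.weightedSum + value * weight,
                totals.weightSum + weight));
    }

    double getValue(SimpleValueState<WeightedAvgTotals> state) {
        WeightedAvgTotals totals = state.value();
        if (totals == null || totals.weightSum == 0L) {
            return 0.0;
        }
        return (double) totals.weightedSum / totals.weightSum;
    }
}

// Plays the role of the ProcessFunction in the pseudocode: it owns the state,
// selects the current key, and forwards the state to the accumulator.
class WeightedAvgOperator {
    private final InMemoryKeyedState<WeightedAvgTotals> state = new InMemoryKeyedState<>();
    private final WeightedAvgAccum acc = new WeightedAvgAccum();

    double processElement(String key, long value, long weight) {
        state.setCurrentKey(key);
        acc.accumulate(value, weight, state);
        return acc.getValue(state);
    }
}

class StateBackedAccumulatorDemo {
    public static void main(String[] args) {
        WeightedAvgOperator op = new WeightedAvgOperator();
        op.processElement("a", 10, 1);
        System.out.println(op.processElement("a", 20, 3)); // (10*1 + 20*3) / 4 = 17.5
        System.out.println(op.processElement("b", 5, 2));  // key "b" is independent: 5.0
    }
}
```

Because the accumulator holds no data itself, its memory footprint does not grow with the number of keys; where the per-key totals live is decided entirely by whoever implements the state handle.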