Radu,
Thanks for the feedback. ProcessFunction is a lower-level execution
operator that Table API and SQL users cannot access directly.
FLINK-6544 tries to create a generic interface that lets Table API and
SQL users access the state backend via UDAGG. It will eventually be
code-generated into the ProcessFunction, similar to what you have described.

Regards,
Shaoxuan



On Fri, May 12, 2017 at 7:46 PM, Radu Tudoran <radu.tudo...@huawei.com>
wrote:

> Hi,
>
> In general I believe it is a good idea to expose the state backend to the
> functions. You can always optimize the data processing based on the data
> storage. Since at the level of the processing (aggregation here) you would
> be able to control the access to the data, you can implement this in a
> smart way. Moreover, we can also construct different data organizations,
> partitioning strategies, etc. based on the specific computation. I
> understand that this would be quite an effort, but at some point it is
> worth making it.
>
> Meanwhile, if it is not possible to have the aggregation function
> extend the rich interface, couldn't we supplement this with some extra
> logic in the process function that would provide the aggregates with
> the needed data, or at least with pointers to the required state?
>
> As far as I know it would be legal now to have something like:
>
> ProcessFunction () {
>
>     ValueState state = ...
>
>     processElement(newElement) {
>         // hand the state handle to the accumulator
>         acc.accumulate(newElement, state)
>     }
> }
>
> WeightedAvgAccum {
>
>     public void accumulate(Row newElement, ValueState state) {
>         state.value....
>     }
> }
>
> Would something like this at least partially solve the problem? It
> would allow you to manage the intermediate data directly in the state
> instead of in memory.
>
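The idea of handing the state to the accumulator can be sketched in plain Java as follows. This is only an illustration under assumptions: SimpleValueState and InMemoryValueState are hypothetical stand-ins and not Flink's ValueState, and WeightedAvgProcessSketch merely mimics the shape of a process function without using any Flink API.

```java
// Hypothetical stand-in for a keyed value state handle (NOT Flink's ValueState).
interface SimpleValueState<T> {
    T value();
    void update(T newValue);
}

// In-memory implementation, present only so that the sketch can run.
final class InMemoryValueState<T> implements SimpleValueState<T> {
    private T current;

    @Override
    public T value() {
        return current;
    }

    @Override
    public void update(T newValue) {
        current = newValue;
    }
}

// Accumulator without fields of its own: all intermediate data lives in the state.
final class WeightedAvgAccum {
    // The state holds {weightedSum, totalWeight}; null means nothing was accumulated yet.
    void accumulate(long value, long weight, SimpleValueState<long[]> state) {
        long[] s = state.value();
        if (s == null) {
            s = new long[] {0L, 0L};
        }
        s[0] += value * weight;
        s[1] += weight;
        state.update(s);
    }

    double getValue(SimpleValueState<long[]> state) {
        long[] s = state.value();
        if (s == null || s[1] == 0L) {
            throw new IllegalStateException("nothing accumulated yet");
        }
        return (double) s[0] / s[1];
    }
}

// Mimics the process function: it owns the state and passes it to the accumulator.
final class WeightedAvgProcessSketch {
    private final SimpleValueState<long[]> state = new InMemoryValueState<>();
    private final WeightedAvgAccum acc = new WeightedAvgAccum();

    double processElement(long value, long weight) {
        acc.accumulate(value, weight, state);
        return acc.getValue(state);
    }
}
```

Calling processElement(10, 1) and then processElement(20, 3) on one WeightedAvgProcessSketch yields 10.0 and then 17.5, with the running sums kept in the state handle rather than in accumulator fields.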
>
> -----Original Message-----
> From: Shaoxuan Wang [mailto:wshaox...@gmail.com]
> Sent: Friday, May 12, 2017 1:20 PM
> To: Dev
> Cc: Stephan Ewen
> Subject: Re: [DISCUSS] Expose State Backend Interface for UDAGG
>
> Fabian,
> Thanks for your quick reply.
> The goal of "FLINK-6544" is not to expose the state backend in all UDAGG
> cases. It is designed to provide an interface that lets users access the
> state backend when that is allowed (yes, right now this is only allowed
> by ProcessFunction). This interface by itself does not make things
> better. Instead, it provides a generic interface for the future adoption
> of exposing backend state in all the different UDAGG cases, and the
> current OVER aggregate and unbounded group aggregate can already benefit
> from accessing the state backend.
>
> In the meantime, I am also curious why we cannot build AggregateFunction
> on RichFunction. We will lose much of the benefit of having a state
> backend for window aggregates if it does not provide a runtime context.
> @Stephan, it would be really appreciated if you could share the concerns
> or blocking reasons for not designing AggregateFunction on top of
> RichFunction.
>
> Regards,
> Shaoxuan
>
>
> On Fri, May 12, 2017 at 6:21 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Hi, thanks for the proposal.
> >
> > I think exposing state to UDAGGs would be very difficult and a lot of
> > work.
> >
> > UDAGGs are called from ProcessFunctions (stream, OVER window and
> > non-window aggs), AggregateFunctions (stream, group-window aggs),
> > CombineFunctions (batch) and GroupReduceFunctions (batch). The batch
> > functions do not support state backends at all, ProcessFunctions can
> > register state, and AggregateFunction cannot.
> > Even when putting the batch case aside this is very hard.
> >
> > AggregateFunctions support merging of windows. Right now, this only
> > involves merging of accumulators. If we allow AggregateFunctions to
> > have state, we would also need to provide logic to merge the state.
> > Moreover, it is not clearly defined when AggregateFunctions are called
> > (similar to Combiners in MapReduce), which would make state handling
> > very complex. Changing this would be a major effort in the DataStream API.
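As a rough illustration of what merging of accumulators means for merging windows, the Java sketch below folds one partial average into another. AvgAccumulator and its methods are hypothetical names chosen for this example; this is not Flink's AggregateFunction interface.

```java
// Hypothetical accumulator; names are illustrative and not part of any Flink API.
final class AvgAccumulator {
    long sum;
    long count;

    // Add a single input value to this partial result.
    void add(long value) {
        sum += value;
        count++;
    }

    // Merge step for two windows that collapse into one:
    // fold the other window's partial result into this one.
    void merge(AvgAccumulator other) {
        sum += other.sum;
        count += other.count;
    }

    double result() {
        if (count == 0) {
            throw new IllegalStateException("empty accumulator");
        }
        return (double) sum / count;
    }
}
```

Here the merge only touches two plain fields. If an accumulator also kept data in a state backend, an equivalent merge step would presumably be needed for that state as well, which is the extra logic mentioned above.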
> >
> > An alternative would be to reimplement the group-window logic in the
> > Table API, but this will be a huge effort as well (maybe we have to do
> > it anyway at some point though).
> >
> > @Stephan knows more about the implications of allowing state in
> > AggregateFunctions.
> >
> > Best, Fabian
> >
> > 2017-05-12 11:53 GMT+02:00 Shaoxuan Wang <wshaox...@gmail.com>:
> >
> > > Hi everyone,
> > >
> > > We made some progress in the implementation of UDAGG (FLINK-5564).
> > > However, we realized that there are cases where users may want to
> > > use the state backend to store the data. For instance, the built-in
> > > MaxWithRetractAggFunction currently creates a HashMap to store the
> > > historical data. This becomes a problem when the number of keys is
> > > large enough, leading to OOM.
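To show why a max aggregate with retraction needs to remember historical data, here is a minimal Java sketch. It is an assumption-laden illustration and not the actual MaxWithRetractAggFunction code: the class name, the value-to-count map layout, and the trackedValues helper are made up for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only; not the real MaxWithRetractAggFunction implementation.
final class MaxWithRetractSketch {
    // One entry per distinct value that is still "live", mapped to its occurrence count.
    // This map lives on the heap and grows with the number of distinct values.
    private final Map<Long, Long> counts = new HashMap<>();

    void accumulate(long value) {
        counts.merge(value, 1L, Long::sum);
    }

    void retract(long value) {
        // Decrement the count and drop the entry once it reaches zero.
        counts.computeIfPresent(value, (k, c) -> c == 1L ? null : c - 1L);
    }

    // Returns the current maximum, or null if no value is live.
    Long getValue() {
        Long max = null;
        for (Long v : counts.keySet()) {
            if (max == null || v > max) {
                max = v;
            }
        }
        return max;
    }

    // Number of distinct values currently held in memory.
    int trackedValues() {
        return counts.size();
    }
}
```

Because the current maximum may be retracted later, every live value has to be kept so that the next maximum can be found. Keeping that map in a state backend instead of on the heap is the kind of use case this proposal targets.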
> > >
> > > In FLINK-6544, we have proposed an approach to expose State Backend
> > > Interface for UDAGG. A brief design doc can be found in
> > > https://docs.google.com/document/d/1g-wHOuFj5pMaYMJg90kVIO2IHbiQiO26nWscLIOn50c/edit
> > >
> > > I am opening this discussion thread because I noticed several
> > > recently opened JIRAs that aim to implement special aggregators,
> > > such as "count distinct". IMO, "count distinct" is just a UDAGG.
> > > With the newly proposed FLINK-6544, we can simply make it a
> > > built-in agg without changing the current UDAGG framework.
> > >
> > > @Fabian, @Haohui, @Radu, please take a look at FLINK-6544 and let me
> > > know what you think.
> > > Btw, in our opinion this change does not need to be included in release 1.3.
> > >
> > > Regards,
> > > Shaoxuan
> > >
> >
>
