Hi Fabian,

Thanks for explaining in detail. As you also mentioned, options 1) and 2) have known drawbacks, so I am going ahead with option 3).
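Option 3 means writing the window bookkeeping yourself. The following is a minimal sketch of that bookkeeping for tumbling event-time windows. It is written in plain Java so it runs without a Flink cluster, and the names HandRolledTumblingWindow, WindowResult and advanceWatermark are hypothetical. In a real job these pieces would presumably map onto a KeyedProcessFunction:

- The per-window accumulators would live in a MapState obtained through getRuntimeContext(), which is exactly the state access a plain AggregateFunction cannot reach.
- The timers would be registered with ctx.timerService().registerEventTimeTimer().
- The emit-and-clear step would go in onTimer().

The Supplier and BiFunction arguments stand in for the user-configurable business logic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// One fired window: key, [start, end) bounds and the final accumulator.
final class WindowResult<K, ACC> {
    final K key;
    final long start;
    final long end;
    final ACC acc;

    WindowResult(K key, long start, long end, ACC acc) {
        this.key = key;
        this.start = start;
        this.end = end;
        this.acc = acc;
    }
}

// Hypothetical plain-Java model of the bookkeeping a KeyedProcessFunction needs for
// tumbling event-time windows. It deliberately uses no Flink API so that it runs standalone.
final class HandRolledTumblingWindow<K, IN, ACC> {
    private final long size;                      // window length in milliseconds
    private final Supplier<ACC> createAcc;        // user logic: a fresh accumulator
    private final BiFunction<ACC, IN, ACC> add;   // user logic: fold one record into it

    // key -> (window start -> accumulator); in Flink this would be a keyed MapState<Long, ACC>.
    private final Map<K, Map<Long, ACC>> state = new HashMap<>();
    // timer timestamp -> keys; in Flink, ctx.timerService().registerEventTimeTimer(ts).
    // A Set per timestamp mirrors Flink keeping at most one timer per key and timestamp.
    private final TreeMap<Long, Set<K>> timers = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;

    HandRolledTumblingWindow(long size, Supplier<ACC> createAcc, BiFunction<ACC, IN, ACC> add) {
        this.size = size;
        this.createAcc = createAcc;
        this.add = add;
    }

    // Counterpart of processElement(): returns false if the record is late and dropped.
    boolean processElement(K key, IN value, long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, size);
        long maxTimestamp = start + size - 1;   // last timestamp inside [start, start + size)
        if (maxTimestamp <= watermark) {
            return false;                       // window already fired; no allowed lateness
        }
        Map<Long, ACC> windows = state.computeIfAbsent(key, k -> new HashMap<>());
        ACC acc = windows.computeIfAbsent(start, s -> createAcc.get());
        windows.put(start, add.apply(acc, value));
        timers.computeIfAbsent(maxTimestamp, t -> new LinkedHashSet<>()).add(key);
        return true;
    }

    // Counterpart of a watermark arriving: every timer with timestamp <= watermark fires,
    // and each firing does what onTimer() would do: emit the result and clear the window.
    List<WindowResult<K, ACC>> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<WindowResult<K, ACC>> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, Set<K>> timer = timers.pollFirstEntry();
            long start = timer.getKey() + 1 - size;
            for (K key : timer.getValue()) {
                Map<Long, ACC> windows = state.get(key);
                ACC acc = windows.remove(start);
                if (windows.isEmpty()) {
                    state.remove(key);  // model housekeeping; Flink scopes keyed state per key
                }
                fired.add(new WindowResult<>(key, start, start + size, acc));
            }
        }
        return fired;
    }
}

class WindowSketchDemo {
    public static void main(String[] args) {
        // Count and sum per key in 10-second windows; long[]{count, sum} is the accumulator.
        HandRolledTumblingWindow<String, Long, long[]> win = new HandRolledTumblingWindow<>(
                10_000L, () -> new long[2], (acc, v) -> { acc[0]++; acc[1] += v; return acc; });
        win.processElement("a", 5L, 1_000L);
        win.processElement("a", 7L, 9_999L);
        win.processElement("b", 1L, 3_000L);
        win.processElement("a", 2L, 12_000L);   // falls into the next window, [10000, 20000)
        for (WindowResult<String, long[]> r : win.advanceWatermark(10_000L)) {
            System.out.println(r.key + " [" + r.start + ", " + r.end + ") count=" + r.acc[0]
                    + " sum=" + r.acc[1]);
        }
    }
}
```

Running WindowSketchDemo prints `a [0, 10000) count=2 sum=12` and then `b [0, 10000) count=1 sum=1`. The record for "a" at 12000 stays buffered until the watermark passes 19999. Supporting other window types is where the extra logic comes in. Sliding windows would need each record assigned to several windows. Session windows would need accumulators merged when windows overlap.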
Thanks & Regards
Gaurav Luthra
Mob:- +91-9901945206

On Mon, Oct 1, 2018 at 3:11 PM Fabian Hueske <fhue...@gmail.com> wrote:
> Hi,
>
> There are basically three options:
> 1) Use an AggregateFunction and store everything that you would put into
> state into the Accumulator. This can become quite expensive because the
> Accumulator is de/serialized for every function call if you use RocksDB.
> The advantage is that you don't have to store all records in state but only
> the data you need. Simple aggregations like COUNT or SUM are quite cheap.
> 2) Use Flink's window primitives and a ProcessWindowFunction. In this
> case, all records of a window are stored in a ListState. Adding a record to
> the list is cheap, but the state might grow quite large for longer windows.
> When the window is evaluated, all records are loaded into memory and
> iterated by the ProcessWindowFunction.
> 3) Implement the windowing logic in a ProcessFunction. This requires a lot
> of additional logic, depending on what types of windows you want to support.
>
> Flink's SQL / Table API implements the first approach.
>
> Best, Fabian
>
> On Sun, Sep 30, 2018 at 12:48 PM Gaurav Luthra <
> gauravluthra6...@gmail.com> wrote:
>
>> Hi Ken,
>>
>> Mine is a very generic use case: I am building an aggregation
>> function using Flink that can be configured for any use case.
>> It will not be tied to one specific use case; every user can enter
>> their business logic and use this aggregator to get the result.
>> The same goes for windowing: the user can configure the type of window, and my
>> aggregator will ask for the properties required by that window.
>>
>> I hope this gives you some idea.
>>
>> But to make it generic, I need to implement it with a ProcessFunction and the
>> process() method instead of the more specific AggregateFunction and
>> aggregate() method.
>>
>> So, I am looking for input from anyone who has tried implementing aggregation
>> using ProcessFunction and the process() method.
>> It is very much needed with Flink.
>>
>> Thanks and Regards,
>> Gaurav Luthra
>>
>>
>> On Sun, Sep 30, 2018 at 5:12 AM Ken Krugler <kkrugler_li...@transpac.com>
>> wrote:
>>
>>> Hi Gaurav,
>>>
>>> I’m curious - for your use case, what are the windowing & aggregation
>>> requirements?
>>>
>>> E.g. is it a 10 second sliding window?
>>>
>>> And what’s the aggregation you’re trying to do?
>>>
>>> Thanks,
>>>
>>> — Ken
>>>
>>>
>>> On Sep 28, 2018, at 4:00 AM, Gaurav Luthra <gauravluthra6...@gmail.com>
>>> wrote:
>>>
>>> Hi Chesnay,
>>>
>>> I know it is an issue, and that it won't be fixed because of the window
>>> merging feature used by session windows.
>>> But I am looking for someone who has implemented an aggregation function using
>>> ProcessFunction and the process() method instead of AggregateFunction and the
>>> aggregate() method.
>>> I hope you get my point.
>>>
>>> Thanks & Regards
>>> Gaurav Luthra
>>>
>>>
>>>
>>> On Fri, Sep 28, 2018 at 4:22 PM Chesnay Schepler <ches...@apache.org>
>>> wrote:
>>>
>>>> Please see: https://issues.apache.org/jira/browse/FLINK-10250
>>>>
>>>> On 28.09.2018 11:27, vino yang wrote:
>>>>
>>>> Hi Gaurav,
>>>>
>>>> Yes, you are right. Using a RichFunction there is indeed not allowed. I
>>>> will ping Timo; he may be able to give you a more authoritative answer.
>>>>
>>>> Thanks, vino.
>>>>
>>>> On Fri, Sep 28, 2018 at 4:27 PM Gaurav Luthra <gauravluthra6...@gmail.com> wrote:
>>>>
>>>>> Hi Vino,
>>>>>
>>>>> Kindly check the Flink code below.
>>>>>
>>>>> // Method of org.apache.flink.streaming.api.datastream.WindowedStream
>>>>>
>>>>> @PublicEvolving
>>>>> public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
>>>>>     checkNotNull(function, "function");
>>>>>
>>>>>     // A RichAggregateFunction is rejected here, before any window operator is created.
>>>>>     if (function instanceof RichFunction) {
>>>>>         throw new UnsupportedOperationException("This aggregation function cannot be a RichFunction.");
>>>>>     }
>>>>>
>>>>>     TypeInformation<ACC> accumulatorType = TypeExtractor.getAggregateFunctionAccumulatorType(
>>>>>             function, input.getType(), null, false);
>>>>>
>>>>>     TypeInformation<R> resultType = TypeExtractor.getAggregateFunctionReturnType(
>>>>>             function, input.getType(), null, false);
>>>>>
>>>>>     return aggregate(function, accumulatorType, resultType);
>>>>> }
>>>>>
>>>>>
>>>>> Kindly check the snapshot above of Flink's aggregate() method, which is
>>>>> applied on a windowed stream.
>>>>>
>>>>> Thanks & Regards
>>>>> Gaurav Luthra
>>>>>
>>>>>
>>>>> On Fri, Sep 28, 2018 at 1:40 PM vino yang <yanghua1...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Gaurav,
>>>>>>
>>>>>> This is very strange. Can you share your code and the specific
>>>>>> exception? Under normal circumstances, it should not throw an exception.
>>>>>>
>>>>>> Thanks, vino.
>>>>>>
>>>>>> On Fri, Sep 28, 2018 at 3:27 PM Gaurav Luthra <gauravluthra6...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Vino,
>>>>>>>
>>>>>>> A RichAggregateFunction can certainly access state. But the problem
>>>>>>> is that we cannot pass a RichAggregateFunction to the aggregate() method;
>>>>>>> if we do, it throws an exception.
>>>>>>>
>>>>>>> So, the option is to use an AggregateFunction (not Rich) with the
>>>>>>> aggregate() method on a windowed stream. But an AggregateFunction
>>>>>>> cannot access the RuntimeContext, hence we cannot use state.
>>>>>>>
>>>>>>> Thanks & Regards
>>>>>>> Gaurav
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, 28 Sep, 2018, 12:40 PM vino yang, <yanghua1...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Gaurav,
>>>>>>>>
>>>>>>>> Why do you think the RichAggregateFunction cannot access the State
>>>>>>>> API?
>>>>>>>> RichAggregateFunction inherits from AbstractRichFunction (it
>>>>>>>> provides a RuntimeContext that allows you to access the state API).
>>>>>>>>
>>>>>>>> Thanks, vino.
>>>>>>>>
>>>>>>>> On Fri, Sep 28, 2018 at 1:38 PM Gaurav Luthra <gauravluthra6...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> As we are aware, we currently cannot use a RichAggregateFunction in the
>>>>>>>>> aggregate() method on a windowed stream. So, to access state in your
>>>>>>>>> custom aggregate function, you can implement it using a
>>>>>>>>> ProcessFunction.
>>>>>>>>> Many developers face this issue, so someone must have implemented
>>>>>>>>> it or tried to implement it. Kindly share
>>>>>>>>> your feedback on this, as I need to implement it.
>>>>>>>>>
>>>>>>>>> Thanks & Regards
>>>>>>>>> Gaurav Luthra
>>>>>>>>>
>>>>>>>>
>>>
>>> --------------------------
>>> Ken Krugler
>>> +1 530-210-6378
>>> http://www.scaleunlimited.com
>>> Custom big data solutions & training
>>> Flink, Solr, Hadoop, Cascading & Cassandra
>>>
>>>
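For comparison, Fabian's option 1 keeps only the data the aggregation needs in the accumulator. The following is a minimal sketch of that idea. The interface AggregateFn is a local stand-in that mirrors the four methods of Flink's org.apache.flink.api.common.functions.AggregateFunction (createAccumulator, add, getResult, merge), so the sketch runs without Flink. In a real job the class would implement AggregateFunction directly and be passed to WindowedStream.aggregate(). The AverageAggregate example is hypothetical.

```java
// Local stand-in with the same four methods as Flink's AggregateFunction<IN, ACC, OUT>,
// so the sketch compiles without Flink on the classpath.
interface AggregateFn<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical example: average of longs. The accumulator keeps only {sum, count},
// never the raw records.
final class AverageAggregate implements AggregateFn<Long, long[], Double> {
    @Override
    public long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    @Override
    public long[] add(Long value, long[] acc) {
        acc[0] += value;
        acc[1]++;
        return acc;
    }

    @Override
    public Double getResult(long[] acc) {
        return (double) acc[0] / acc[1];
    }

    // Combines two partial accumulators, e.g. when session windows merge.
    @Override
    public long[] merge(long[] a, long[] b) {
        a[0] += b[0];
        a[1] += b[1];
        return a;
    }
}

class AverageDemo {
    public static void main(String[] args) {
        AverageAggregate avg = new AverageAggregate();
        long[] left = avg.add(4L, avg.add(2L, avg.createAccumulator()));   // {6, 2}
        long[] right = avg.add(9L, avg.createAccumulator());              // {9, 1}
        System.out.println(avg.getResult(avg.merge(left, right)));        // prints 5.0
    }
}
```

The accumulator here is just two longs, so the per-call de/serialization Fabian mentions for RocksDB stays cheap. An accumulator that collected every record would pay that cost on an ever-growing object.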