Hi Gaurav, Yes, you are right. It is really not allowed to use RichFunction. I will Ping Timo, he may give you a more professional answer.
Thanks, vino. Gaurav Luthra <gauravluthra6...@gmail.com> 于2018年9月28日周五 下午4:27写道: > Hi Vino, > > Kindly check below flink code. > > package org.apache.flink.streaming.api.datastream.WindowedStream > > @PublicEvolving > public <ACC, R> SingleOutputStreamOperator<R> > aggregate(AggregateFunction<T, ACC, R> function) { > checkNotNull(function, "function"); > > if (*function instanceof RichFunction*) { > throw new *UnsupportedOperationException("This aggregation function > cannot be a RichFunction.")*; > } > > TypeInformation<ACC> accumulatorType = > TypeExtractor.getAggregateFunctionAccumulatorType( > function, input.getType(), null, false); > > TypeInformation<R> resultType = > TypeExtractor.getAggregateFunctionReturnType( > function, input.getType(), null, false); > > return aggregate(function, accumulatorType, resultType); > } > > > Kindly, check above snapshot of flink;s aggregate() method, that got > applied on windowed stream. > > Thanks & Regards > Gaurav Luthra > Mob:- +91-9901945206 > > > On Fri, Sep 28, 2018 at 1:40 PM vino yang <yanghua1...@gmail.com> wrote: > >> Hi Gaurav, >> >> This is very strange, can you share your code and specific exceptions? >> Under normal circumstances, it should not throw an exception. >> >> Thanks, vino. >> >> Gaurav Luthra <gauravluthra6...@gmail.com> 于2018年9月28日周五 下午3:27写道: >> >>> Hi Vino, >>> >>> RichAggregateFunction can surely access the state. But the problem is, >>> In aggregate() method we can not use RichAggregateFunction. >>> If we use then it throws exception. >>> >>> So, the option is to use AggregateFunction (not Rich) with aggregate() >>> method on windowed stream. Now, In AggregateFunction, we cannot access >>> RuntimeContext. Hence we can not use state. >>> >>> Thanks & Regards >>> Gaurav >>> >>> >>> >>> On Fri, 28 Sep, 2018, 12:40 PM vino yang, <yanghua1...@gmail.com> wrote: >>> >>>> Hi Gaurav, >>>> >>>> Why do you think the RichAggregateFunction cannot access the State API? >>>> RichAggregateFunction inherits from AbstractRichFunction (it provides a >>>> RuntimeContext that allows you to access the state API). >>>> >>>> Thanks, vino. >>>> >>>> Gaurav Luthra <gauravluthra6...@gmail.com> 于2018年9月28日周五 下午1:38写道: >>>> >>>>> Hi, >>>>> >>>>> As we are aware, Currently we cannot use RichAggregateFunction in >>>>> aggregate() method upon windowed stream. So, To access the state in >>>>> your >>>>> customAggregateFunction, you can implement it using a ProcessFuntion. >>>>> This issue is faced by many developers. >>>>> So, someone must have implemented or tried to implement it. So, kindly >>>>> share >>>>> your feedback on this. >>>>> As I need to implement this. >>>>> >>>>> Thanks & Regards >>>>> Gaurav Luthra >>>>> >>>>> >>>>> >>>>> -- >>>>> Sent from: >>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >>>>> >>>>