Hi Gaurav, I’m curious - for your use case, what are the windowing & aggregation requirements?
E.g. is it a 10 second sliding window? And what’s the aggregation you’re trying to do?

Thanks,

-- Ken

> On Sep 28, 2018, at 4:00 AM, Gaurav Luthra <gauravluthra6...@gmail.com> wrote:
>
> Hi Chesnay,
>
> I know it is an issue, and that it won't be fixed because of the window
> merging feature for session windows.
> But I am looking for someone who has implemented an aggregation function
> using ProcessFunction and the process() method instead of AggregateFunction
> and the aggregate() method.
> I hope you got my point.
>
> Thanks & Regards
> Gaurav Luthra
>
> On Fri, Sep 28, 2018 at 4:22 PM Chesnay Schepler <ches...@apache.org> wrote:
> Please see: https://issues.apache.org/jira/browse/FLINK-10250
>
> On 28.09.2018 11:27, vino yang wrote:
>> Hi Gaurav,
>>
>> Yes, you are right. Using a RichFunction there is really not allowed. I will
>> ping Timo; he may be able to give you a more professional answer.
>>
>> Thanks, vino.
>>
>> Gaurav Luthra <gauravluthra6...@gmail.com> wrote on Fri, Sep 28, 2018 at 4:27 PM:
>> Hi Vino,
>>
>> Kindly check the Flink code below.
>>
>> // Class: org.apache.flink.streaming.api.datastream.WindowedStream
>>
>> @PublicEvolving
>> public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
>>     checkNotNull(function, "function");
>>
>>     if (function instanceof RichFunction) {
>>         throw new UnsupportedOperationException("This aggregation function cannot be a RichFunction.");
>>     }
>>
>>     TypeInformation<ACC> accumulatorType = TypeExtractor.getAggregateFunctionAccumulatorType(
>>         function, input.getType(), null, false);
>>
>>     TypeInformation<R> resultType = TypeExtractor.getAggregateFunctionReturnType(
>>         function, input.getType(), null, false);
>>
>>     return aggregate(function, accumulatorType, resultType);
>> }
>>
>> Kindly check the above snippet of Flink's aggregate() method, which is
>> applied on a windowed stream.
>>
>> Thanks & Regards
>> Gaurav Luthra
>> Mob:- +91-9901945206
>>
>> On Fri, Sep 28, 2018 at 1:40 PM vino yang <yanghua1...@gmail.com> wrote:
>> Hi Gaurav,
>>
>> This is very strange. Can you share your code and the specific exceptions?
>> Under normal circumstances, it should not throw an exception.
>>
>> Thanks, vino.
>>
>> Gaurav Luthra <gauravluthra6...@gmail.com> wrote on Fri, Sep 28, 2018 at 3:27 PM:
>> Hi Vino,
>>
>> RichAggregateFunction can surely access the state. The problem is that we
>> cannot use RichAggregateFunction in the aggregate() method.
>> If we do, it throws an exception.
>>
>> So the option is to use AggregateFunction (not Rich) with the aggregate()
>> method on a windowed stream. But in AggregateFunction we cannot access the
>> RuntimeContext, hence we cannot use state.
>>
>> Thanks & Regards
>> Gaurav
>>
>> On Fri, 28 Sep, 2018, 12:40 PM vino yang, <yanghua1...@gmail.com> wrote:
>> Hi Gaurav,
>>
>> Why do you think the RichAggregateFunction cannot access the State API?
>> RichAggregateFunction inherits from AbstractRichFunction (it provides a
>> RuntimeContext that allows you to access the state API).
>>
>> Thanks, vino.
>>
>> Gaurav Luthra <gauravluthra6...@gmail.com> wrote on Fri, Sep 28, 2018 at 1:38 PM:
>> Hi,
>>
>> As we are aware, we currently cannot use RichAggregateFunction in the
>> aggregate() method on a windowed stream. So, to access state in your
>> custom AggregateFunction, you can implement it using a ProcessFunction.
>> This issue is faced by many developers, so someone must have implemented
>> or tried to implement it. Kindly share your feedback on this, as I need
>> to implement it.
>>
>> Thanks & Regards
>> Gaurav Luthra

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
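
For anyone finding this thread later, here is a rough plain-Java sketch of the split discussed above: keep the aggregation itself stateless (createAccumulator/add/getResult/merge, with merge being what session-window merging relies on), and do the state access in a separate per-window step. Flink is not on the classpath here. The Aggregator interface, the DeltaFromPreviousWindow class and the "sensor-1" key are hypothetical stand-ins made up for illustration; the map only plays the role that keyed state would play in a real job. In Flink the corresponding pieces would presumably be the two-argument aggregate(AggregateFunction, ProcessWindowFunction) overload on WindowedStream and the state reachable from the ProcessWindowFunction context, but this sketch does not call them.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch; every type below is a hypothetical stand-in, not a Flink class.
class WindowAverageSketch {

    /** Mirrors the four methods of Flink's AggregateFunction<IN, ACC, OUT>. */
    interface Aggregator<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC acc);
        OUT getResult(ACC acc);
        ACC merge(ACC a, ACC b); // used when two windows (e.g. sessions) are merged
    }

    /** Stateless average: the accumulator is {sum, count}. */
    static class Average implements Aggregator<Double, double[], Double> {
        public double[] createAccumulator() { return new double[] {0.0, 0.0}; }
        public double[] add(Double v, double[] acc) { acc[0] += v; acc[1] += 1; return acc; }
        public Double getResult(double[] acc) { return acc[1] == 0 ? 0.0 : acc[0] / acc[1]; }
        public double[] merge(double[] a, double[] b) {
            return new double[] {a[0] + b[0], a[1] + b[1]};
        }
    }

    /** Stand-in for the stateful window step: the map plays the role of per-key state. */
    static class DeltaFromPreviousWindow {
        private final Map<String, Double> lastAverageByKey = new HashMap<>();

        /** Returns this window's average minus the previous window's average for the key. */
        double onWindowFired(String key, double windowAverage) {
            Double previous = lastAverageByKey.put(key, windowAverage);
            return previous == null ? 0.0 : windowAverage - previous;
        }
    }

    /** Folds one window's elements through the aggregator, as a runtime would. */
    static <IN, ACC, OUT> OUT aggregateWindow(Aggregator<IN, ACC, OUT> agg, List<IN> elements) {
        ACC acc = agg.createAccumulator();
        for (IN e : elements) {
            acc = agg.add(e, acc);
        }
        return agg.getResult(acc);
    }

    public static void main(String[] args) {
        Average avg = new Average();
        DeltaFromPreviousWindow step = new DeltaFromPreviousWindow();
        // Two consecutive windows for the same (made-up) key.
        double first = step.onWindowFired("sensor-1", aggregateWindow(avg, Arrays.asList(1.0, 3.0)));
        double second = step.onWindowFired("sensor-1", aggregateWindow(avg, Arrays.asList(4.0, 6.0)));
        System.out.println(first + " " + second);
    }
}
```

The point of the split is that only one pre-aggregated value per window reaches the stateful step, so the window contents do not have to be buffered just to get access to state. Whether that fits depends on the windowing and aggregation requirements asked about above.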