Please see: https://issues.apache.org/jira/browse/FLINK-10250
On 28.09.2018 11:27, vino yang wrote:
Hi Gaurav,
Yes, you are right. Using a RichFunction there is indeed not allowed. I
will ping Timo; he may be able to give you a more authoritative answer.
Thanks, vino.
Gaurav Luthra <gauravluthra6...@gmail.com
<mailto:gauravluthra6...@gmail.com>> wrote on Fri, Sep 28, 2018 at 4:27 PM:
Hi Vino,
Kindly check the Flink code below, from the class
org.apache.flink.streaming.api.datastream.WindowedStream:
    @PublicEvolving
    public <ACC, R> SingleOutputStreamOperator<R> aggregate(
            AggregateFunction<T, ACC, R> function) {
        checkNotNull(function, "function");

        if (function instanceof RichFunction) {
            throw new UnsupportedOperationException(
                    "This aggregation function cannot be a RichFunction.");
        }

        TypeInformation<ACC> accumulatorType =
                TypeExtractor.getAggregateFunctionAccumulatorType(
                        function, input.getType(), null, false);

        TypeInformation<R> resultType =
                TypeExtractor.getAggregateFunctionReturnType(
                        function, input.getType(), null, false);

        return aggregate(function, accumulatorType, resultType);
    }
Kindly check the above snippet of Flink's aggregate() method, which is
the one applied on a windowed stream.
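To make the effect of that instanceof check concrete, here is a minimal, self-contained sketch. It does not use Flink: the AggregateFunction and RichFunction interfaces, the PlainSum and RichSum classes, and the list-based aggregate() helper are all hypothetical stand-ins that only mimic the shape of the guard quoted above, so treat it as an illustration of the behaviour rather than Flink's actual implementation.

```java
import java.util.Arrays;
import java.util.List;

public class AggregateGuardSketch {

    // Hypothetical stand-in for an aggregate function; NOT Flink's interface.
    interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
    }

    // Hypothetical marker interface playing the role of RichFunction.
    interface RichFunction { }

    // A plain (non-rich) aggregate that sums integers.
    static class PlainSum implements AggregateFunction<Integer, Integer, Integer> {
        public Integer createAccumulator() { return 0; }
        public Integer add(Integer value, Integer accumulator) { return accumulator + value; }
        public Integer getResult(Integer accumulator) { return accumulator; }
    }

    // The same logic, but additionally marked as "rich".
    static class RichSum extends PlainSum implements RichFunction { }

    // Mimics the guard from the quoted method, applied to an in-memory "window".
    static <IN, ACC, OUT> OUT aggregate(AggregateFunction<IN, ACC, OUT> function, List<IN> window) {
        if (function == null) {
            throw new NullPointerException("function");
        }
        if (function instanceof RichFunction) {
            throw new UnsupportedOperationException(
                    "This aggregation function cannot be a RichFunction.");
        }
        ACC accumulator = function.createAccumulator();
        for (IN value : window) {
            accumulator = function.add(value, accumulator);
        }
        return function.getResult(accumulator);
    }

    public static void main(String[] args) {
        List<Integer> window = Arrays.asList(1, 2, 3);

        // The plain function passes the guard and is evaluated.
        System.out.println(aggregate(new PlainSum(), window));

        // The rich variant is rejected before any element is processed.
        try {
            aggregate(new RichSum(), window);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The point of the sketch is that the rejection depends only on the function's type, not on what the function does: RichSum computes exactly the same sum as PlainSum but never gets to run.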
Thanks & Regards
Gaurav Luthra
Mob:- +91-9901945206
On Fri, Sep 28, 2018 at 1:40 PM vino yang <yanghua1...@gmail.com
<mailto:yanghua1...@gmail.com>> wrote:
Hi Gaurav,
This is very strange. Can you share your code and the specific
exception? Under normal circumstances, it should not throw an
exception.
Thanks, vino.
Gaurav Luthra <gauravluthra6...@gmail.com
<mailto:gauravluthra6...@gmail.com>> wrote on Fri, Sep 28, 2018 at 3:27 PM:
Hi Vino,
RichAggregateFunction can certainly access state. The
problem is that we cannot pass a RichAggregateFunction to the
aggregate() method.
If we do, it throws an exception.
So the only option is to use a plain AggregateFunction (not Rich) with
the aggregate() method on a windowed stream. But in an
AggregateFunction we cannot access the RuntimeContext, and hence
we cannot use state.
Thanks & Regards
Gaurav
On Fri, 28 Sep, 2018, 12:40 PM vino yang,
<yanghua1...@gmail.com <mailto:yanghua1...@gmail.com>> wrote:
Hi Gaurav,
Why do you think the RichAggregateFunction cannot
access the State API?
RichAggregateFunction inherits from
AbstractRichFunction (it provides a RuntimeContext
that allows you to access the state API).
Thanks, vino.
Gaurav Luthra <gauravluthra6...@gmail.com
<mailto:gauravluthra6...@gmail.com>> wrote on Fri, Sep 28, 2018 at 1:38 PM:
Hi,
As we are aware, we currently cannot use a
RichAggregateFunction in the
aggregate() method on a windowed stream. So, to
access state in a
custom AggregateFunction, one could implement it
using a ProcessFunction.
Many developers face this issue,
so someone must have implemented or tried to
implement this. Kindly share
your feedback,
as I need to implement this myself.
Thanks & Regards
Gaurav Luthra
--
Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/