Hi Gaurav,

Yes, you are right. Passing a RichFunction to aggregate() is indeed not
allowed. I will ping Timo; he may be able to give you a more authoritative
answer.
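
In the meantime, here is a rough, self-contained sketch of the guard you
quoted and of one possible way around it. All types in it (AggregateFunction,
RichFunction, WindowFunction, the Map used as "state") are stand-ins I made up
so that it runs without Flink on the classpath; they are not the Flink
classes. The idea is to keep the aggregation itself a plain function and do
the stateful part in a second function that is handed the state. If I remember
correctly, WindowedStream has an aggregate() overload that takes an
AggregateFunction together with a ProcessWindowFunction, which would play the
role of the second function here, but please check that against your Flink
version.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in types only: they mimic the shape of the Flink interfaces so the
// sketch runs on a plain JDK. They are NOT the Flink classes.
public class AggregateGuardSketch {

    interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
    }

    // Marker standing in for Flink's RichFunction.
    interface RichFunction {}

    // Hypothetical second-stage function that is handed a state store.
    interface WindowFunction<V, R> {
        R process(V aggregated, Map<String, Long> state);
    }

    // Same guard as in the quoted aggregate(): rich functions are rejected.
    static <IN, ACC, OUT> OUT aggregate(
            List<IN> window, AggregateFunction<IN, ACC, OUT> function) {
        if (function instanceof RichFunction) {
            throw new UnsupportedOperationException(
                    "This aggregation function cannot be a RichFunction.");
        }
        ACC acc = function.createAccumulator();
        for (IN value : window) {
            acc = function.add(value, acc);
        }
        return function.getResult(acc);
    }

    // Two-step variant: plain aggregation first, then a function that may use state.
    static <IN, ACC, V, R> R aggregate(
            List<IN> window,
            AggregateFunction<IN, ACC, V> function,
            WindowFunction<V, R> windowFunction,
            Map<String, Long> state) {
        V aggregated = aggregate(window, function);
        return windowFunction.process(aggregated, state);
    }

    static class Sum implements AggregateFunction<Long, Long, Long> {
        public Long createAccumulator() { return 0L; }
        public Long add(Long value, Long acc) { return acc + value; }
        public Long getResult(Long acc) { return acc; }
    }

    // A "rich" variant of Sum, which the guard refuses.
    static class RichSum extends Sum implements RichFunction {}

    // Keeps a running total across windows in the state map.
    static class RunningTotal implements WindowFunction<Long, Long> {
        public Long process(Long windowSum, Map<String, Long> state) {
            long total = state.getOrDefault("total", 0L) + windowSum;
            state.put("total", total);
            return total;
        }
    }

    public static void main(String[] args) {
        Map<String, Long> state = new HashMap<>();
        // First window sums to 6; prints 6.
        System.out.println(aggregate(Arrays.asList(1L, 2L, 3L), new Sum(), new RunningTotal(), state));
        // Second window adds 4 to the stored total; prints 10.
        System.out.println(aggregate(Arrays.asList(4L), new Sum(), new RunningTotal(), state));
        try {
            aggregate(Arrays.asList(1L), new RichSum());
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The plain Sum passes the guard and the running total lives only in the second
function, while RichSum is rejected with the same message you are seeing.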

Thanks, vino.

On Fri, Sep 28, 2018 at 4:27 PM Gaurav Luthra <gauravluthra6...@gmail.com> wrote:

> Hi Vino,
>
> Kindly check the Flink code below.
>
> // From org.apache.flink.streaming.api.datastream.WindowedStream
>
> @PublicEvolving
> public <ACC, R> SingleOutputStreamOperator<R> aggregate(
>         AggregateFunction<T, ACC, R> function) {
>     checkNotNull(function, "function");
>
>     if (function instanceof RichFunction) {
>         throw new UnsupportedOperationException(
>                 "This aggregation function cannot be a RichFunction.");
>     }
>
>     TypeInformation<ACC> accumulatorType =
>             TypeExtractor.getAggregateFunctionAccumulatorType(
>                     function, input.getType(), null, false);
>
>     TypeInformation<R> resultType =
>             TypeExtractor.getAggregateFunctionReturnType(
>                     function, input.getType(), null, false);
>
>     return aggregate(function, accumulatorType, resultType);
> }
>
>
> Kindly check the above snippet of Flink's aggregate() method, which is the
> one that gets applied on a windowed stream.
>
> Thanks & Regards
> Gaurav Luthra
> Mob:- +91-9901945206
>
>
> On Fri, Sep 28, 2018 at 1:40 PM vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi Gaurav,
>>
>> This is very strange. Can you share your code and the specific exception?
>> Under normal circumstances, it should not throw an exception.
>>
>> Thanks, vino.
>>
>> On Fri, Sep 28, 2018 at 3:27 PM Gaurav Luthra <gauravluthra6...@gmail.com> wrote:
>>
>>> Hi Vino,
>>>
>>> RichAggregateFunction can certainly access state. The problem is that
>>> the aggregate() method does not accept a RichAggregateFunction; if we
>>> pass one, it throws an exception.
>>>
>>> So the only option is to use a plain AggregateFunction (not Rich) with
>>> aggregate() on the windowed stream. But a plain AggregateFunction cannot
>>> access the RuntimeContext, and hence cannot use state.
>>>
>>> Thanks & Regards
>>> Gaurav
>>>
>>>
>>>
>>> On Fri, 28 Sep, 2018, 12:40 PM vino yang, <yanghua1...@gmail.com> wrote:
>>>
>>>> Hi Gaurav,
>>>>
>>>> Why do you think the RichAggregateFunction cannot access the State API?
>>>> RichAggregateFunction inherits from AbstractRichFunction (it provides a
>>>> RuntimeContext that allows you to access the state API).
>>>>
>>>> Thanks, vino.
>>>>
>>>> On Fri, Sep 28, 2018 at 1:38 PM Gaurav Luthra <gauravluthra6...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> As we are aware, we currently cannot use a RichAggregateFunction in the
>>>>> aggregate() method on a windowed stream. So, to access state in a
>>>>> custom aggregate function, one option is to implement it using a
>>>>> ProcessFunction. Many developers face this issue, so someone must have
>>>>> implemented it or tried to. Kindly share your feedback on this, as I
>>>>> need to implement it myself.
>>>>>
>>>>> Thanks & Regards
>>>>> Gaurav Luthra
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Sent from:
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>
>>>>
