Please see: https://issues.apache.org/jira/browse/FLINK-10250

On 28.09.2018 11:27, vino yang wrote:
Hi Gaurav,

Yes, you are right. Using a RichFunction there is indeed not allowed. I will ping Timo; he may be able to give you a more authoritative answer.
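
To illustrate why the check fires, here is a minimal, self-contained sketch. The interfaces are simplified stand-ins for Flink's AggregateFunction, RichFunction and RichAggregateFunction (not the real classes), and PlainSum, RichSum, checkNotRich and isAccepted are hypothetical names used only for illustration. Because a RichAggregateFunction is also a RichFunction, an instanceof guard like the one in aggregate() rejects it.

```java
public class RichAggregateCheckSketch {

    // Simplified stand-in for Flink's AggregateFunction (assumption: reduced to its four methods).
    public interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Marker stand-in for Flink's RichFunction; the real interface also exposes
    // lifecycle methods and the RuntimeContext.
    public interface RichFunction {}

    // Stand-in for RichAggregateFunction: it is both an AggregateFunction and a RichFunction.
    public abstract static class RichAggregateFunction<IN, ACC, OUT>
            implements AggregateFunction<IN, ACC, OUT>, RichFunction {}

    // Mirrors the guard quoted from WindowedStream.aggregate(...).
    public static void checkNotRich(AggregateFunction<?, ?, ?> function) {
        if (function instanceof RichFunction) {
            throw new UnsupportedOperationException(
                    "This aggregation function cannot be a RichFunction.");
        }
    }

    // A plain aggregate function: passes the guard.
    public static class PlainSum implements AggregateFunction<Long, Long, Long> {
        public Long createAccumulator() { return 0L; }
        public Long add(Long value, Long acc) { return acc + value; }
        public Long getResult(Long acc) { return acc; }
        public Long merge(Long a, Long b) { return a + b; }
    }

    // The same logic as a rich function: rejected by the guard.
    public static class RichSum extends RichAggregateFunction<Long, Long, Long> {
        public Long createAccumulator() { return 0L; }
        public Long add(Long value, Long acc) { return acc + value; }
        public Long getResult(Long acc) { return acc; }
        public Long merge(Long a, Long b) { return a + b; }
    }

    // Returns true if the guard accepts the function, false if it throws.
    public static boolean isAccepted(AggregateFunction<?, ?, ?> function) {
        try {
            checkNotRich(function);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isAccepted(new PlainSum())); // prints true
        System.out.println(isAccepted(new RichSum()));  // prints false
    }
}
```

The guard depends only on the function's type, so no state access inside the function is needed to trigger the exception.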

Thanks, vino.

Gaurav Luthra <gauravluthra6...@gmail.com <mailto:gauravluthra6...@gmail.com>> wrote on Fri, Sep 28, 2018 at 4:27 PM:

    Hi Vino,

    Please check the Flink code below.

    // From org.apache.flink.streaming.api.datastream.WindowedStream

    @PublicEvolving
    public <ACC, R> SingleOutputStreamOperator<R> aggregate(
            AggregateFunction<T, ACC, R> function) {
        checkNotNull(function, "function");

        // Rich functions are rejected here.
        if (function instanceof RichFunction) {
            throw new UnsupportedOperationException(
                    "This aggregation function cannot be a RichFunction.");
        }

        TypeInformation<ACC> accumulatorType =
                TypeExtractor.getAggregateFunctionAccumulatorType(
                        function, input.getType(), null, false);

        TypeInformation<R> resultType =
                TypeExtractor.getAggregateFunctionReturnType(
                        function, input.getType(), null, false);

        return aggregate(function, accumulatorType, resultType);
    }


    Please see the above excerpt of Flink's aggregate() method, which
    is the one applied to a windowed stream.

    Thanks & Regards
    Gaurav Luthra
    Mob:- +91-9901945206


    On Fri, Sep 28, 2018 at 1:40 PM vino yang <yanghua1...@gmail.com
    <mailto:yanghua1...@gmail.com>> wrote:

        Hi Gaurav,

        This is very strange. Can you share your code and the specific
        exception? Under normal circumstances, it should not throw an
        exception.

        Thanks, vino.

        Gaurav Luthra <gauravluthra6...@gmail.com
        <mailto:gauravluthra6...@gmail.com>> wrote on Fri, Sep 28, 2018
        at 3:27 PM:

            Hi Vino,

            RichAggregateFunction can certainly access state. The
            problem is that we cannot use a RichAggregateFunction in
            the aggregate() method. If we do, it throws an exception.

            So the only option is to use a plain (non-rich)
            AggregateFunction with the aggregate() method on a windowed
            stream. But in an AggregateFunction we cannot access the
            RuntimeContext, and hence we cannot use state.

            Thanks & Regards
            Gaurav



            On Fri, 28 Sep, 2018, 12:40 PM vino yang,
            <yanghua1...@gmail.com <mailto:yanghua1...@gmail.com>> wrote:

                Hi Gaurav,

                Why do you think the RichAggregateFunction cannot
                access the State API?
                RichAggregateFunction inherits from
                AbstractRichFunction (it provides a RuntimeContext
                that allows you to access the state API).

                Thanks, vino.

                Gaurav Luthra <gauravluthra6...@gmail.com
                <mailto:gauravluthra6...@gmail.com>> wrote on Fri, Sep 28, 2018
                at 1:38 PM:

                    Hi,

                    As we are aware, we currently cannot use a
                    RichAggregateFunction in the aggregate() method on
                    a windowed stream. So, to access state in a custom
                    aggregate function, one can implement it using a
                    ProcessFunction instead.
                    Many developers face this issue, so someone must
                    have implemented, or tried to implement, this
                    already. Kindly share your feedback, as I need to
                    implement it myself.

                    Thanks & Regards
                    Gaurav Luthra



                    --
                    Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

