Hi Fabian,

Thanks for the detailed explanation. As you mentioned, and as we already knew,
options 1) and 2) have drawbacks, so I am going ahead with option 3).
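
Option 3) means doing the window bookkeeping by hand. As a rough illustration, here is a minimal plain-Java simulation of event-time tumbling windows computing a SUM per key. It assumes millisecond timestamps and a watermark that is advanced explicitly. The names ManualTumblingWindows, processElement and advanceWatermark are hypothetical: processElement and advanceWatermark play the roles of ProcessFunction's processElement() and onTimer(). The maps only stand in for Flink keyed state and the timer service, so none of this is Flink API code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of hand-rolled event-time tumbling windows (option 3).
// The maps stand in for Flink keyed state and event-time timers; nothing here is Flink API.
class ManualTumblingWindows {
    private final long size; // window length in ms
    // key -> (window start -> running SUM): stand-in for per-key MapState
    private final Map<String, Map<Long, Long>> sums = new HashMap<>();
    // timer timestamp -> keys with a timer at that time: stand-in for registered timers
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();
    private final List<String> output = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;

    ManualTumblingWindows(long size) {
        this.size = size;
    }

    // processElement() role: assign the record to its window, update the accumulator,
    // and register a timer at the window's last millisecond.
    void processElement(String key, long timestamp, long value) {
        long start = timestamp - Math.floorMod(timestamp, size);
        long timer = start + size - 1;
        if (timer <= currentWatermark) {
            return; // late record: its window has already fired, so it is dropped
        }
        sums.computeIfAbsent(key, k -> new HashMap<>()).merge(start, value, Long::sum);
        List<String> keys = timers.computeIfAbsent(timer, t -> new ArrayList<>());
        if (!keys.contains(key)) {
            keys.add(key); // at most one timer per (key, timestamp)
        }
    }

    // onTimer() role: fire every timer at or below the new watermark,
    // emit the window result, and clear that window's state.
    void advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        while (!timers.isEmpty() && timers.firstKey() <= currentWatermark) {
            Map.Entry<Long, List<String>> due = timers.pollFirstEntry();
            long start = due.getKey() - size + 1;
            for (String key : due.getValue()) {
                Map<Long, Long> perKey = sums.get(key);
                long sum = perKey.remove(start);
                output.add(key + " [" + start + "," + (start + size) + ") sum=" + sum);
                if (perKey.isEmpty()) {
                    sums.remove(key);
                }
            }
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        ManualTumblingWindows w = new ManualTumblingWindows(10);
        w.processElement("a", 1, 5);
        w.processElement("a", 7, 3);
        w.processElement("b", 4, 10);
        w.processElement("a", 12, 1);
        w.advanceWatermark(9);         // closes [0,10) for both keys
        w.processElement("a", 5, 100); // late for [0,10), dropped
        w.advanceWatermark(19);        // closes [10,20) for key "a"
        System.out.println(w.output());
    }
}
```

The sketch drops late records outright and only supports tumbling windows. It leaves out allowed lateness, sliding and session windows, and checkpointed state, and that is where most of the extra logic of option 3) would go.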

Thanks & Regards
Gaurav Luthra
Mob:- +91-9901945206


On Mon, Oct 1, 2018 at 3:11 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> There are basically three options:
> 1) Use an AggregateFunction and store everything that you would put into
> state into the Accumulator. This can become quite expensive because the
> Accumulator is de/serialized for every function call if you use RocksDB.
> The advantage is that you don't have to store all records in state but only
> the data you need. Simple aggregations like COUNT or SUM are quite cheap.
> 2) Use Flink's window primitives and a ProcessWindowFunction. In this
> case, all records of a window are stored in a ListState. Adding a record to
> the list is cheap, but the state might grow quite large for longer windows.
> When the window is evaluated, all records are loaded into memory and
> iterated by the ProcessWindowFunction.
> 3) Implement the windowing logic in a ProcessFunction. This requires a lot
> of additional logic, depending on what types of windows you want to support.
>
> Flink's SQL / Table API implements the first approach.
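
To make the trade-off between options 1) and 2) concrete, here is a minimal plain-Java sketch computing an AVG over longs. Agg is a hypothetical stand-in that mirrors the four methods of Flink's AggregateFunction (createAccumulator, add, getResult, merge). It is not the Flink interface, and no state backend is involved; the list in buffered() only stands in for a ListState.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in shaped like Flink's AggregateFunction<IN, ACC, OUT>; not the Flink interface.
interface Agg<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC acc);
    OUT getResult(ACC acc);
    ACC merge(ACC a, ACC b);
}

// AVG over longs: the accumulator is {sum, count}, however many records arrive.
class AvgAgg implements Agg<Long, long[], Double> {
    public long[] createAccumulator() { return new long[] {0L, 0L}; }
    public long[] add(Long value, long[] acc) { acc[0] += value; acc[1]++; return acc; }
    public Double getResult(long[] acc) { return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1]; }
    public long[] merge(long[] a, long[] b) { return new long[] {a[0] + b[0], a[1] + b[1]}; }
}

class WindowOptions {
    // Option 1: fold each record into the accumulator on arrival.
    // Per-window state is one accumulator; with RocksDB it would be de/serialized on every add().
    static <IN, ACC, OUT> OUT incremental(Agg<IN, ACC, OUT> agg, List<IN> records) {
        ACC acc = agg.createAccumulator();
        for (IN r : records) {
            acc = agg.add(r, acc);
        }
        return agg.getResult(acc);
    }

    // Option 2: append every record to a buffer (cheap per record, but it grows with the window),
    // then hand the whole buffer to the window function when the window fires.
    static <IN, OUT> OUT buffered(Function<Iterable<IN>, OUT> windowFn, List<IN> records) {
        List<IN> buffer = new ArrayList<>(); // stands in for a ListState
        for (IN r : records) {
            buffer.add(r);
        }
        return windowFn.apply(buffer);
    }

    // Window function for option 2: iterates over all buffered records.
    static double average(Iterable<Long> values) {
        long sum = 0;
        long n = 0;
        for (long v : values) {
            sum += v;
            n++;
        }
        return n == 0 ? 0.0 : (double) sum / n;
    }

    public static void main(String[] args) {
        List<Long> window = List.of(2L, 4L, 9L);
        Double viaAccumulator = incremental(new AvgAgg(), window);
        Double viaBuffer = buffered(WindowOptions::average, window);
        System.out.println(viaAccumulator);
        System.out.println(viaBuffer);
    }
}
```

Both paths compute the same average. The difference is what has to be kept per window: two longs in the first case, every record in the second.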
>
> Best, Fabian
>
> On Sun, Sep 30, 2018 at 12:48 PM Gaurav Luthra <
> gauravluthra6...@gmail.com> wrote:
>
>> Hi Ken,
>>
>> Mine is a very generic use case: I am building an aggregation function
>> with Flink that can be configured for any use case. It will not be tied to
>> a specific use case; every user can enter their own business logic and use
>> this aggregator to get the result.
>> As for windowing, the user can also configure the type of window, and my
>> aggregator will ask for the properties required by that window.
>>
>> I hope that gives you an idea.
>>
>> But to make it generic, I need to implement it with a ProcessFunction and
>> the process() method, instead of the more specific AggregateFunction and
>> aggregate() method.
>>
>> So, I am looking for input from anyone who has tried implementing
>> aggregation with a ProcessFunction and the process() method, as this is a
>> much-needed capability in Flink.
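
Because the window type is user-configurable in this design, one way to sketch the "aggregator asks for the properties of that window" part is a small config object. It turns a timestamp into the windows that contain it. WindowSpec, tumbling(), sliding() and assign() are hypothetical names, not a Flink WindowAssigner. Window starts are aligned to multiples of the slide, as Flink's built-in tumbling and sliding event-time assigners do with a zero offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical user-facing window configuration for a generic aggregator (not a Flink API).
class WindowSpec {
    enum Type { TUMBLING, SLIDING }

    final Type type;
    final long size;  // window length in ms
    final long slide; // distance between window starts in ms; equals size for TUMBLING

    private WindowSpec(Type type, long size, long slide) {
        if (size <= 0 || slide <= 0) {
            throw new IllegalArgumentException("size and slide must be positive");
        }
        this.type = type;
        this.size = size;
        this.slide = slide;
    }

    static WindowSpec tumbling(long size) {
        return new WindowSpec(Type.TUMBLING, size, size);
    }

    static WindowSpec sliding(long size, long slide) {
        return new WindowSpec(Type.SLIDING, size, slide);
    }

    // Returns every [start, end) window that contains the timestamp, latest start first.
    List<long[]> assign(long timestamp) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide); // align to a multiple of slide
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        for (WindowSpec spec : List.of(tumbling(10), sliding(10, 5))) {
            StringBuilder sb = new StringBuilder(spec.type + ":");
            for (long[] w : spec.assign(23)) {
                sb.append(" [").append(w[0]).append(",").append(w[1]).append(")");
            }
            System.out.println(sb);
        }
    }
}
```

A tumbling window is just the special case slide == size, so a record lands in exactly one window. With slide < size it lands in several; with slide > size it may land in none.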
>>
>> Thanks and Regards,
>> Gaurav Luthra
>> Mob:- +91-9901945206
>>
>>
>> On Sun, Sep 30, 2018 at 5:12 AM Ken Krugler <kkrugler_li...@transpac.com>
>> wrote:
>>
>>> Hi Gaurav,
>>>
>>> I’m curious - for your use case, what are the windowing & aggregation
>>> requirements?
>>>
>>> E.g. is it a 10 second sliding window?
>>>
>>> And what’s the aggregation you’re trying to do?
>>>
>>> Thanks,
>>>
>>> — Ken
>>>
>>>
>>> On Sep 28, 2018, at 4:00 AM, Gaurav Luthra <gauravluthra6...@gmail.com>
>>> wrote:
>>>
>>> Hi Chesnay,
>>>
>>> I know it is a known issue, and it won't be fixed because of the window
>>> merging feature of session windows.
>>> But I am looking for someone who has implemented an aggregation function
>>> using a ProcessFunction and the process() method instead of an
>>> AggregateFunction and the aggregate() method.
>>> I hope you got my point.
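
For context on the merging point: with session windows, a record that arrives later can bridge two existing sessions, and their partial results then have to be combined into one. This is a minimal plain-Java sketch with a fixed gap and a SUM/COUNT accumulator. SessionSketch and its methods are hypothetical and not Flink's session window assigner. It only illustrates one way to see why merging complicates things: the accumulator must carry everything needed to merge two sessions.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java sketch of session windows with a fixed gap (not Flink's session window assigner).
class SessionSketch {
    // One session: the window bounds [start, end) plus its SUM/COUNT accumulator.
    static final class Session {
        final long start, end, sum, count;

        Session(long start, long end, long sum, long count) {
            this.start = start;
            this.end = end;
            this.sum = sum;
            this.count = count;
        }

        boolean overlaps(Session other) {
            return start < other.end && other.start < end;
        }

        // Union of the bounds plus a merge() of the two accumulators.
        Session mergeWith(Session other) {
            return new Session(Math.min(start, other.start), Math.max(end, other.end),
                    sum + other.sum, count + other.count);
        }

        @Override
        public String toString() {
            return "[" + start + "," + end + ") sum=" + sum + " count=" + count;
        }
    }

    private final long gap;
    final List<Session> sessions = new ArrayList<>();

    SessionSketch(long gap) {
        this.gap = gap;
    }

    void add(long timestamp, long value) {
        // Every record starts as its own session [timestamp, timestamp + gap).
        Session merged = new Session(timestamp, timestamp + gap, value, 1);
        // Absorb every existing session it overlaps; one record can bridge several sessions.
        Iterator<Session> it = sessions.iterator();
        while (it.hasNext()) {
            Session s = it.next();
            if (s.overlaps(merged)) {
                merged = merged.mergeWith(s);
                it.remove();
            }
        }
        sessions.add(merged);
    }

    public static void main(String[] args) {
        SessionSketch w = new SessionSketch(10);
        w.add(0, 1);
        w.add(25, 5);
        w.add(8, 2);  // extends the first session to [0,18)
        System.out.println(w.sessions);
        w.add(17, 4); // bridges [0,18) and [25,35): both accumulators are merged
        System.out.println(w.sessions);
    }
}
```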
>>>
>>> Thanks & Regards
>>> Gaurav Luthra
>>>
>>>
>>>
>>> On Fri, Sep 28, 2018 at 4:22 PM Chesnay Schepler <ches...@apache.org>
>>> wrote:
>>>
>>>> Please see: https://issues.apache.org/jira/browse/FLINK-10250
>>>>
>>>> On 28.09.2018 11:27, vino yang wrote:
>>>>
>>>> Hi Gaurav,
>>>>
>>>> Yes, you are right. A RichFunction is indeed not allowed here. I
>>>> will ping Timo; he may be able to give you a more authoritative answer.
>>>>
>>>> Thanks, vino.
>>>>
>>>> On Fri, Sep 28, 2018 at 4:27 PM Gaurav Luthra <gauravluthra6...@gmail.com> wrote:
>>>>
>>>>> Hi Vino,
>>>>>
>>>>> Kindly check the Flink code below.
>>>>>
>>>>> // From org.apache.flink.streaming.api.datastream.WindowedStream
>>>>>
>>>>> @PublicEvolving
>>>>> public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
>>>>>     checkNotNull(function, "function");
>>>>>
>>>>>     // Rich functions (which could access the RuntimeContext and state) are rejected up front.
>>>>>     if (function instanceof RichFunction) {
>>>>>         throw new UnsupportedOperationException("This aggregation function cannot be a RichFunction.");
>>>>>     }
>>>>>
>>>>>     TypeInformation<ACC> accumulatorType = TypeExtractor.getAggregateFunctionAccumulatorType(
>>>>>             function, input.getType(), null, false);
>>>>>
>>>>>     TypeInformation<R> resultType = TypeExtractor.getAggregateFunctionReturnType(
>>>>>             function, input.getType(), null, false);
>>>>>
>>>>>     return aggregate(function, accumulatorType, resultType);
>>>>> }
>>>>>
>>>>>
>>>>> Kindly check the above snippet of Flink's aggregate() method, which is
>>>>> applied on a windowed stream.
>>>>>
>>>>> Thanks & Regards
>>>>> Gaurav Luthra
>>>>> Mob:- +91-9901945206
>>>>>
>>>>>
>>>>> On Fri, Sep 28, 2018 at 1:40 PM vino yang <yanghua1...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Gaurav,
>>>>>>
>>>>>> This is very strange. Can you share your code and the specific
>>>>>> exception? Under normal circumstances, it should not throw an exception.
>>>>>>
>>>>>> Thanks, vino.
>>>>>>
>>>>>> On Fri, Sep 28, 2018 at 3:27 PM Gaurav Luthra <gauravluthra6...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Vino,
>>>>>>>
>>>>>>> A RichAggregateFunction can certainly access state. But the problem
>>>>>>> is that we cannot pass a RichAggregateFunction to the aggregate() method;
>>>>>>> if we do, it throws an exception.
>>>>>>>
>>>>>>> So, the option is to use a plain (non-rich) AggregateFunction with the
>>>>>>> aggregate() method on the windowed stream. But an AggregateFunction
>>>>>>> cannot access the RuntimeContext, hence we cannot use state.
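
For the state-access problem, Flink's WindowedStream also offers an aggregate() overload that takes a ProcessWindowFunction alongside the AggregateFunction. The AggregateFunction stays non-rich and pre-aggregates incrementally. The ProcessWindowFunction is a rich function that receives only the final aggregate and can use state. The plain-Java sketch below only simulates that division of labour. The class, its method names and the running-total map (standing in for per-key state) are hypothetical, not Flink API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of "non-rich incremental aggregate + stateful window function".
class AggregateThenWindowFn {
    // Stand-in for keyed state that the window function can read and write (hypothetical).
    private final Map<String, Long> runningTotalPerKey = new HashMap<>();

    // AggregateFunction role: a stateless COUNT folded one record at a time.
    static long countIncrementally(List<String> windowRecords) {
        long acc = 0; // createAccumulator()
        for (String ignored : windowRecords) {
            acc = acc + 1; // add()
        }
        return acc; // getResult()
    }

    // Window-function role: sees only the pre-aggregated value, but may use state.
    String onWindowFired(String key, long windowCount) {
        long total = runningTotalPerKey.merge(key, windowCount, Long::sum);
        return key + ": window=" + windowCount + " total=" + total;
    }

    String evaluate(String key, List<String> windowRecords) {
        return onWindowFired(key, countIncrementally(windowRecords));
    }

    public static void main(String[] args) {
        AggregateThenWindowFn fn = new AggregateThenWindowFn();
        System.out.println(fn.evaluate("sensor-1", List.of("a", "b", "c")));
        System.out.println(fn.evaluate("sensor-1", List.of("d", "e")));
        System.out.println(fn.evaluate("sensor-2", List.of("f")));
    }
}
```

In real Flink code, the per-key map would instead be state obtained through the ProcessWindowFunction's context (windowState() or globalState()), which makes it fault tolerant. The sketch keeps it in a HashMap only to stay self-contained.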
>>>>>>>
>>>>>>> Thanks & Regards
>>>>>>> Gaurav
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, 28 Sep, 2018, 12:40 PM vino yang, <yanghua1...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Gaurav,
>>>>>>>>
>>>>>>>> Why do you think the RichAggregateFunction cannot access the State
>>>>>>>> API?
>>>>>>>> RichAggregateFunction inherits from AbstractRichFunction (it
>>>>>>>> provides a RuntimeContext that allows you to access the state API).
>>>>>>>>
>>>>>>>> Thanks, vino.
>>>>>>>>
>>>>>>>> On Fri, Sep 28, 2018 at 1:38 PM Gaurav Luthra <gauravluthra6...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> As we are aware, we currently cannot use a RichAggregateFunction in the
>>>>>>>>> aggregate() method on a windowed stream. So, to access state in a
>>>>>>>>> custom aggregate function, you can implement it using a
>>>>>>>>> ProcessFunction instead.
>>>>>>>>> Many developers face this issue, so someone must have implemented, or
>>>>>>>>> tried to implement, this. Kindly share your feedback on it, as I need
>>>>>>>>> to implement it.
>>>>>>>>>
>>>>>>>>> Thanks & Regards
>>>>>>>>> Gaurav Luthra
>>>>>>>>>
>>>>>>>>
>>> --------------------------
>>> Ken Krugler
>>> +1 530-210-6378
>>> http://www.scaleunlimited.com
>>> Custom big data solutions & training
>>> Flink, Solr, Hadoop, Cascading & Cassandra
>>>
>>>
