An additional question: if the source is key-partitioned (Kafka), does a
keyBy retain the order of a Kafka partition across a shuffle?
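
To make the question concrete, here is a minimal plain-Java sketch (not Flink code) of how I understand the exchange. It assumes that each Kafka partition is read in order by one source subtask and that each sender-to-receiver channel is FIFO, so ordering can only be relied on within a single partition. The class, the method names and the drain order are made up for illustration; the drain order is deliberately unfavorable to model source subtasks that progress independently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of a keyBy exchange; this is NOT Flink code.
// Assumption: order is preserved within one partition (one FIFO channel),
// but nothing is guaranteed about interleaving across partitions.
class KeyByOrderSketch {

    // An event with a key and a sequence number assigned by the producer.
    static final class Event {
        final String key;
        final int seq;

        Event(String key, int seq) {
            this.key = key;
            this.seq = seq;
        }
    }

    // Drains the partitions one after another in the given order and returns
    // the sequence numbers that the receiver responsible for `key` observes.
    static List<Integer> receivedSeqs(List<List<Event>> partitions, int[] drainOrder, String key) {
        List<Integer> seen = new ArrayList<>();
        for (int p : drainOrder) {
            for (Event e : partitions.get(p)) { // FIFO within one partition
                if (e.key.equals(key)) {
                    seen.add(e.seq);
                }
            }
        }
        return seen;
    }

    // Case 1: the topic is partitioned on the key, so every "a" event is in partition 0.
    static String keyPartitioned() {
        List<List<Event>> partitions = Arrays.asList(
                Arrays.asList(new Event("a", 1), new Event("a", 2), new Event("a", 3)),
                Arrays.asList(new Event("b", 1), new Event("b", 2)));
        return receivedSeqs(partitions, new int[] {1, 0}, "a").toString();
    }

    // Case 2: round-robin dispatch spreads the "a" events over both partitions.
    static String roundRobin() {
        List<List<Event>> partitions = Arrays.asList(
                Arrays.asList(new Event("a", 1), new Event("a", 3)),
                Arrays.asList(new Event("a", 2), new Event("b", 1)));
        return receivedSeqs(partitions, new int[] {1, 0}, "a").toString();
    }

    public static void main(String[] args) {
        System.out.println("key-partitioned: " + keyPartitioned()); // [1, 2, 3]
        System.out.println("round-robin:     " + roundRobin());     // [2, 1, 3]
    }
}
```

Under these assumptions the key-partitioned case keeps per-key order regardless of which partition is drained first, while the round-robin case does not.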

On Fri, Dec 8, 2017 at 1:12 PM, Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> I understand that. Let me elaborate. The sequence of events is:
>
> 1. Round-robin dispatch to the Kafka cluster (it is not partitioned on the
> key, which we may ultimately do, and then I will have more questions on how
> to key by and still keep order, probably avoiding a shuffle :) ).
> 2. Key by a high-cardinality key.
> 3. Sessionize.
> 4. Because of the round-robin dispatch on Kafka (and even if partitioned on
> the key with a subsequent keyBy), the sort order is not retained and the ACC
> has to hold on to the elements in a List. When the window is finalized, we
> sort the List in the ACC and do pagination. We are looking for paths within
> a session from a source event to a sink event. I was hoping to use RocksDB
> state as the final merged list, and thus off heap, and use a count-based
> Trigger to evaluate the ACC and merge the inter-trigger collection into the
> master copy rather than keeping all events in the ACC (I would imagine this
> is a very general pattern).
>
> Does that make sense?
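
To illustrate step 4, here is a minimal plain-Java sketch (not Flink code) of the pattern I have in mind: buffer events between trigger firings, then sort the small buffer and merge it into an already sorted master list. The `master` list only stands in for state that would live in RocksDB, and `batchSize` is a hypothetical count-trigger threshold; both are assumptions of the sketch, as are all names in it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java sketch of "merge the inter-trigger collection into the master copy".
// This is NOT Flink code; `master` stands in for state kept in the state backend.
class BatchMergeSketch {
    private final int batchSize;                          // hypothetical count-trigger threshold
    private final List<Long> pending = new ArrayList<>(); // unsorted events since the last firing
    private List<Long> master = new ArrayList<>();        // sorted, merged copy

    BatchMergeSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Buffers one event timestamp and merges the buffer once it holds batchSize events.
    void add(long timestamp) {
        pending.add(timestamp);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // Sorts the pending batch, then merges it with the sorted master list in one linear pass.
    void flush() {
        Collections.sort(pending);
        List<Long> merged = new ArrayList<>(master.size() + pending.size());
        int i = 0;
        int j = 0;
        while (i < master.size() && j < pending.size()) {
            if (master.get(i) <= pending.get(j)) {
                merged.add(master.get(i++));
            } else {
                merged.add(pending.get(j++));
            }
        }
        while (i < master.size()) {
            merged.add(master.get(i++));
        }
        while (j < pending.size()) {
            merged.add(pending.get(j++));
        }
        master = merged;
        pending.clear();
    }

    // Flushes any remainder and returns all events in sorted order (window finalized).
    List<Long> result() {
        flush();
        return master;
    }

    static String demo() {
        BatchMergeSketch acc = new BatchMergeSketch(3);
        for (long ts : new long[] {5, 1, 9, 4, 8, 2, 7}) {
            acc.add(ts);
        }
        return acc.result().toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1, 2, 4, 5, 7, 8, 9]
    }
}
```

Only the small `pending` buffer needs sorting at each firing; the master copy is touched once per batch rather than once per event.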
>
> On Fri, Dec 8, 2017 at 9:11 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>>
>> If you use an AggregateFunction in this way (i.e. for a window), the ACC
>> should in fact be kept in the state backend. Did you configure the job to
>> use RocksDB? How are the memory problems manifesting?
>>
>> Best,
>> Aljoscha
>>
>>
>> On 6. Dec 2017, at 14:57, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>> Hi Vishal,
>>
>> You are right: it is not possible to use state in an AggregateFunction
>> because windows need to be mergeable.
>> An AggregateFunction knows how to merge its accumulators, but merging
>> generic state is not possible.
>>
>> I am not aware of an efficient and easy workaround for this.
>> If you want to use the provided session window logic, you can use a
>> WindowFunction that performs all computations when the window is triggered.
>> This means that aggregations do not happen eagerly and all events for a
>> window are collected and held in state.
>> Another approach could be to implement the whole logic (incl. the session
>> windowing) using a ProcessFunction. This would be a major effort though.
>>
>> Best,
>> Fabian
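
For a sense of what hand-rolled session windowing would have to do, here is a minimal plain-Java sketch (not Flink code, and not a ProcessFunction) of gap-based session merging for a single key. Each event is treated as a session `[ts, ts + gap)` and merged with every existing session it overlaps, so a late event can bridge two sessions. All names are made up for illustration, and the strict overlap test is an assumption of the sketch; the boundary handling in Flink's own session windows may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Plain-Java sketch of gap-based session merging for one key; this is NOT Flink code.
class SessionMergeSketch {

    // A session covers [start, end) and holds the timestamps assigned to it.
    static final class Session {
        long start;
        long end;
        final List<Long> events = new ArrayList<>();

        Session(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    private final long gap;
    private final List<Session> sessions = new ArrayList<>();

    SessionMergeSketch(long gap) {
        this.gap = gap;
    }

    // Adds an event as the candidate session [ts, ts + gap) and folds in every
    // existing session that strictly overlaps that candidate interval.
    void add(long ts) {
        long lo = ts;
        long hi = ts + gap;
        Session merged = new Session(lo, hi);
        merged.events.add(ts);
        List<Session> kept = new ArrayList<>();
        for (Session s : sessions) {
            if (s.start < hi && lo < s.end) { // overlaps the candidate interval
                merged.start = Math.min(merged.start, s.start);
                merged.end = Math.max(merged.end, s.end);
                merged.events.addAll(s.events);
            } else {
                kept.add(s);
            }
        }
        kept.add(merged);
        sessions.clear();
        sessions.addAll(kept);
    }

    // Returns the sessions ordered by start time, each with its events sorted.
    String describe() {
        List<Session> ordered = new ArrayList<>(sessions);
        ordered.sort(Comparator.comparingLong((Session x) -> x.start));
        List<List<Long>> out = new ArrayList<>();
        for (Session x : ordered) {
            List<Long> events = new ArrayList<>(x.events);
            Collections.sort(events);
            out.add(events);
        }
        return out.toString();
    }

    static String demo() {
        SessionMergeSketch sketch = new SessionMergeSketch(10);
        for (long ts : new long[] {0, 30, 5, 50, 22}) { // deliberately out of order
            sketch.add(ts);
        }
        return sketch.describe();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [[0, 5], [22, 30], [50]]
    }
}
```

A real implementation would additionally need timers to close sessions and keyed state to hold them, which is where most of the effort mentioned above would go.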
>>
>> 2017-12-06 3:52 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>
>>> It seems that this has to do with session windows that are mergeable? I
>>> tried the RichWindowFunction and that seems to suggest that one cannot use
>>> state? Any ideas, folks...
>>>
>>> On Dec 1, 2017 10:38 AM, "Vishal Santoshi" <vishal.santo...@gmail.com>
>>> wrote:
>>>
>>>> I have a simple aggregation with one caveat. For some reason I have to
>>>> keep a large amount of state until the window is GCed. The state is within
>>>> the accumulator (ACC). I am hitting a memory bottleneck and would like to
>>>> offload the state to the state backend (RocksDB), keeping the
>>>> between-checkpoint state in memory (this seems to be an obvious fix).
>>>> However, I am not allowed to have a RichAggregateFunction in the aggregate
>>>> method of a windowed stream. That raises two questions:
>>>>
>>>> 1. Why?
>>>> 2. Is there an alternative for stateful window aggregation where we
>>>> manage the state?
>>>>
>>>> Thanks Vishal
>>>>
>>>>
>>>> Here is the code (it uses generics, but it works):
>>>>
>>>> SingleOutputStreamOperator<OUT> retVal = input
>>>>         .keyBy(keySelector)
>>>>         .window(EventTimeSessionWindows.withGap(gap))
>>>>         .aggregate(
>>>>                 new AggregateFunction<IN, ACC, OUT>() {
>>>>
>>>>                     // Each window starts from a reset clone of the prototype accumulator.
>>>>                     @Override
>>>>                     public ACC createAccumulator() {
>>>>                         ACC newInstance = (ACC) accumulator.clone();
>>>>                         newInstance.resetLocal();
>>>>                         return newInstance;
>>>>                     }
>>>>
>>>>                     // Fold one incoming element into the window's accumulator.
>>>>                     @Override
>>>>                     public void add(IN value, ACC accumulator) {
>>>>                         accumulator.add(value);
>>>>                     }
>>>>
>>>>                     // Emit the accumulated value when the window fires.
>>>>                     @Override
>>>>                     public OUT getResult(ACC accumulator) {
>>>>                         return accumulator.getLocalValue();
>>>>                     }
>>>>
>>>>                     // Called when two session windows merge: combine b into a.
>>>>                     @Override
>>>>                     public ACC merge(ACC a, ACC b) {
>>>>                         a.merge(b);
>>>>                         return a;
>>>>                     }
>>>>                 }, new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>>>>                     // The iterable holds the single pre-aggregated result; forward it.
>>>>                     @Override
>>>>                     public void apply(KEY s, TimeWindow window,
>>>>                             Iterable<OUT> input, Collector<OUT> out) throws Exception {
>>>>                         out.collect(input.iterator().next());
>>>>                     }
>>>>                 }, accType, aggregationResultType, aggregationResultType);
>>>>
>>>>
>>
>>
>
