I understand that. Let me elaborate. The sequence of events is:

1. Round-robin dispatch to the Kafka cluster (it is not partitioned on the
key, which we may ultimately do, and then I will have more questions on how
to key by and still keep order, probably avoiding a shuffle :) ).
2. Key by a high-cardinality key.
3. Sessionize.
4. Because of the round robin on Kafka (and even if it were partitioned on the
key and followed by a keyBy), the sort order is not retained, and the ACC has to
hold on to the elements in a List. When the window is finalized, we sort the
List in the ACC and do pagination. We are looking for event-based paths within
a session, from a source event to a sink event. I was hoping to use RocksDB
state for the final merged list (and thus keep it off heap) and use a
count-based trigger to evaluate the ACC and merge the collection gathered
between triggers into the master copy, rather than keeping all events in the
ACC (I would imagine this is a very general pattern).
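The buffering pattern in step 4 can be modeled in plain Java. This is a sketch under stated assumptions, not a Flink implementation: `Event`, `CountFlushingSessionBuffer` and the `SOURCE`/`SINK` labels are hypothetical, the `master` list stands in for what would be RocksDB-backed keyed state, and flushing every `flushEvery` elements inside `add` plays the role of the count-based trigger.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical event: an event-time timestamp and a label such as "SOURCE" or "SINK".
final class Event {
    final long timestamp;
    final String type;

    Event(long timestamp, String type) {
        this.timestamp = timestamp;
        this.type = type;
    }
}

// Models the proposal: a small in-memory batch is sorted and merged into a sorted
// "master" list every `flushEvery` elements. In Flink the master copy would be
// keyed state in RocksDB; here it is an ArrayList so the logic runs on its own.
final class CountFlushingSessionBuffer {
    private static final Comparator<Event> BY_TIME =
            Comparator.comparingLong(e -> e.timestamp);

    private final int flushEvery;
    private final List<Event> pending = new ArrayList<>();
    private List<Event> master = new ArrayList<>();

    CountFlushingSessionBuffer(int flushEvery) {
        this.flushEvery = flushEvery;
    }

    void add(Event e) {
        pending.add(e);
        if (pending.size() >= flushEvery) {
            flush();
        }
    }

    // Sort only the pending batch, then linearly merge it into the sorted master list.
    void flush() {
        pending.sort(BY_TIME);
        List<Event> merged = new ArrayList<>(master.size() + pending.size());
        int i = 0, j = 0;
        while (i < master.size() && j < pending.size()) {
            if (master.get(i).timestamp <= pending.get(j).timestamp) {
                merged.add(master.get(i++));
            } else {
                merged.add(pending.get(j++));
            }
        }
        while (i < master.size()) merged.add(master.get(i++));
        while (j < pending.size()) merged.add(pending.get(j++));
        master = merged;
        pending.clear();
    }

    // On window finalization: flush the remainder and return SOURCE..SINK paths
    // (inclusive) in event-time order. A SOURCE starts or restarts a path, the next
    // SINK closes it, and events outside any open path are skipped.
    List<List<Event>> finishAndExtractPaths() {
        flush();
        List<List<Event>> paths = new ArrayList<>();
        List<Event> current = null;
        for (Event e : master) {
            if ("SOURCE".equals(e.type)) {
                current = new ArrayList<>();
            }
            if (current != null) {
                current.add(e);
                if ("SINK".equals(e.type)) {
                    paths.add(current);
                    current = null;
                }
            }
        }
        return paths;
    }
}
```

Because each flush sorts only the small pending batch and then does a linear merge, the master copy stays sorted without re-sorting the whole session at the end. In a real job, reading and rewriting a large list in RocksDB on every flush likely has its own cost, so the flush interval would need tuning.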

Does that make sense?

On Fri, Dec 8, 2017 at 9:11 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
>
> If you use an AggregateFunction in this way (i.e., for a window), the ACC
> should in fact be kept in the state backend. Did you configure the job to
> use RocksDB? How are the memory problems manifesting?
>
> Best,
> Aljoscha
>
>
> On 6. Dec 2017, at 14:57, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Vishal,
>
> you are right, it is not possible to use state in an AggregateFunction
> because windows need to be mergeable.
> An AggregateFunction knows how to merge its accumulators but merging
> generic state is not possible.
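A plain-Java model may make the mergeability point concrete. `Agg`, `Session` and `SessionWindowModel` are hypothetical names (not Flink classes), and the overlap rule is simplified. Each element starts its own `[t, t + gap)` session; when a late element bridges two sessions they collapse into one, and the only way to combine their partial results is `merge` on the accumulators. Arbitrary user-managed state attached to each session would have no such rule.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mirror of the AggregateFunction contract (plain Java, not Flink's interface).
interface Agg<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC acc);
    ACC merge(ACC a, ACC b);
    OUT getResult(ACC acc);
}

// A session window [start, end) together with its partial aggregate.
final class Session<ACC> {
    long start;
    long end;
    ACC acc;

    Session(long start, long end, ACC acc) {
        this.start = start;
        this.end = end;
        this.acc = acc;
    }
}

// Simplified event-time session windowing for a single key.
final class SessionWindowModel<IN, ACC, OUT> {
    private final long gap;
    private final Agg<IN, ACC, OUT> agg;
    final List<Session<ACC>> sessions = new ArrayList<>();

    SessionWindowModel(long gap, Agg<IN, ACC, OUT> agg) {
        this.gap = gap;
        this.agg = agg;
    }

    void add(long timestamp, IN value) {
        // Each element first gets its own window [t, t + gap) and a fresh accumulator.
        Session<ACC> fresh = new Session<>(timestamp, timestamp + gap,
                agg.add(value, agg.createAccumulator()));
        List<Session<ACC>> untouched = new ArrayList<>();
        for (Session<ACC> s : sessions) {
            boolean overlaps = s.start < fresh.end && fresh.start < s.end;
            if (overlaps) {
                // The windows collapse into one; their accumulators can only be combined via merge().
                fresh.start = Math.min(fresh.start, s.start);
                fresh.end = Math.max(fresh.end, s.end);
                fresh.acc = agg.merge(fresh.acc, s.acc);
            } else {
                untouched.add(s);
            }
        }
        untouched.add(fresh);
        sessions.clear();
        sessions.addAll(untouched);
    }
}
```

With `gap = 10` and a counting `Agg`, elements at 0 and 18 form two sessions; an element at 9 overlaps both, so all three end up in one session `[0, 28)` whose accumulator is the merged count 3.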
>
> I am not aware of an efficient and easy workaround for this.
> If you want to use the provided session window logic, you can use a
> WindowFunction that performs all computations when the window is triggered.
> This means that aggregations do not happen eagerly and all events for a
> window are collected and held in state.
> Another approach could be to implement the whole logic (incl. the session
> windowing) using a ProcessFunction. This would be a major effort though.
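The ProcessFunction route essentially means re-implementing session assignment by hand. A rough plain-Java model, assuming no late data (every element arriving after watermark `w` has a timestamp greater than `w`) and using the hypothetical class `ManualSessionizer` (not a Flink API), buffers timestamps per key in arrival order and, on each watermark, sorts the buffer and emits only the sessions that can no longer grow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of hand-rolled session windowing: per-key buffers (keyed
// state in Flink) plus a check on each watermark (an event-time timer in Flink).
final class ManualSessionizer {
    private final long gap;
    private final Map<String, List<Long>> buffers = new HashMap<>();

    ManualSessionizer(long gap) {
        this.gap = gap;
    }

    // Buffer the element's timestamp under its key; arrival order does not matter.
    void onElement(String key, long timestamp) {
        buffers.computeIfAbsent(key, k -> new ArrayList<>()).add(timestamp);
    }

    // Emit every session whose last timestamp plus the gap is at or below the
    // watermark (no future element can join it). Returns "key:[ts, ...]" strings.
    List<String> onWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Long>> entry : buffers.entrySet()) {
            List<Long> ts = entry.getValue();
            ts.sort(null); // natural ordering of the timestamps
            List<Long> remaining = new ArrayList<>();
            List<Long> session = new ArrayList<>();
            for (long t : ts) {
                // A gap of at least `gap` since the previous element starts a new session.
                if (!session.isEmpty() && t - session.get(session.size() - 1) >= gap) {
                    closeOrKeep(entry.getKey(), session, watermark, out, remaining);
                    session = new ArrayList<>();
                }
                session.add(t);
            }
            if (!session.isEmpty()) {
                closeOrKeep(entry.getKey(), session, watermark, out, remaining);
            }
            entry.setValue(remaining);
        }
        return out;
    }

    private void closeOrKeep(String key, List<Long> session, long watermark,
                             List<String> out, List<Long> remaining) {
        long last = session.get(session.size() - 1);
        if (last + gap <= watermark) {
            out.add(key + ":" + session);
        } else {
            remaining.addAll(session); // still open: keep buffering
        }
    }
}
```

In a Flink `ProcessFunction`, the per-key buffer would be keyed state such as `ListState`, and the watermark check would run in `onTimer` after registering event-time timers through `ctx.timerService().registerEventTimeTimer(...)`. Handling late data and keeping the state layout checkpoint-friendly is likely where most of the extra effort goes.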
>
> Best,
> Fabian
>
> 2017-12-06 3:52 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>
>> It seems that this has to do with session windows being mergeable? I
>> tried the RichWindowFunction, and that seems to suggest that one cannot use
>> state? Any ideas, folks?
>>
>> On Dec 1, 2017 10:38 AM, "Vishal Santoshi" <vishal.santo...@gmail.com>
>> wrote:
>>
>>> I have a simple aggregation with one caveat. For some reason I have to
>>> keep a large amount of state until the window is GCed. The state is within
>>> the accumulator (ACC). I am hitting a memory bottleneck and would like to
>>> offload the state to the state backend (RocksDB), keeping only the state
>>> accumulated between checkpoints in memory (seems to be an obvious fix).
>>> However, I am not allowed to use a RichAggregateFunction in the aggregate
>>> method of a windowed stream. That raises two questions:
>>>
>>> 1. Why?
>>> 2. Is there an alternative for stateful window aggregation where we
>>> manage the state?
>>>
>>> Thanks,
>>> Vishal
>>>
>>>
>>> Here is the code (it uses generics, but it works):
>>>
>>> SingleOutputStreamOperator<OUT> retVal = input
>>>         .keyBy(keySelector)
>>>         .window(EventTimeSessionWindows.withGap(gap))
>>>         .aggregate(
>>>                 new AggregateFunction<IN, ACC, OUT>() {
>>>
>>>                     @Override
>>>                     public ACC createAccumulator() {
>>>                         // Start each window from a fresh, empty copy of the template accumulator.
>>>                         ACC newInstance = (ACC) accumulator.clone();
>>>                         newInstance.resetLocal();
>>>                         return newInstance;
>>>                     }
>>>
>>>                     @Override
>>>                     public void add(IN value, ACC accumulator) {
>>>                         accumulator.add(value);
>>>
>>>                     }
>>>
>>>                     @Override
>>>                     public OUT getResult(ACC accumulator) {
>>>                         return accumulator.getLocalValue();
>>>                     }
>>>
>>>                     @Override
>>>                     public ACC merge(ACC a, ACC b) {
>>>                         a.merge(b);
>>>                         return a;
>>>                     }
>>>                 }, new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>>>                     @Override
>>>                     public void apply(KEY s, TimeWindow window,
>>>                             Iterable<OUT> input, Collector<OUT> out) throws Exception {
>>>                         // With aggregate(aggFn, windowFn), the Iterable holds only the single pre-aggregated result.
>>>                         out.collect(input.iterator().next());
>>>                     }
>>>                 }, accType, aggregationResultType, aggregationResultType);
>>>
>>>
>
>
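The snippet in the original message assumes that `ACC` offers `clone`, `resetLocal`, `add`, `getLocalValue` and `merge`, the method names of Flink's `Accumulator` interface. A plain-Java sketch of a list-collecting `ACC` (the name `ListAcc` is hypothetical, and it does not implement Flink's interface) shows two things: the clone in `createAccumulator` must not share the underlying list with the template, and the accumulator grows with every event in the session, which is the memory pressure described in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical list-collecting accumulator using the same method names the
// snippet calls on ACC. Plain Java only.
final class ListAcc<T> implements Cloneable {
    private ArrayList<T> values = new ArrayList<>();

    void add(T value) { values.add(value); }            // grows with every event
    List<T> getLocalValue() { return values; }
    void resetLocal() { values.clear(); }
    void merge(ListAcc<T> other) { values.addAll(other.values); }

    @Override
    public ListAcc<T> clone() {
        try {
            @SuppressWarnings("unchecked")
            ListAcc<T> copy = (ListAcc<T>) super.clone();
            // Copy the list; with a shallow clone, resetLocal() on the copy
            // would also clear the template's list.
            copy.values = new ArrayList<>(values);
            return copy;
        } catch (CloneNotSupportedException e) {
            throw new AssertionError(e);
        }
    }
}
```

Under this sketch, `template.clone()` followed by `resetLocal()` yields an independent empty accumulator per window, and `merge` simply concatenates lists, so a merged session holds every element of both sessions in memory.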
