It seems that this has to do with session windows that are mergeable? I
tried the RichWindowFunction and that seems to suggest that one cannot use
state? Any ideas, folks...
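
To make the "mergeable" part concrete, here is a minimal plain-Java sketch
(no Flink dependency) of how session windows behave. The class, field and
method names are hypothetical and only illustrate the idea: every element
opens its own [ts, ts + gap) window, and any existing windows it touches are
folded into it together with their accumulators. Because a window's identity
can change like this after the fact, state tied to one window would have to
be merged too, which may be why user-managed per-window state is restricted
here.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionMergeSketch {

    /** Hypothetical accumulator: a session's time span plus a running count. */
    static final class Session {
        long start; // timestamp of the earliest element
        long end;   // timestamp of the latest element + gap
        long count; // number of elements folded into this session

        Session(long start, long end, long count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }

        /** Two sessions overlap (or touch) if neither ends before the other starts. */
        boolean overlaps(Session other) {
            return start <= other.end && other.start <= end;
        }

        /** Plays the role of merge(a, b): widen the span and combine the counts. */
        void merge(Session other) {
            start = Math.min(start, other.start);
            end = Math.max(end, other.end);
            count += other.count;
        }
    }

    /** Assigns each timestamp to a session, merging sessions that now overlap. */
    static List<Session> assign(long[] timestamps, long gap) {
        List<Session> sessions = new ArrayList<>();
        for (long ts : timestamps) {
            // Every element starts out as its own one-element session.
            Session current = new Session(ts, ts + gap, 1);
            List<Session> kept = new ArrayList<>();
            for (Session s : sessions) {
                if (s.overlaps(current)) {
                    current.merge(s); // absorb the existing session's accumulator
                } else {
                    kept.add(s);
                }
            }
            kept.add(current);
            sessions = kept;
        }
        return sessions;
    }

    public static void main(String[] args) {
        // An out-of-order element (4) bridges two previously separate sessions.
        for (Session s : assign(new long[] {0, 8, 4}, 5)) {
            System.out.println("[" + s.start + ", " + s.end + ") count=" + s.count);
        }
    }
}
```

In this sketch the late element at timestamp 4 bridges the sessions started
at 0 and 8, so both accumulators end up combined into one.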

On Dec 1, 2017 10:38 AM, "Vishal Santoshi" <vishal.santo...@gmail.com>
wrote:

> I have a simple aggregation with one caveat. For some reason I have to
> keep a large amount of state until the window is GCed. The state lives in
> the accumulator (ACC). I am hitting a memory bottleneck and would like to
> offload the state to the state backend (RocksDB), keeping only the
> between-checkpoint state in memory (which seems to be an obvious fix).
> However, I am not allowed to use a RichAggregateFunction in the aggregate
> method of a windowed stream. That raises two questions:
>
> 1. Why?
> 2. Is there an alternative for stateful window aggregation where we manage
> the state?
>
> Thanks, Vishal
>
>
> Here is the code (it uses generics, but it works):
>
> SingleOutputStreamOperator<OUT> retVal = input
>         .keyBy(keySelector)
>         .window(EventTimeSessionWindows.withGap(gap))
>         .aggregate(
>                 new AggregateFunction<IN, ACC, OUT>() {
>
>                     @Override
>                     public ACC createAccumulator() {
>                         ACC newInstance = (ACC) accumulator.clone();
>                         newInstance.resetLocal();
>                         return newInstance;
>                     }
>
>                     @Override
>                     public void add(IN value, ACC accumulator) {
>                         accumulator.add(value);
>
>                     }
>
>                     @Override
>                     public OUT getResult(ACC accumulator) {
>                         return accumulator.getLocalValue();
>                     }
>
>                     @Override
>                     public ACC merge(ACC a, ACC b) {
>                         a.merge(b);
>                         return a;
>                     }
>                 }, new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>                     @Override
>                     public void apply(KEY s, TimeWindow window,
>                             Iterable<OUT> input, Collector<OUT> out) throws Exception {
>                         out.collect(input.iterator().next());
>                     }
>                 }, accType, aggregationResultType, aggregationResultType);
>
>
