It seems that this has to do with session windows, which are mergeable? I tried a RichWindowFunction, and that seems to suggest that one cannot use state there either? Any ideas, folks...
On Dec 1, 2017 10:38 AM, "Vishal Santoshi" <vishal.santo...@gmail.com> wrote:

> I have a simple Aggregation with one caveat. For some reason I have to
> keep a large amount of state till the window is GCed. The state is within
> the Accumulator ( ACC ). I am hitting a memory bottleneck and would like to
> offload the state to the states backend ( ROCKSDB), keeping the between
> checkpoint state in memory ( seems to be an obvious fix). I am not though
> allowed to have a RichAggregateFunction in the aggregate method of a
> windowed stream . That begs 2 questions
>
> 1. Why
> 2. Is there an alternative for stateful window aggregation where we manage
>    the state. ?
>
> Thanks Vishal
>
>
> Here is the code ( generics but it works )
>
> SingleOutputStreamOperator<OUT> retVal = input
>         .keyBy(keySelector)
>         .window(EventTimeSessionWindows.withGap(gap))
>         .aggregate(
>                 new AggregateFunction<IN, ACC, OUT>() {
>
>                     @Override
>                     public ACC createAccumulator() {
>                         ACC newInstance = (ACC) accumulator.clone();
>                         newInstance.resetLocal();
>                         return newInstance;
>                     }
>
>                     @Override
>                     public void add(IN value, ACC accumulator) {
>                         accumulator.add(value);
>
>                     }
>
>                     @Override
>                     public OUT getResult(ACC accumulator) {
>                         return accumulator.getLocalValue();
>                     }
>
>                     @Override
>                     public ACC merge(ACC a, ACC b) {
>                         a.merge(b);
>                         return a;
>                     }
>                 }, new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>                     @Override
>                     public void apply(KEY s, TimeWindow window, Iterable<OUT> input, Collector<OUT> out) throws Exception {
>                         out.collect(input.iterator().next());
>                     }
>                 }, accType, aggregationResultType, aggregationResultType);
>
>
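
To make the "mergeable" point concrete, here is a minimal plain-Java sketch of why session windows need the merge() step of the accumulator. It is only an illustration, not Flink code: the Agg interface is a hypothetical stand-in that mirrors the four methods in the quoted snippet, and Session, addElement and sessionSums are names made up for this example. The rule that touching sessions are merged is also an assumption of the sketch. Each element opens its own session of length gap, and any sessions that the new one overlaps get their accumulators folded together.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionMergeSketch {

    /** Hypothetical stand-in for the four-method accumulator contract; not Flink's interface. */
    interface Agg<IN, ACC, OUT> {
        ACC createAccumulator();
        void add(IN value, ACC acc);
        OUT getResult(ACC acc);
        ACC merge(ACC a, ACC b);
    }

    /** One open session [start, end) together with its accumulator. */
    static final class Session<ACC> {
        long start;
        long end;
        ACC acc;

        Session(long start, long end, ACC acc) {
            this.start = start;
            this.end = end;
            this.acc = acc;
        }
    }

    /** Adds one element at time ts and merges every session its window overlaps or touches. */
    static <IN, ACC> void addElement(List<Session<ACC>> sessions, Agg<IN, ACC, ?> agg,
                                     long ts, long gap, IN value) {
        ACC acc = agg.createAccumulator();
        agg.add(value, acc);
        Session<ACC> merged = new Session<>(ts, ts + gap, acc);

        List<Session<ACC>> kept = new ArrayList<>();
        for (Session<ACC> s : sessions) {
            if (s.start <= merged.end && merged.start <= s.end) {
                // The windows collapse into one, so their accumulators must be combined.
                merged.start = Math.min(merged.start, s.start);
                merged.end = Math.max(merged.end, s.end);
                merged.acc = agg.merge(s.acc, merged.acc);
            } else {
                kept.add(s);
            }
        }
        kept.add(merged);
        sessions.clear();
        sessions.addAll(kept);
    }

    /** Runs {timestamp, value} events through a summing accumulator; one result per session, ordered by start. */
    public static List<Long> sessionSums(long gap, long[][] events) {
        Agg<Long, long[], Long> sum = new Agg<Long, long[], Long>() {
            @Override public long[] createAccumulator() { return new long[] {0L}; }
            @Override public void add(Long value, long[] acc) { acc[0] += value; }
            @Override public Long getResult(long[] acc) { return acc[0]; }
            @Override public long[] merge(long[] a, long[] b) { a[0] += b[0]; return a; }
        };

        List<Session<long[]>> sessions = new ArrayList<>();
        for (long[] e : events) {
            addElement(sessions, sum, e[0], gap, e[1]);
        }
        sessions.sort((x, y) -> Long.compare(x.start, y.start));

        List<Long> results = new ArrayList<>();
        for (Session<long[]> s : sessions) {
            results.add(sum.getResult(s.acc));
        }
        return results;
    }

    public static void main(String[] args) {
        // The element at ts=5 arrives after the one at ts=30 and joins the first session.
        System.out.println(sessionSums(10, new long[][] {{0, 1}, {30, 2}, {5, 3}}));
    }
}
```

The relevant part for this thread is that whatever the accumulator holds has to survive being combined with another accumulator at any time, because a late element can bridge two sessions that were separate until then. Any state kept on the side, keyed by a single window, would have no such merge step in this sketch.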