I understand that. Let me elaborate. The sequence of events is:

1. Round-robin dispatch to the Kafka cluster. (It is not partitioned on the key, which we may ultimately do, and then I will have more questions on how to keyBy and still keep order, probably avoiding a shuffle :) )
2. Key by a high-cardinality key.
3. Sessionize.
4. Because of the round-robin on Kafka (and even if partitioned on the key and followed by a keyBy), the sort order is not retained, so the ACC has to hold on to the elements in a List. When the window is finalized we sort the List in the ACC and do pagination. We are looking for paths within a session from a source event to a sink event.

I was hoping to use RocksDB state for the final merged list, thus keeping it off heap, and to use a count-based trigger to evaluate the ACC and merge what was collected between trigger firings into the master copy, rather than keeping all events in the ACC (I would imagine this is a very general pattern).
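To make the merge step concrete, here is a rough sketch in plain Java with no Flink APIs. Everything in it is hypothetical and only for illustration: the names `Event` and `SortedMergeSketch`, the `flushThreshold` parameter (standing in for the count-based trigger), and the `master` list (standing in for the copy that would live in RocksDB state). It only shows the idea of sorting a small buffer and merging it into an already sorted master list, not how this would be wired into a window operator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical event type; only the timestamp matters for ordering. */
final class Event {
    final long timestamp;
    final String name;

    Event(long timestamp, String name) {
        this.timestamp = timestamp;
        this.name = name;
    }
}

/** Hypothetical holder: a small unsorted buffer plus a sorted master list. */
final class SortedMergeSketch {
    private final int flushThreshold;                 // stands in for a count-based trigger
    private final List<Event> buffer = new ArrayList<>();
    private List<Event> master = new ArrayList<>();   // stands in for state kept in RocksDB

    SortedMergeSketch(int flushThreshold) {
        this.flushThreshold = flushThreshold;
    }

    /** Buffers an out-of-order event and flushes once the threshold is reached. */
    void add(Event e) {
        buffer.add(e);
        if (buffer.size() >= flushThreshold) {
            flush();
        }
    }

    /** Sorts the buffer and merges it into the sorted master list. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        buffer.sort(Comparator.comparingLong((Event ev) -> ev.timestamp));
        master = mergeSorted(master, buffer);
        buffer.clear();
    }

    /** Flushes any remaining buffered events and returns the sorted list. */
    List<Event> sortedView() {
        flush();
        return master;
    }

    /** Standard two-way merge of two lists already sorted by timestamp. */
    static List<Event> mergeSorted(List<Event> a, List<Event> b) {
        List<Event> out = new ArrayList<>(a.size() + b.size());
        int i = 0;
        int j = 0;
        while (i < a.size() && j < b.size()) {
            // "<=" keeps elements of `a` first on ties, so the merge is stable.
            if (a.get(i).timestamp <= b.get(j).timestamp) {
                out.add(a.get(i++));
            } else {
                out.add(b.get(j++));
            }
        }
        while (i < a.size()) {
            out.add(a.get(i++));
        }
        while (j < b.size()) {
            out.add(b.get(j++));
        }
        return out;
    }
}
```

With this shape only `flushThreshold` events at most sit unsorted at any time, and the final sort at window close is replaced by incremental merges. Whether that is cheaper overall depends on how the master list is stored and read back, which this sketch does not model.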
Does that make sense?

On Fri, Dec 8, 2017 at 9:11 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
>
> If you use an AggregateFunction in this way (i.e. for a window) the ACC
> should in fact be kept in the state backend. Did you configure the job to
> use RocksDB? How are the memory problems manifesting?
>
> Best,
> Aljoscha
>
>
> On 6. Dec 2017, at 14:57, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Vishal,
>
> you are right, it is not possible to use state in an AggregateFunction
> because windows need to be mergeable.
> An AggregateFunction knows how to merge its accumulators but merging
> generic state is not possible.
>
> I am not aware of an efficient and easy workaround for this.
> If you want to use the provided session window logic, you can use a
> WindowFunction that performs all computations when the window is triggered.
> This means that aggregations do not happen eagerly and all events for a
> window are collected and held in state.
> Another approach could be to implement the whole logic (incl. the session
> windowing) using a ProcessFunction. This would be a major effort though.
>
> Best,
> Fabian
>
> 2017-12-06 3:52 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>
>> It seems that this has to do with session windows that are mergeable? I
>> tried the RichWindowFunction and that seems to suggest that one cannot use
>> state? Any ideas, folks...
>>
>> On Dec 1, 2017 10:38 AM, "Vishal Santoshi" <vishal.santo...@gmail.com>
>> wrote:
>>
>>> I have a simple aggregation with one caveat. For some reason I have to
>>> keep a large amount of state till the window is GCed. The state is within
>>> the accumulator (ACC). I am hitting a memory bottleneck and would like to
>>> offload the state to the state backend (RocksDB), keeping the
>>> between-checkpoint state in memory (seems to be an obvious fix). I am not,
>>> though, allowed to have a RichAggregateFunction in the aggregate method of a
>>> windowed stream.
>>> That begs 2 questions:
>>>
>>> 1. Why?
>>> 2. Is there an alternative for stateful window aggregation where we
>>> manage the state?
>>>
>>> Thanks,
>>> Vishal
>>>
>>>
>>> Here is the code (generics, but it works):
>>>
>>> SingleOutputStreamOperator<OUT> retVal = input
>>>     .keyBy(keySelector)
>>>     .window(EventTimeSessionWindows.withGap(gap))
>>>     .aggregate(
>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>
>>>             @Override
>>>             public ACC createAccumulator() {
>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>                 newInstance.resetLocal();
>>>                 return newInstance;
>>>             }
>>>
>>>             @Override
>>>             public void add(IN value, ACC accumulator) {
>>>                 accumulator.add(value);
>>>             }
>>>
>>>             @Override
>>>             public OUT getResult(ACC accumulator) {
>>>                 return accumulator.getLocalValue();
>>>             }
>>>
>>>             @Override
>>>             public ACC merge(ACC a, ACC b) {
>>>                 a.merge(b);
>>>                 return a;
>>>             }
>>>         },
>>>         new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>>>             @Override
>>>             public void apply(KEY s, TimeWindow window,
>>>                     Iterable<OUT> input, Collector<OUT> out) throws Exception {
>>>                 out.collect(input.iterator().next());
>>>             }
>>>         },
>>>         accType, aggregationResultType, aggregationResultType);