An additional question: if the source is key-partitioned (Kafka), does a keyBy retain the order of a Kafka partition across a shuffle?
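For the ordering question, the relevant guarantee is that Flink preserves record order per pair of sending and receiving subtasks. keyBy sends all records of a key to one receiver, but records of that key coming from different source subtasks can interleave arbitrarily. The toy model below is a JDK-only sketch of that behaviour, not Flink code. `PartitionEvent`, `KeyByOrderModel`, the `schedule` array (one possible interleaving of the senders) and the simplified `floorMod(hashCode)` routing (Flink actually assigns keys to key groups via a murmur hash) are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event: the key plus the event time assigned by the producer.
final class PartitionEvent {
    final String key;
    final int time;

    PartitionEvent(String key, int time) {
        this.key = key;
        this.time = time;
    }
}

final class KeyByOrderModel {
    // Simplified routing: Flink really hashes keys into key groups, but any
    // deterministic key -> subtask function shows the same effect.
    static int targetSubtask(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // sources.get(s) is one Kafka partition, emitted in partition order by source subtask s.
    // schedule lists which source emits next, i.e. one possible interleaving of the senders.
    // Each receiver list is appended to in arrival order (FIFO per sender/receiver channel).
    static Map<String, List<Integer>> arrivalOrderPerKey(
            List<List<PartitionEvent>> sources, int[] schedule, int parallelism) {
        int[] next = new int[sources.size()];
        List<List<PartitionEvent>> receivers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            receivers.add(new ArrayList<>());
        }
        for (int s : schedule) {
            PartitionEvent e = sources.get(s).get(next[s]++);
            receivers.get(targetSubtask(e.key, parallelism)).add(e);
        }
        // All records of one key land on the same receiver, so this is the
        // order in which the keyed operator sees them.
        Map<String, List<Integer>> seen = new HashMap<>();
        for (List<PartitionEvent> receiver : receivers) {
            for (PartitionEvent e : receiver) {
                seen.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e.time);
            }
        }
        return seen;
    }
}
```

With round-robin partitioning, one user's events are split across partitions, so their arrival order at the keyed operator depends on how the source subtasks interleave (e.g. times 2, 4, 1, 3). If Kafka is partitioned by the same key and nothing rebalances the stream between the source and the keyBy, every record of a key comes from one partition, read by one source subtask, and keeps its partition order.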
On Fri, Dec 8, 2017 at 1:12 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
> I understand that. Let me elaborate. The sequence of events is:
>
> 1. Round-robin dispatch to the Kafka cluster (it is not partitioned on the
> key, which we may ultimately do, and then I will have more questions on how
> to keyBy and still keep order, probably avoiding a shuffle :) ).
> 2. keyBy a high-cardinality key.
> 3. Sessionize.
> 4. Because of the round robin on Kafka (and even if partitioned on the key
> with a subsequent keyBy), the sort order is not retained and the ACC has to
> hold on to the elements in a List. When the window is finalized we sort the
> ACC List and do pagination. We are looking for paths within a session from
> a source event to a sink event. I was hoping to use RocksDB state as the
> final merged list, and thus off heap, and use a count-based Trigger to
> evaluate the ACC and merge the inter-trigger collection into the master copy
> rather than keeping all events in the ACC (I would imagine a very general
> pattern to use).
>
> Does that make sense?
>
> On Fri, Dec 8, 2017 at 9:11 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>>
>> If you use an AggregateFunction in this way (i.e. for a window) the ACC
>> should in fact be kept in the state backend. Did you configure the job to
>> use RocksDB? How are the memory problems manifesting?
>>
>> Best,
>> Aljoscha
>>
>>
>> On 6. Dec 2017, at 14:57, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>> Hi Vishal,
>>
>> you are right, it is not possible to use state in an AggregateFunction
>> because windows need to be mergeable.
>> An AggregateFunction knows how to merge its accumulators but merging
>> generic state is not possible.
>>
>> I am not aware of an efficient and easy workaround for this.
>> If you want to use the provided session window logic, you can use a
>> WindowFunction that performs all computations when the window is triggered.
>> This means that aggregations do not happen eagerly and all events for a
>> window are collected and held in state.
>> Another approach could be to implement the whole logic (incl. the session
>> windowing) using a ProcessFunction. This would be a major effort though.
>>
>> Best,
>> Fabian
>>
>> 2017-12-06 3:52 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>
>>> It seems that this has to do with session windows that are mergeable? I
>>> tried the RichWindowFunction and that seems to suggest that one cannot use
>>> state. Any ideas, folks...
>>>
>>> On Dec 1, 2017 10:38 AM, "Vishal Santoshi" <vishal.santo...@gmail.com>
>>> wrote:
>>>
>>>> I have a simple aggregation with one caveat. For some reason I have to
>>>> keep a large amount of state until the window is GCed. The state is
>>>> within the accumulator (ACC). I am hitting a memory bottleneck and would
>>>> like to offload the state to the state backend (RocksDB), keeping the
>>>> between-checkpoint state in memory (seems to be an obvious fix). I am
>>>> not, though, allowed to have a RichAggregateFunction in the aggregate
>>>> method of a windowed stream. That raises two questions:
>>>>
>>>> 1. Why?
>>>> 2. Is there an alternative for stateful window aggregation where we
>>>> manage the state?
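Fabian's answer to the first question is that session windows are mergeable. A toy, JDK-only model makes that concrete: every event starts its own window `[t, t + gap)`, and an event that falls between two existing sessions fuses them, at which point their accumulators have to be combined. `SessionMergeModel`, its `Agg` interface (shaped like, but not identical to, Flink's `AggregateFunction`) and `SortedListAgg` are hypothetical names for this sketch, and the rule for when two windows count as overlapping is simplified.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class SessionMergeModel {
    // Hypothetical accumulator contract shaped like Flink's AggregateFunction
    // (createAccumulator / add / merge / getResult); not the Flink interface itself.
    interface Agg<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC acc);
        ACC merge(ACC a, ACC b);
        OUT getResult(ACC acc);
    }

    // One session window for a single key: half-open interval [start, end) plus its accumulator.
    static final class Session<ACC> {
        long start;
        long end;
        ACC acc;

        Session(long start, long end, ACC acc) {
            this.start = start;
            this.end = end;
            this.acc = acc;
        }
    }

    // Each event opens its own window [t, t + gap). Every existing session it overlaps
    // is folded into it and the accumulators are combined with merge(). Because windows
    // can merge after the fact, anything kept per window must be mergeable.
    static <ACC> List<Session<ACC>> assign(long[] timestamps, long gap, Agg<Long, ACC, ?> agg) {
        List<Session<ACC>> sessions = new ArrayList<>();
        for (long t : timestamps) {
            Session<ACC> merged = new Session<>(t, t + gap, agg.add(t, agg.createAccumulator()));
            List<Session<ACC>> kept = new ArrayList<>();
            for (Session<ACC> s : sessions) {
                if (s.start < merged.end && merged.start < s.end) {
                    merged.start = Math.min(merged.start, s.start);
                    merged.end = Math.max(merged.end, s.end);
                    merged.acc = agg.merge(merged.acc, s.acc);
                } else {
                    kept.add(s);
                }
            }
            kept.add(merged);
            sessions = kept;
        }
        return sessions;
    }

    // Buffers timestamps and sorts only when the result is requested, like holding
    // the events in a List in the ACC and sorting when the window fires.
    static final class SortedListAgg implements Agg<Long, List<Long>, List<Long>> {
        public List<Long> createAccumulator() { return new ArrayList<>(); }
        public List<Long> add(Long value, List<Long> acc) { acc.add(value); return acc; }
        public List<Long> merge(List<Long> a, List<Long> b) { a.addAll(b); return a; }
        public List<Long> getResult(List<Long> acc) {
            List<Long> sorted = new ArrayList<>(acc);
            Collections.sort(sorted);
            return sorted;
        }
    }
}
```

With a gap of 3, events at 1 and 5 form two sessions; an out-of-order event at 3 bridges them into a single session `[1, 8)` whose merged accumulator sorts to `[1, 3, 5]`. A generic user-managed state object has no such merge step, which matches Fabian's explanation.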
>>>>
>>>> Thanks, Vishal
>>>>
>>>>
>>>> Here is the code (generics, but it works):
>>>>
>>>> SingleOutputStreamOperator<OUT> retVal = input
>>>>         .keyBy(keySelector)
>>>>         .window(EventTimeSessionWindows.withGap(gap))
>>>>         .aggregate(
>>>>                 new AggregateFunction<IN, ACC, OUT>() {
>>>>
>>>>                     @Override
>>>>                     public ACC createAccumulator() {
>>>>                         ACC newInstance = (ACC) accumulator.clone();
>>>>                         newInstance.resetLocal();
>>>>                         return newInstance;
>>>>                     }
>>>>
>>>>                     @Override
>>>>                     public void add(IN value, ACC accumulator) {
>>>>                         accumulator.add(value);
>>>>                     }
>>>>
>>>>                     @Override
>>>>                     public OUT getResult(ACC accumulator) {
>>>>                         return accumulator.getLocalValue();
>>>>                     }
>>>>
>>>>                     @Override
>>>>                     public ACC merge(ACC a, ACC b) {
>>>>                         a.merge(b);
>>>>                         return a;
>>>>                     }
>>>>                 }, new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>>>>                     @Override
>>>>                     public void apply(KEY s, TimeWindow window,
>>>>                             Iterable<OUT> input, Collector<OUT> out) throws Exception {
>>>>                         out.collect(input.iterator().next());
>>>>                     }
>>>>                 }, accType, aggregationResultType, aggregationResultType);
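Vishal's step 4 (hold the session's events in a List and sort when the window fires) and his proposed variant (a count-based trigger that periodically folds a sorted batch into a master copy) can be modelled without Flink. The sketch below is JDK-only; `SortedBufferModel`, `flushEvery` and the plain `ArrayList` standing in for a RocksDB-backed master list are assumptions for illustration, not Flink APIs, and it does not address the window-merging problem Fabian describes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model: events go into a small unsorted batch; every 'flushEvery' events
// (a count-based trigger) the batch is sorted and merged into a sorted master list.
// 'master' is a plain ArrayList standing in for state that would live in a state
// backend such as RocksDB; that mapping is an assumption of this sketch.
final class SortedBufferModel {
    private final int flushEvery;
    private final List<Long> batch = new ArrayList<>();
    private List<Long> master = new ArrayList<>();

    SortedBufferModel(int flushEvery) {
        this.flushEvery = flushEvery;
    }

    void add(long timestamp) {
        batch.add(timestamp);
        if (batch.size() >= flushEvery) {
            flush();
        }
    }

    // Sort the small batch, then do a linear two-way merge with the already sorted master.
    void flush() {
        Collections.sort(batch);
        List<Long> merged = new ArrayList<>(master.size() + batch.size());
        int i = 0;
        int j = 0;
        while (i < master.size() && j < batch.size()) {
            if (master.get(i) <= batch.get(j)) {
                merged.add(master.get(i++));
            } else {
                merged.add(batch.get(j++));
            }
        }
        while (i < master.size()) merged.add(master.get(i++));
        while (j < batch.size()) merged.add(batch.get(j++));
        master = merged;
        batch.clear();
    }

    // Called when the window finally fires: flush the remainder, return events in time order.
    List<Long> finish() {
        flush();
        return Collections.unmodifiableList(master);
    }

    int bufferedInMemory() {
        return batch.size();
    }
}
```

At most `flushEvery - 1` events sit in the unsorted batch between flushes, but each flush rewrites the whole master list in a linear merge, so the memory saving is paid for with repeated copying; whether that trade-off pays off with a real state backend would have to be measured.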