Any help/pointers on this, please? Thanks.
On Thu, 11 Oct 2018 at 10:33, Ahmad Hassan <ahmad.has...@gmail.com> wrote:
> Hi All,
>
> Thanks for the replies. Here is a code snippet of what we want to
> achieve:
>
> We have 24-hour sliding windows that slide every 5 minutes.
>
> inStream
>     .filter(Objects::nonNull)
>     .keyBy("tenant")
>     .window(SlidingProcessingTimeWindows.of(Time.minutes(1440), Time.minutes(5)))
>     .fold(new DefaultVector(), new CalculationFold(), new MetricCalculationApply());
>
> public class CalculationFold implements FoldFunction<Event, DefaultVector>
> {
>     // Not final: it would have to be assigned in open(), see below.
>     private transient MapState<String, DefaultProductMetricVector> products;
>     private transient MapStateDescriptor<String, DefaultProductMetricVector> descr;
>
>     @Override
>     public DefaultVector fold(DefaultVector stats, Event event) throws Exception
>     {
>         if (products.contains(event.getProductId()))
>         {
>             DefaultProductMetricVector product = products.get(event.getProductId());
>             product.updatePrice(event.getPrice());
>             products.put(event.getProductId(), product);
>         }
>         else
>         {
>             DefaultProductMetricVector product = new DefaultProductMetricVector();
>             product.updatePrice(event.getPrice());
>             products.put(event.getProductId(), product);
>         }
>         return stats;
>     }
>
>     // A FoldFunction does not allow the open() method or this.getRuntimeContext()
>     //public void open(Configuration parameters) throws Exception
>     //{
>     //    descr = new MapStateDescriptor<>("product", String.class, DefaultProductMetricVector.class);
>     //    products = this.getRuntimeContext().getMapState(descr);
>     //}
> }
>
> We expect millions of unique products in a 24-hour window, which is why we
> want to store the state of each product (a DefaultProductMetricVector
> instance) in RocksDB. Otherwise, my understanding is that if I instantiate
> a Java HashMap of products inside the DefaultVector fold accumulator, then
> for each incoming event the full set of products will be deserialised and
> stored on the heap, which will eventually cause a heap overflow error.
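A minimal sketch of one alternative, assuming the stream can be keyed by tenant and product ID together instead of by tenant alone. With such a composite key, each (key, window) pair holds one small per-product accumulator, so an update reads and writes only that product's metrics rather than a map of every product. The aggregate class mirrors the four methods of Flink's `AggregateFunction<IN, ACC, OUT>` (`createAccumulator`, `add`, `getResult`, `merge`) but is written against a hypothetical local interface, `Aggregator`, so it compiles and runs without Flink on the classpath. `PriceEvent`, `ProductKey`, `PriceStats` and `PriceStatsAggregate` are likewise hypothetical stand-ins for `Event` and `DefaultProductMetricVector`.

```java
import java.util.Objects;

public class PerProductAggregateSketch {

    // Hypothetical local stand-in with the same method set as Flink's
    // AggregateFunction<IN, ACC, OUT>; a real job would implement that interface.
    interface Aggregator<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Hypothetical event type standing in for the thread's Event class.
    static final class PriceEvent {
        final String tenant;
        final String productId;
        final double price;

        PriceEvent(String tenant, String productId, double price) {
            this.tenant = tenant;
            this.productId = productId;
            this.price = price;
        }
    }

    // Composite key: keying on (tenant, productId) gives each product its own state.
    static final class ProductKey {
        final String tenant;
        final String productId;

        ProductKey(String tenant, String productId) {
            this.tenant = tenant;
            this.productId = productId;
        }

        static ProductKey of(PriceEvent e) {
            return new ProductKey(e.tenant, e.productId);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof ProductKey)) return false;
            ProductKey other = (ProductKey) o;
            return tenant.equals(other.tenant) && productId.equals(other.productId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(tenant, productId);
        }
    }

    // Small per-product accumulator (hypothetical stand-in for DefaultProductMetricVector).
    static final class PriceStats {
        long count;
        double sum;
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;

        double average() {
            return count == 0 ? 0.0 : sum / count;
        }
    }

    static final class PriceStatsAggregate implements Aggregator<PriceEvent, PriceStats, PriceStats> {
        @Override
        public PriceStats createAccumulator() {
            return new PriceStats();
        }

        // Folds one event's price into this product's accumulator.
        @Override
        public PriceStats add(PriceEvent event, PriceStats acc) {
            acc.count++;
            acc.sum += event.price;
            acc.min = Math.min(acc.min, event.price);
            acc.max = Math.max(acc.max, event.price);
            return acc;
        }

        @Override
        public PriceStats getResult(PriceStats acc) {
            return acc;
        }

        // Combines two partial accumulators (mutates and returns the first).
        @Override
        public PriceStats merge(PriceStats a, PriceStats b) {
            a.count += b.count;
            a.sum += b.sum;
            a.min = Math.min(a.min, b.min);
            a.max = Math.max(a.max, b.max);
            return a;
        }
    }
}
```

In a Flink job, the stream would be keyed on both fields and the window's `.fold(...)` replaced by `.aggregate(...)`, which accepts an `AggregateFunction` (optionally together with a window function). Per-tenant results would then need a second aggregation over the per-product outputs. Note that with a 24-hour size and a 5-minute slide, each event belongs to 1440 / 5 = 288 overlapping windows, so each key holds up to 288 of these small accumulators at once.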
>
> Please can you tell us how to solve this problem.
>
> Thanks.
>
> Best Regards,
>
>
> On Wed, 10 Oct 2018 at 10:21, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Yes, it would be good to post your code.
>> Are you using a FoldFunction in a window (if yes, which window) or as a
>> running aggregate?
>>
>> In general, collecting state in a FoldFunction is usually not something
>> that you should do. Did you consider using an AggregateFunction?
>>
>> Fabian
>>
>> On Wed, 10 Oct 2018 at 11:08, Chesnay Schepler <ches...@apache.org> wrote:
>>
>>> In which method are you calling getRuntimeContext()? This method can
>>> only be used after open() has been called.
>>>
>>> On 09.10.2018 17:09, Ahmad Hassan wrote:
>>>
>>> Hi,
>>>
>>> We want to use MapState inside a fold function to keep a map of all the
>>> products that we see in a 24-hour window, so that this huge state is stored
>>> in RocksDB rather than overflowing the heap. However, I can't seem to
>>> initialise MapState within a FoldFunction or in any class that extends
>>> RichMapFunction:
>>>
>>> private transient MapStateDescriptor<String, String> descr = new
>>>     MapStateDescriptor<>("mymap", String.class, String.class);
>>> this.getRuntimeContext().getMapState(descr);
>>>
>>> I get the error
>>>
>>> java.lang.IllegalStateException: The runtime context has not been initialized.
>>>     at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
>>>
>>> Any clues how to get the runtime context, please?
>>>
>>> Thanks.
>>>
>>> Best regards
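The `IllegalStateException` in the first message is what happens when `getRuntimeContext()` is called before the framework has set the context: field initializers and constructors run when the function object is created, while the runtime context is only injected on the task, just before `open()` is called. The sketch below is a hypothetical, Flink-free model of that call order so the difference can be run without a cluster; `FakeRuntimeContext`, `SimulatedRichFunction`, `EagerTracker`, `LazyTracker` and `startTask` are illustrative names, not Flink classes, and the plain `HashMap` only stands in for keyed state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Flink-free model of when a rich function's runtime context exists.
public class RichFunctionLifecycleModel {

    // Stand-in for a runtime context that hands out named map state.
    static final class FakeRuntimeContext {
        private final Map<String, Map<String, String>> states = new HashMap<>();

        Map<String, String> getMapState(String name) {
            return states.computeIfAbsent(name, n -> new HashMap<>());
        }
    }

    // Stand-in for a rich-function base class: the context is injected after construction.
    abstract static class SimulatedRichFunction {
        private FakeRuntimeContext runtimeContext;

        final void setRuntimeContext(FakeRuntimeContext ctx) {
            this.runtimeContext = ctx;
        }

        final FakeRuntimeContext getRuntimeContext() {
            if (runtimeContext == null) {
                throw new IllegalStateException("The runtime context has not been initialized.");
            }
            return runtimeContext;
        }

        void open() {}
    }

    // Fails: asks for state in a field initializer, i.e. during construction.
    static final class EagerTracker extends SimulatedRichFunction {
        private final Map<String, String> products = getRuntimeContext().getMapState("products");
    }

    // Works: state is obtained in open(), after the context has been injected.
    static final class LazyTracker extends SimulatedRichFunction {
        private Map<String, String> products;

        @Override
        void open() {
            products = getRuntimeContext().getMapState("products");
        }

        void process(String productId) {
            products.put(productId, "seen");
        }

        int trackedProducts() {
            return products.size();
        }
    }

    // Mirrors the framework's call order: construct, inject the context, then open().
    static LazyTracker startTask(FakeRuntimeContext ctx) {
        LazyTracker tracker = new LazyTracker();
        tracker.setRuntimeContext(ctx);
        tracker.open();
        return tracker;
    }
}
```

In a real job, the equivalent is to override `open(Configuration)` in a rich function applied to a keyed stream, such as a `KeyedProcessFunction`, and call `getRuntimeContext().getMapState(descriptor)` there. A function passed directly to a window's `fold`, `reduce` or `aggregate` cannot be a `RichFunction`, which matches the limitation noted in the fold code above. A `transient` field initialized inline, like `descr` in the first message, is also reset to `null` when the function is deserialized on the task, so the descriptor is best created in `open()` as well.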