Any help or pointers on this, please?

Thanks.

On Thu, 11 Oct 2018 at 10:33, Ahmad Hassan <ahmad.has...@gmail.com> wrote:

> Hi All,
>
> Thanks for the replies. Here is a code snippet of what we want to
> achieve:
>
> We have 24-hour sliding windows that slide every 5 minutes.
>
> inStream
>     .filter(Objects::nonNull)
>     .keyBy("tenant")
>     .window(SlidingProcessingTimeWindows.of(Time.minutes(1440), Time.minutes(5)))
>     .fold(new DefaultVector(), new CalculationFold(), new MetricCalculationApply());
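One detail that bears on the heap concern: with a 24-hour window sliding every 5 minutes, each event belongs to 1440 / 5 = 288 overlapping windows, and each window keeps its own fold accumulator. The plain-Java sketch below (no Flink dependency; `SlidingWindowMath` and `windowStarts` are made-up names) reproduces the window-start arithmetic that, as far as we understand, Flink's sliding window assigners use with a zero offset, and confirms the count.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of how a sliding window assigner picks the windows an
// element belongs to (modeled on Flink's sliding assignment with offset 0).
final class SlidingWindowMath {
    static final long MINUTE_MS = 60_000L;

    /** Returns the start timestamps (ms) of every window that contains timestamp. */
    static List<Long> windowStarts(long timestamp, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the most recent window that contains the timestamp.
        long lastStart = timestamp - (timestamp + slideMs) % slideMs;
        // Walk back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

With the sizes from the snippet, `windowStarts(t, 1440 * MINUTE_MS, 5 * MINUTE_MS)` returns 288 starts for any non-negative `t`, so whatever the fold accumulates per product is held once in each of those open windows.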
>
> public class CalculationFold implements FoldFunction<Event, DefaultVector>
> {
>     private transient MapState<String, DefaultProductMetricVector> products;
>     private transient MapStateDescriptor<String, DefaultProductMetricVector> descr;
>
>     @Override
>     public DefaultVector fold(DefaultVector stats, Event event) throws Exception
>     {
>         // Look up this product's vector, creating it the first time it is seen
>         DefaultProductMetricVector product = products.get(event.getProductId());
>         if (product == null)
>         {
>             product = new DefaultProductMetricVector();
>         }
>         product.updatePrice(event.getPrice());
>         products.put(event.getProductId(), product);
>         return stats;
>     }
>
>     // A window FoldFunction does not allow open() or this.getRuntimeContext():
>     //public void open(Configuration parameters) throws Exception
>     //{
>     //    descr = new MapStateDescriptor<>("product", String.class,
>     //        DefaultProductMetricVector.class);
>     //    products = this.getRuntimeContext().getMapState(descr);
>     //}
> }
>
>
> We expect millions of unique products in a 24-hour window, which is why we
> want to store the state of each product (a DefaultProductMetricVector
> instance) in RocksDB. Otherwise, my understanding is that if I instantiate
> a Java HashMap of products within the DefaultVector fold accumulator, then
> for each incoming event the full set of products will be deserialised and
> held on the heap, which will eventually cause an OutOfMemoryError.
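To illustrate the reasoning in the paragraph above: if all products live inside one accumulator value, every update has to serialize (and, with RocksDB, deserialize) the whole map, whereas MapState stores each entry under its own key. This sketch uses java.io serialization purely as a stand-in for Flink's own serializers, and the product map is hypothetical; it just compares the bytes written for a large map with those for a one-entry map.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;

// Illustration only: a map stored as ONE state value must be rewritten whole
// on every update; per-entry storage rewrites only the touched entry.
final class SerializationCost {
    static int serializedSize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Builds a hypothetical productId -> price map with n entries.
    static HashMap<String, Double> products(int n) {
        HashMap<String, Double> map = new HashMap<>();
        for (int i = 0; i < n; i++) {
            map.put("product-" + i, (double) i);
        }
        return map;
    }
}
```

The gap grows linearly with the number of products, which is the cost the per-entry layout of MapState avoids.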
>
> Could you please tell us how to solve this problem?
>
> Thanks.
>
> Best Regards,
>
>
> On Wed, 10 Oct 2018 at 10:21, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Yes, it would be good to post your code.
>> Are you using a FoldFunction in a window (if yes, what window) or as a
>> running aggregate?
>>
>> In general, collecting state in a FoldFunction is usually not something
>> that you should do. Did you consider using an AggregateFunction?
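Following the AggregateFunction suggestion, here is a minimal sketch assuming the stream is keyed by tenant and product, so each key holds one small accumulator instead of a map of millions of products. `AggregateFunctionLike` is a local stand-in declared with the same four methods as Flink's `AggregateFunction<IN, ACC, OUT>` so the example compiles without Flink; `PriceStats` and `PriceStatsAggregate` are hypothetical, since the contents of `DefaultProductMetricVector` are not shown in the thread.

```java
// Local stand-in mirroring the methods of Flink's AggregateFunction<IN, ACC, OUT>.
interface AggregateFunctionLike<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical per-product accumulator: small and fixed-size.
final class PriceStats {
    long count;
    double sum;
    double min = Double.POSITIVE_INFINITY;
    double max = Double.NEGATIVE_INFINITY;
}

// Aggregates the prices of ONE product (one key = one tenant/product pair).
final class PriceStatsAggregate implements AggregateFunctionLike<Double, PriceStats, PriceStats> {
    @Override
    public PriceStats createAccumulator() {
        return new PriceStats();
    }

    @Override
    public PriceStats add(Double price, PriceStats acc) {
        acc.count++;
        acc.sum += price;
        acc.min = Math.min(acc.min, price);
        acc.max = Math.max(acc.max, price);
        return acc;
    }

    @Override
    public PriceStats getResult(PriceStats acc) {
        return acc;
    }

    @Override
    public PriceStats merge(PriceStats a, PriceStats b) {
        PriceStats m = new PriceStats();
        m.count = a.count + b.count;
        m.sum = a.sum + b.sum;
        m.min = Math.min(a.min, b.min);
        m.max = Math.max(a.max, b.max);
        return m;
    }
}
```

With Flink on the classpath, the class would implement `AggregateFunction` instead and be passed to `.aggregate(...)` on the windowed stream keyed by tenant and product; a per-tenant view could then be built downstream.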
>>
>> Fabian
>>
>> On Wed, 10 Oct 2018 at 11:08, Chesnay Schepler <
>> ches...@apache.org> wrote:
>>
>>> In which method are you calling getRuntimeContext()? This method can
>>> only be used after open() has been called.
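A simplified, Flink-free model of why the call fails when made from a field initializer: the framework constructs the function first, injects the runtime context afterwards, and only then calls open(). All class names here are hypothetical; only the exception message is taken from the stack trace quoted in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a runtime context: one map per state name.
final class RuntimeContextStub {
    private final Map<String, Map<String, String>> states = new HashMap<>();

    Map<String, String> getMapState(String name) {
        return states.computeIfAbsent(name, k -> new HashMap<>());
    }
}

abstract class MiniRichFunction {
    private RuntimeContextStub context;

    final void setRuntimeContext(RuntimeContextStub context) {
        this.context = context;
    }

    final RuntimeContextStub getRuntimeContext() {
        if (context == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
        return context;
    }

    void open() {}
}

// Wrong: a field initializer runs in the constructor, before any context exists.
final class EagerStateFunction extends MiniRichFunction {
    private final Map<String, String> state = getRuntimeContext().getMapState("mymap");
}

// Right: acquire the state in open(), after the context has been injected.
final class LazyStateFunction extends MiniRichFunction {
    Map<String, String> state;

    @Override
    void open() {
        state = getRuntimeContext().getMapState("mymap");
    }
}

final class Lifecycle {
    // Mirrors the order in which a function instance is prepared.
    static <F extends MiniRichFunction> F deploy(Supplier<F> constructor) {
        F function = constructor.get();                       // 1. construct
        function.setRuntimeContext(new RuntimeContextStub()); // 2. inject context
        function.open();                                      // 3. open
        return function;
    }
}
```

In real Flink the corresponding fix is to override `open(Configuration)` in a `Rich*` function applied to a keyed stream and call `getRuntimeContext().getMapState(descr)` there.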
>>>
>>> On 09.10.2018 17:09, Ahmad Hassan wrote:
>>>
>>> Hi,
>>>
>>> We want to use MapState inside a fold function to keep a map of all
>>> products that we see in a 24-hour window, so that the large state is
>>> stored in RocksDB rather than overflowing the heap. However, I can't seem
>>> to initialise MapState within a FoldFunction or in any class that extends
>>> RichMapFunction:
>>>
>>> private transient MapStateDescriptor<String, String> descr =
>>>     new MapStateDescriptor<>("mymap", String.class, String.class);
>>> this.getRuntimeContext().getMapState(descr);
>>>
>>> I get error
>>>
>>> java.lang.IllegalStateException: The runtime context has not been initialized.
>>>     at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
>>>
>>>
>>> Any clues on how to get the runtime context, please?
>>>
>>> Thanks.
>>>
>>> Best regards
>>>
>>>
>>>
