Hi Ahmad,

Yes, that is correct. The aggregated fold value (i.e. your WindowStats 
instance) will be checkpointed by Flink as managed state, and restored from the 
last complete checkpoint in case of failures.
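To give a sense of how much state that is: a 24-hour window sliding every 5 minutes puts each event into 24 × 60 / 5 = 288 overlapping windows. The window operator keeps a separate fold accumulator for every (key, window) pair, so each key can have up to 288 WindowStats instances in state at once, and all of them are part of the checkpoint. The plain-Java model below mirrors how a sliding assigner maps a timestamp to its windows, assuming windows aligned to multiples of the slide (zero offset). SlidingWindowModel is a hypothetical name, not a Flink class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Flink class: models how a sliding-window assigner
// maps one timestamp (in ms) to every [start, end) window that contains it.
final class SlidingWindowModel {
    static List<long[]> windowsFor(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window containing the timestamp,
        // assuming windows are aligned to multiples of `slide` (zero offset).
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

For example, windowsFor(t, 86_400_000L, 300_000L).size() is 288 for any timestamp t, because 24 hours divides evenly into 5-minute slides.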
One comment on using the fold function: if what you’re essentially doing in the 
fold is just collecting the elements of the windows per key and performing the 
actual aggregation in the window function, then you don't need the fold.
A generic window function should suit that case. See [1].
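For illustration, here is a plain-Java model of the generic case (not the Flink API itself; Event and the count-only aggregation are hypothetical). The window function receives all of a window's elements for one key when the window fires and aggregates them in a single pass. Keep the trade-off in mind: to do this, the window operator has to buffer every element of the window in state until it fires, whereas an incremental fold only keeps its accumulator.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical event type: only the product ID is needed here.
final class Event {
    final String productId;
    Event(String productId) { this.productId = productId; }
}

// Models a generic window function: it sees the whole window at once
// (Flink hands the buffered elements over as an Iterable) and aggregates there.
final class CountPerProductWindowFunction {
    Map<String, Long> apply(Iterable<Event> windowContents) {
        Map<String, Long> counts = new HashMap<>();
        for (Event e : windowContents) {
            // Increment this product's count, starting at 1 on first sight.
            counts.merge(e.productId, 1L, Long::sum);
        }
        return counts;
    }
}
```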

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#windowfunction---the-generic-case


On 29 June 2017 at 5:11:58 PM, Ahmad Hassan (ahmad.has...@gmail.com) wrote:

Any thoughts on this problem please?


Hi All,

I am collecting millions of events every 24 hours for N products, where N can 
be up to 50k. I use the following fold mechanism with a sliding window:

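import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// 24-hour windows sliding every 5 minutes, keyed by tenant and category
final DataStream<WindowStats> eventStream = inputStream
    .keyBy(TENANT, CATEGORY)
    .window(SlidingProcessingTimeWindows.of(Time.hours(24), Time.minutes(5)))
    .fold(new WindowStats(), new ProductAggregationMapper(),
        new ProductAggregationWindowFunction());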

In the WindowStats class, I keep a HashMap<String, ProductMetric> keyed by 
product ID, which holds each product's event count and various other metrics. 
So for 50k products I will have 50k entries in the map within the WindowStats 
instance, instead of millions of events, because the fold function processes 
each event as it arrives.
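A simplified sketch of this accumulator and the fold step, assuming an event type carrying a product ID and a ProductMetric that only tracks an event count (Event, ProductAggregationFold, and the field names are hypothetical placeholders). The fold method mirrors the shape of Flink's FoldFunction.fold(accumulator, value): it updates the accumulator in place as each event arrives, so the map grows with the number of products, not the number of events.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical event type: only the product ID is needed for this sketch.
final class Event {
    final String productId;
    Event(String productId) { this.productId = productId; }
}

// Hypothetical per-product metrics; real code would track more than a count.
final class ProductMetric {
    long eventCount;
}

// Accumulator: one entry per product, no matter how many events arrive.
final class WindowStats {
    final Map<String, ProductMetric> metricsByProduct = new HashMap<>();
}

// Fold step: called once per incoming event, updates the accumulator in place.
final class ProductAggregationFold {
    WindowStats fold(WindowStats acc, Event event) {
        acc.metricsByProduct
            .computeIfAbsent(event.productId, id -> new ProductMetric())
            .eventCount++;
        return acc;
    }
}
```

Folding 1,000 events spread over three product IDs leaves exactly three entries in metricsByProduct.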

My question is: if I set env.enableCheckpointing(1000), will the WindowStats 
instance for each existing window automatically be checkpointed and restored on 
recovery? If not, how can I better implement the above use case to store the 
state of the WindowStats object within the fold operation, please?

Thanks for all the help.

Best Regards,
