Hi Gordon,

Thanks for the details. I am using fold to process events and maintain
statistics per product ID within a WindowStats instance. Fold is much more
efficient here because there can be millions of events but fewer than 50k
unique products. If I use a generic window function instead, it will be
less efficient, because the window function receives a collection of
millions of events, and Flink replicates each event into every sliding
window it belongs to.
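
To put a rough number on that replication: as far as I understand the
sliding window assigner, each event lands in (window size / slide) windows
when nothing is pre-aggregated. The snippet below is only a sketch of that
arithmetic; the class and method names are made up for illustration and are
not Flink API, and it assumes the window size is a multiple of the slide.

```java
import java.time.Duration;

// Hypothetical helper, not part of Flink: counts how many sliding windows a
// single event falls into, assuming size is an exact multiple of slide.
class SlidingWindowCopies {
    static long windowsPerEvent(Duration size, Duration slide) {
        if (slide.isZero() || slide.isNegative()) {
            throw new IllegalArgumentException("slide must be positive");
        }
        return size.toMillis() / slide.toMillis();
    }

    public static void main(String[] args) {
        // 1 hour window sliding every 5 minutes
        System.out.println(windowsPerEvent(Duration.ofHours(1), Duration.ofMinutes(5)));  // 12
        // 24 hour window sliding every 5 minutes
        System.out.println(windowsPerEvent(Duration.ofHours(24), Duration.ofMinutes(5))); // 288
    }
}
```

So buffering raw events would mean storing each of the millions of events
many times over, whereas the incremental approach only keeps one
WindowStats per window.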

However, can you give me an idea of how to use AggregateFunction in the
latest Flink to replace the following fold function?

final DataStream<WindowStats> eventStream = inputStream
    .keyBy(TENANT, CATEGORY)
    .window(SlidingProcessingTimeWindows.of(Time.hours(1), Time.minutes(5)))
    .fold(new WindowStats(), new ProductAggregationMapper(),
        new ProductAggregationWindowFunction());
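
My rough guess at the shape is below. It is only a sketch under several
assumptions: AggregateFunctionShape is a local stand-in that mirrors the
four methods of org.apache.flink.api.common.functions.AggregateFunction so
the snippet compiles without Flink, Event is a hypothetical event type, and
WindowStats is cut down to a per-product count. I believe add() returns
void in some older releases and the accumulator in newer ones; this follows
the latter.

```java
import java.util.HashMap;
import java.util.Map;

// Local stand-in (assumption) mirroring Flink's AggregateFunction<IN, ACC, OUT>.
interface AggregateFunctionShape<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical event type; the real one carries more fields.
class Event {
    final String productId;
    Event(String productId) { this.productId = productId; }
}

// Simplified WindowStats: only an event count per product ID.
class WindowStats {
    final Map<String, Long> countsByProduct = new HashMap<>();
}

class ProductAggregator implements AggregateFunctionShape<Event, WindowStats, WindowStats> {
    // Plays the role of fold's initial value (new WindowStats()).
    @Override
    public WindowStats createAccumulator() { return new WindowStats(); }

    // Plays the role of the fold function: update stats as each event arrives.
    @Override
    public WindowStats add(Event event, WindowStats acc) {
        acc.countsByProduct.merge(event.productId, 1L, Long::sum);
        return acc;
    }

    // The accumulator is already the result handed to the window function.
    @Override
    public WindowStats getResult(WindowStats acc) { return acc; }

    // Combine two partial accumulators by summing counts per product.
    @Override
    public WindowStats merge(WindowStats a, WindowStats b) {
        b.countsByProduct.forEach((id, n) -> a.countsByProduct.merge(id, n, Long::sum));
        return a;
    }
}

class AggregateSketch {
    public static void main(String[] args) {
        ProductAggregator agg = new ProductAggregator();
        WindowStats acc = agg.createAccumulator();
        for (String id : new String[] {"p1", "p2", "p1"}) {
            acc = agg.add(new Event(id), acc);
        }
        System.out.println(agg.getResult(acc).countsByProduct.get("p1")); // 2
    }
}
```

With the real interface, I assume this would be wired in as
.aggregate(new ProductAggregator(), new ProductAggregationWindowFunction())
in place of the .fold(...) call, but I have not verified that. Is this
roughly the right direction?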

Thanks!

On 29 June 2017 at 12:57, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi Ahmad,
>
> Yes, that is correct. The aggregated fold value (i.e. your WindowStats
> instance) will be checkpointed by Flink as managed state, and restored from
> the last complete checkpoint in case of failures.
> One comment on using the fold function: if what you’re essentially doing
> in the fold is just collecting the elements of the windows per key and
> performing the actual aggregation in the window function, then you don't
> need the fold.
> A generic window function should suit that case. See [1].
>
> Cheers,
> Gordon
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#windowfunction---the-generic-case
>
>
> On 29 June 2017 at 5:11:58 PM, Ahmad Hassan (ahmad.has...@gmail.com)
> wrote:
>
> Any thoughts on this problem please?
>
>
> Hi All,
>
> I am collecting millions of events per 24 hours for N products, where N
> can be 50k. I use the following fold mechanism with a sliding window:
>
> final DataStream<WindowStats> eventStream = inputStream
>     .keyBy(TENANT, CATEGORY)
>     .window(SlidingProcessingTimeWindows.of(Time.hours(24), Time.minutes(5)))
>     .fold(new WindowStats(), new ProductAggregationMapper(),
>         new ProductAggregationWindowFunction());
>
> In the WindowStats class, I keep a HashMap<String, ProductMetric> keyed by
> product ID, which holds each product's event count and various other
> metrics. So for 50k products I will have 50k entries in the map within the
> WindowStats instance instead of millions of events, because the fold
> function processes each event as it arrives.
>
> My question is: if I set env.enableCheckpointing(1000), will the
> WindowStats instance for each existing window automatically be
> checkpointed and restored on recovery? If not, how can I better implement
> the above use case to store the state of the WindowStats object within the
> fold operation?
>
> Thanks for all the help.
>
> Best Regards,
>
>
