Hi Ashish,

I had a look at your Trigger and couldn't spot anything that would explain
the leaking state.
You're properly cleaning up in clear().

However, I might have found the cause of the increasing state size.
A window is only completely deleted once time passes its end timestamp
(Window.maxTimestamp()).
Deletion of a window includes deletion of the window object, purging the
window content (same as TriggerResult.PURGE), and calling Trigger.clear().

The max timestamp of a GlobalWindow is Long.MAX_VALUE. Hence, a
GlobalWindow is never really deleted (the window content is gone if you
PURGE the window, but Trigger.clear() is never called for it).
If you are running the window operator on a keyed stream with an evolving
key domain, this means you are accumulating state forever.

One way to solve the issue is to implement a custom window assigner with a
conservative max timestamp.
Or you keep your ProcessFunction implementation, which is probably a lot
easier to maintain than a custom window assigner, trigger, and window
function.
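
For illustration, here is a minimal sketch of such an assigner, assuming
processing time and a configurable per-window lifetime (the class name
BoundedGlobalWindows and its parameters are made up for this example, not an
existing Flink assigner):

import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Sketch of a "global-like" assigner with a bounded window lifetime: all
 * elements of a key that arrive within the same lifetime bucket share one
 * window, and the window's maxTimestamp() is finite, so the operator can
 * eventually delete it and call Trigger.clear().
 */
public class BoundedGlobalWindows extends WindowAssigner<Object, TimeWindow> {

    private static final long serialVersionUID = 1L;

    private final long lifetimeMs;

    private BoundedGlobalWindows(long lifetimeMs) {
        this.lifetimeMs = lifetimeMs;
    }

    public static BoundedGlobalWindows withLifetime(long lifetimeMs) {
        return new BoundedGlobalWindows(lifetimeMs);
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        // Align the window to a processing-time bucket of size lifetimeMs.
        long now = context.getCurrentProcessingTime();
        long start = now - (now % lifetimeMs);
        return Collections.singletonList(new TimeWindow(start, start + lifetimeMs));
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        // A custom trigger (e.g. the BitwiseOrTrigger) would be set explicitly via .trigger(...).
        return ProcessingTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return false;
    }
}

Because the windows produced here have a finite maxTimestamp(), the window
operator can eventually delete them and call Trigger.clear(). Note that this
is effectively a tumbling processing-time window, so depending on your
semantics, TumblingProcessingTimeWindows plus your custom trigger might
already be enough.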

Best, Fabian

Btw, you don't need to implement the merging logic of a Trigger if the
window assigner does not support merging (GlobalWindows does not).
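
For example, in the Trigger quoted below, merging support could simply be
declared unsupported (a sketch of the change, not required code):

@Override
public boolean canMerge() {
    // GlobalWindows never merges windows, so this trigger does not need to
    // support merging.
    return false;
}

// onMerge() can then be removed, or left in place to fail loudly if it is
// ever called:
@Override
public void onMerge(W window, OnMergeContext ctx) throws Exception {
    throw new UnsupportedOperationException("This trigger does not support merging windows.");
}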

2018-05-15 2:55 GMT+02:00 ashish pok <ashish...@yahoo.com>:

> Thanks Fabian, Kostas,
>
> Here is what I had in the Trigger - the idea is to run a bitwise OR until a
> threshold is reached or a timeout occurs (nothing too fancy here). Let
> me know what you guys think. Like I said, I moved this logic to a Process
> Function and I haven't seen the same issue I was seeing with this.
>
>
> @PublicEvolving
> public class BitwiseOrTrigger<W extends Window> extends Trigger<FactoredEvent, W> {
>
>     private static final long serialVersionUID = 1L;
>
>     private final int threshold;
>     private final long epocDelta;
>     private final ReducingStateDescriptor<Tuple2<Integer, Long>> stateDesc =
>             new ReducingStateDescriptor<>("bitwiseOr", new BitwiseOr(),
>                     TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {}));
>
>     private BitwiseOrTrigger(int threshold, long allowedLateness) {
>         this.threshold = threshold;
>         this.epocDelta = allowedLateness;
>     }
>
>     @Override
>     public TriggerResult onElement(FactoredEvent event, long timestamp, W window,
>             TriggerContext ctx) throws Exception {
>         ReducingState<Tuple2<Integer, Long>> currState = ctx.getPartitionedState(stateDesc);
>         if (this.epocDelta > 0) {
>             ctx.registerProcessingTimeTimer(System.currentTimeMillis() + this.epocDelta);
>         }
>         currState.add(new Tuple2<Integer, Long>(event.getFactor(), this.epocDelta));
>         if (currState.get().f0 >= threshold) {
>             currState.clear();
>             return TriggerResult.FIRE_AND_PURGE;
>         }
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
>         return TriggerResult.FIRE_AND_PURGE;
>     }
>
>     @Override
>     public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
>         return TriggerResult.FIRE_AND_PURGE;
>     }
>
>     @Override
>     public void clear(W window, TriggerContext ctx) throws Exception {
>         ctx.getPartitionedState(stateDesc).clear();
>     }
>
>     @Override
>     public boolean canMerge() {
>         return true;
>     }
>
>     @Override
>     public void onMerge(W window, OnMergeContext ctx) throws Exception {
>         ctx.mergePartitionedState(stateDesc);
>     }
>
>     @Override
>     public String toString() {
>         return "BitwiseOrTrigger(" + threshold + ")";
>     }
>
>     public static <W extends Window> BitwiseOrTrigger<W> of(int threshold, long expirationEpoc) {
>         return new BitwiseOrTrigger<>(threshold, expirationEpoc);
>     }
>
>     private static class BitwiseOr implements ReduceFunction<Tuple2<Integer, Long>> {
>
>         private static final long serialVersionUID = 1L;
>
>         @Override
>         public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> tup1,
>                 Tuple2<Integer, Long> tup2) throws Exception {
>             Tuple2<Integer, Long> retTup = tup1;
>             retTup.f0 = tup1.f0 | tup2.f0;
>             return retTup;
>         }
>     }
> }
>
>
> On Monday, May 14, 2018, 6:00:11 AM EDT, Fabian Hueske <fhue...@gmail.com>
> wrote:
>
>
> Hi Ashish,
>
> Did you use per-window state (also called partitioned state) in your
> Trigger?
> If yes, you need to make sure that it is completely removed in the clear()
> method, because processing-time timers won't fire once a window has been
> purged.
> So you cannot (fully) rely on timers to clean up per-window state.
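>
> For reference, a minimal clear() along those lines might look like this
> (sketch only; the state descriptor and the timer timestamp to delete depend
> on what the trigger actually registered):
>
> @Override
> public void clear(W window, TriggerContext ctx) throws Exception {
>     // Explicitly remove the per-window (partitioned) state; pending
>     // processing-time timers will not fire once the window has been purged.
>     ctx.getPartitionedState(stateDesc).clear();
>     // If a timer was registered, delete it too (the timestamp must match
>     // the one that was registered).
>     ctx.deleteProcessingTimeTimer(window.maxTimestamp());
> }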
>
> Best, Fabian
>
> 2018-05-14 9:34 GMT+02:00 Kostas Kloudas <k.klou...@data-artisans.com>:
>
> Hi Ashish,
>
> It would be helpful to share the code of your custom trigger for the first
> case.
> Without that, we cannot tell what state you create and how/when you
> update/clear it.
>
> Cheers,
> Kostas
>
> On May 14, 2018, at 1:04 AM, ashish pok <ashish...@yahoo.com> wrote:
>
> Hi Till,
>
> Thanks for getting back. I am sure that will fix the issue, but I feel like
> it would potentially mask an underlying one. I have been going back and
> forth with Fabian on a use case where, for some of our highly transient
> datasets, it might make sense to just use memory-based state (except, of
> course, that data loss becomes an issue when the app occasionally hits a
> problem and the whole job restarts, or the app has to be taken down, etc. -
> i.e. essentially handling graceful shutdowns / restarts better). I was on
> the hook to create a business case and post it back to this forum (which I
> am hoping I can get around to at some point soon). Long story short, this
> is one of those datasets.
>
> State in this case is either fired and cleared normally or cleared on a
> processing timeout. So technically, unless there is a memory leak in the
> app code, memory usage should plateau at a high point. What I was noticing
> was that memory would creep up ever so slowly.
>
> I couldn't tell exactly why heap utilization kept growing (ever so slowly,
> but it had an upward trend for sure), because the state should technically
> be cleared, if not as part of the reducing function then on timeout. After
> running for a couple of days, the app would run into Java heap issues. So
> changing to RocksDB will probably fix the issue, but not necessarily the
> leak of state that should be cleared, IMO. Interestingly, I switched my app
> from using something like this:
>
> WindowedStream<BasicFactTuple, String, GlobalWindow> windowedStats = statsStream
>         .keyBy(BasicFactTuple::getKey)
>         .window(GlobalWindows.create())
>         .trigger(BitwiseOrTrigger.of(60, AppConfigs.getWindowSize(5 * 60 * 1000)));
>
> To
>
> DataStream<PlatformEvent> processStats = pipStatsStream
>         .keyBy(BasicFactTuple::getKey)
>         .process(new IfStatsReduceProcessFn(AppConfigs.getWindowSize(5 * 60 * 1000), 60));
>
> I basically moved the logic of the trigger into a process function over the
> weekend. Once I did that, the heap completely stabilized. In the trigger
> implementation I was using FIRE_AND_PURGE on the trigger condition or in
> onProcessingTime, and in the process implementation I am using the .clear()
> method for the same purpose.
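>
> For reference, a minimal sketch of that pattern (this is not the actual
> IfStatsReduceProcessFn; the class name, the assumption that the input type
> exposes getFactor(), and the mapping to PlatformEvent are all made up for
> illustration):
>
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.util.Collector;
>
> public class BitwiseOrProcessFn extends ProcessFunction<BasicFactTuple, PlatformEvent> {
>
>     private static final long serialVersionUID = 1L;
>
>     private final long timeoutMs;
>     private final int threshold;
>     private transient ValueState<Integer> orState;
>
>     public BitwiseOrProcessFn(long timeoutMs, int threshold) {
>         this.timeoutMs = timeoutMs;
>         this.threshold = threshold;
>     }
>
>     @Override
>     public void open(Configuration parameters) {
>         orState = getRuntimeContext().getState(
>                 new ValueStateDescriptor<>("bitwiseOr", Integer.class));
>     }
>
>     @Override
>     public void processElement(BasicFactTuple value, Context ctx,
>             Collector<PlatformEvent> out) throws Exception {
>         Integer current = orState.value();
>         if (current == null) {
>             current = 0;
>             // Register the timeout once per accumulation cycle of this key.
>             ctx.timerService().registerProcessingTimeTimer(
>                     ctx.timerService().currentProcessingTime() + timeoutMs);
>         }
>         current |= value.getFactor(); // getFactor() is assumed to exist on the input type
>         if (current >= threshold) {
>             // Emit the accumulated result here (mapping to PlatformEvent is
>             // app-specific), then clear the keyed state on the normal path.
>             orState.clear();
>         } else {
>             orState.update(current);
>         }
>     }
>
>     @Override
>     public void onTimer(long timestamp, OnTimerContext ctx,
>             Collector<PlatformEvent> out) throws Exception {
>         if (orState.value() != null) {
>             // The timeout fired before the threshold was reached: emit the
>             // partial result here if desired (again app-specific).
>         }
>         // Always clear the keyed state on the timeout path.
>         orState.clear();
>     }
> }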
>
> I seem to have solved the problem by using a process function, but I'd be
> interested to understand why the heap would creep up in the trigger
> scenario.
>
> Hope this makes sense,
>
> Ashish
>
> On Sunday, May 13, 2018, 4:06:59 PM EDT, Till Rohrmann <
> till.rohrm...@gmail.com> wrote:
>
>
> Hi Ashish,
>
> have you tried using Flink's RocksDBStateBackend? If your job accumulates
> state exceeding the available main memory, then you have to use a state
> backend which can spill to disk. The RocksDBStateBackend offers you exactly
> this functionality.
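>
> For reference, enabling it looks roughly like this (a sketch; the
> checkpoint URI is just an example, and the flink-statebackend-rocksdb
> dependency needs to be on the classpath):
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> // Keyed state is kept in RocksDB on local disk and checkpointed to a durable
> // filesystem; the boolean flag enables incremental checkpoints.
> env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
>
> Alternatively, it can be configured cluster-wide in flink-conf.yaml via
> state.backend: rocksdb and state.checkpoints.dir.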
>
> Cheers,
> Till
>
> On Mon, Apr 30, 2018 at 3:54 PM, ashish pok <ashish...@yahoo.com> wrote:
>
> All,
>
> I am noticing heap utilization creeping up slowly in a couple of apps,
> which eventually leads to an OOM issue. The apps only have one process
> function that caches state. I did make sure a clear method is invoked when
> events are collected normally, on exception, and on timeout.
>
> Are there any other best practices others follow for memory-backed state?
>
> Thanks,
>
> -- Ashish
>
>
