Hi Ashish,

I had a look at your Trigger and couldn't spot anything that would explain the leaking state. You're properly cleaning up in clear().

However, I might have found the cause of the increasing state size. A window is only completely deleted when time passes its end timestamp (Window.maxTimestamp()). Deleting a window includes deleting the window object, purging the window content (same as TriggerResult.PURGE), and calling Trigger.clear(). The max timestamp of a GlobalWindow is Long.MAX_VALUE, so a GlobalWindow is never really deleted (only its content is gone if you PURGE the window). If you are running the window operator on a key whose domain keeps evolving (i.e., you keep seeing new key values), this means you are accumulating state forever.

One way to solve the issue is to implement a custom window assigner with a conservative max timestamp. Or you keep your ProcessFunction implementation, which is probably a lot easier to maintain than a custom window assigner, trigger, and window function.

Best, Fabian

Btw. you don't need to implement the merging logic of a Trigger if the window assigner does not support merging (GlobalWindows does not).
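To illustrate the first option, here is a minimal, untested sketch of a window assigner whose windows have a finite maxTimestamp(). The class name and sizing are made up, and it is essentially what the built-in TumblingProcessingTimeWindows already provides; the point is only that assigning TimeWindow instead of GlobalWindow gives Flink a bound at which it can fully delete the window, its content, and the trigger state.

    import java.util.Collection;
    import java.util.Collections;

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    // Sketch only: a processing-time assigner with a bounded window end, so the
    // window operator can eventually drop the window and call Trigger.clear().
    public class BoundedProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;

        private final long sizeMillis;

        private BoundedProcessingTimeWindows(long sizeMillis) {
            this.sizeMillis = sizeMillis;
        }

        public static BoundedProcessingTimeWindows of(Time size) {
            return new BoundedProcessingTimeWindows(size.toMilliseconds());
        }

        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            // Align windows to processing time; maxTimestamp() is start + size - 1,
            // i.e. finite, unlike GlobalWindow's Long.MAX_VALUE.
            long now = context.getCurrentProcessingTime();
            long start = now - (now % sizeMillis);
            return Collections.singletonList(new TimeWindow(start, start + sizeMillis));
        }

        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            // A custom trigger (e.g. the BitwiseOrTrigger quoted below) can still be set via .trigger(...).
            return ProcessingTimeTrigger.create();
        }

        @Override
        public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new TimeWindow.Serializer();
        }

        @Override
        public boolean isEventTime() {
            return false;
        }
    }

With such an assigner, the bitwise-OR trigger logic can stay unchanged, but per-window state cannot outlive the window size.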
2018-05-15 2:55 GMT+02:00 ashish pok <ashish...@yahoo.com>:

> Thanks Fabian, Kostas,
>
> Here is what I had in the Trigger - the idea is to run a bitwise OR until a
> threshold is reached or a timeout expires (nothing too fancy here). Let me
> know what you guys think. Like I said, I moved this logic to a Process
> Function and I haven't seen the same issue I was seeing with this.
>
> @PublicEvolving
> public class BitwiseOrTrigger<W extends Window> extends Trigger<FactoredEvent, W> {
>     private static final long serialVersionUID = 1L;
>     private final int threshold;
>     private final long epocDelta;
>     private final ReducingStateDescriptor<Tuple2<Integer, Long>> stateDesc =
>         new ReducingStateDescriptor<>("bitwiseOr", new BitwiseOr(),
>             TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {}));
>
>     private BitwiseOrTrigger(int threshold, long allowedLateness) {
>         this.threshold = threshold;
>         this.epocDelta = allowedLateness;
>     }
>
>     @Override
>     public TriggerResult onElement(FactoredEvent event, long timestamp, W window,
>             TriggerContext ctx) throws Exception {
>         ReducingState<Tuple2<Integer, Long>> currState = ctx.getPartitionedState(stateDesc);
>         if (this.epocDelta > 0) {
>             ctx.registerProcessingTimeTimer(System.currentTimeMillis() + this.epocDelta);
>         }
>         currState.add(new Tuple2<Integer, Long>(event.getFactor(), this.epocDelta));
>         if (currState.get().f0 >= threshold) {
>             currState.clear();
>             return TriggerResult.FIRE_AND_PURGE;
>         }
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
>         return TriggerResult.FIRE_AND_PURGE;
>     }
>
>     @Override
>     public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
>         return TriggerResult.FIRE_AND_PURGE;
>     }
>
>     @Override
>     public void clear(W window, TriggerContext ctx) throws Exception {
>         ctx.getPartitionedState(stateDesc).clear();
>     }
>
>     @Override
>     public boolean canMerge() {
>         return true;
>     }
>
>     @Override
>     public void onMerge(W window, OnMergeContext ctx) throws Exception {
>         ctx.mergePartitionedState(stateDesc);
>     }
>
>     @Override
>     public String toString() {
>         return "BitwiseOrTrigger(" + threshold + ")";
>     }
>
>     public static <W extends Window> BitwiseOrTrigger<W> of(int threshold, long expirationEpoc) {
>         return new BitwiseOrTrigger<>(threshold, expirationEpoc);
>     }
>
>     private static class BitwiseOr implements ReduceFunction<Tuple2<Integer, Long>> {
>         private static final long serialVersionUID = 1L;
>
>         @Override
>         public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> tup1, Tuple2<Integer, Long> tup2) throws Exception {
>             Tuple2<Integer, Long> retTup = tup1;
>             retTup.f0 = tup1.f0 | tup2.f0;
>             return retTup;
>         }
>     }
> }
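For comparison, here is a rough sketch of the ProcessFunction variant Ashish mentions. It is not his actual IfStatsReduceProcessFn; the input is simplified to a Tuple2 of (key, factor), and names and parameters are made up. The point is that the keyed state is cleared both on the normal threshold path and on timeout, so nothing can accumulate beyond the timeout.

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    // Sketch only: bitwise-OR accumulation with a threshold and a processing-time
    // timeout, meant to run on a keyed stream (use after keyBy(e -> e.f0)).
    public class BitwiseOrProcessFn extends ProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        private final long timeoutMillis;
        private final int threshold;

        private transient ValueState<Tuple2<String, Integer>> orState;

        public BitwiseOrProcessFn(long timeoutMillis, int threshold) {
            this.timeoutMillis = timeoutMillis;
            this.threshold = threshold;
        }

        @Override
        public void open(Configuration parameters) {
            orState = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "bitwiseOr", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})));
        }

        @Override
        public void processElement(Tuple2<String, Integer> event, Context ctx,
                Collector<Tuple2<String, Integer>> out) throws Exception {
            Tuple2<String, Integer> current = orState.value();
            if (current == null) {
                // First element for this key: start the timeout clock.
                current = Tuple2.of(event.f0, 0);
                ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + timeoutMillis);
            }
            current.f1 = current.f1 | event.f1;
            if (current.f1 >= threshold) {
                out.collect(current);
                orState.clear();   // normal completion: emit and drop the state
            } else {
                orState.update(current);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx,
                Collector<Tuple2<String, Integer>> out) throws Exception {
            Tuple2<String, Integer> current = orState.value();
            if (current != null) {
                out.collect(current);
                orState.clear();   // timeout: emit whatever accumulated and drop the state
            }
            // If the threshold path already cleared the state, the timer finds nothing to do.
        }
    }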
> On Monday, May 14, 2018, 6:00:11 AM EDT, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Ashish,
>
> Did you use per-window state (also called partitioned state) in your Trigger?
> If yes, you need to make sure that it is completely removed in the clear()
> method, because processing-time timers won't fire once a window was purged.
> So you cannot (fully) rely on timers to clean up per-window state.
>
> Best, Fabian
>
> 2018-05-14 9:34 GMT+02:00 Kostas Kloudas <k.klou...@data-artisans.com>:
>
> Hi Ashish,
>
> It would be helpful to share the code of your custom trigger for the first
> case. Without that, we cannot tell what state you create and how/when you
> update/clear it.
>
> Cheers,
> Kostas
>
> On May 14, 2018, at 1:04 AM, ashish pok <ashish...@yahoo.com> wrote:
>
> Hi Till,
>
> Thanks for getting back. I am sure that will fix the issue, but I feel like
> that would potentially mask an issue. I have been going back and forth with
> Fabian on a use case where, for some of our highly transient datasets, it
> might make sense to just use memory-based state (except of course data loss
> becomes an issue when apps occasionally hit a problem and the whole job
> restarts or the app has to be taken down etc. - i.e. handling graceful
> shutdowns / restarts better, essentially). I was on the hook to create a
> business case and post it back to this forum (which I am hoping I can get
> around to at some point soon). Long story short, this is one of those
> datasets.
>
> States in this case are either fired and cleared normally or on processing
> timeout. So technically, unless there is a memory leak in the app code,
> memory usage should plateau out at a high point. What I was noticing was
> that memory would start to creep up ever so slowly.
>
> I couldn't tell exactly why heap utilization kept growing (ever so slowly,
> but it had an upward trend for sure) because the states should technically
> be cleared, if not as part of a reducing function then on timeout. The app,
> after running for a couple of days, would then run into Java heap issues.
> So changing to RocksDB probably will fix the issue, but not necessarily the
> leak of states that should be cleared, IMO. Interestingly, I switched my
> app from using something like this:
>
> WindowedStream<BasicFactTuple, String, GlobalWindow> windowedStats = statsStream
>     .keyBy(BasicFactTuple::getKey)
>     .window(GlobalWindows.create())
>     .trigger(BitwiseOrTrigger.of(60, AppConfigs.getWindowSize(5 * 60 * 1000)));
>
> to
>
> DataStream<PlatformEvent> processStats = pipStatsStream
>     .keyBy(BasicFactTuple::getKey)
>     .process(new IfStatsReduceProcessFn(AppConfigs.getWindowSize(5 * 60 * 1000), 60));
>
> I basically moved the logic of the trigger to a process function over the
> weekend. Once I did that, the heap completely stabilized. In the trigger
> implementation I was using FIRE_AND_PURGE on the trigger condition or in
> onProcessingTime, and in the process implementation I am using the .clear()
> method for the same purpose.
>
> I seem to have solved the problem by using process, but I'd be interested
> to understand why heap would creep up in the trigger scenario.
>
> Hope this makes sense,
>
> Ashish
>
> On Sunday, May 13, 2018, 4:06:59 PM EDT, Till Rohrmann <till.rohrm...@gmail.com> wrote:
>
> Hi Ashish,
>
> have you tried using Flink's RocksDBStateBackend? If your job accumulates
> state exceeding the available main memory, then you have to use a state
> backend which can spill to disk. The RocksDBStateBackend offers you exactly
> this functionality.
>
> Cheers,
> Till
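For completeness, switching to the RocksDB state backend that Till suggests looks roughly like the following sketch (the checkpoint URI is a placeholder and the flink-statebackend-rocksdb dependency must be on the classpath). It keeps keyed state on local disk instead of the JVM heap, which helps with sheer state size but, as Ashish notes above, would not fix state that is never cleared.

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class EnableRocksDbBackend {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Spill keyed state to RocksDB on local disk; checkpoints go to the given URI (placeholder).
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
            // ... build the keyed window or process pipeline here ...
            env.execute("job-with-rocksdb-state-backend");
        }
    }

The backend can also be configured cluster-wide in flink-conf.yaml via state.backend: rocksdb.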
> On Mon, Apr 30, 2018 at 3:54 PM, ashish pok <ashish...@yahoo.com> wrote:
>
> All,
>
> I am noticing heap utilization creeping up slowly in a couple of apps,
> which eventually leads to an OOM issue. The apps only have one process
> function that caches state. I did make sure a clear method is invoked when
> events are collected normally, on exception, and on timeout.
>
> Are there any other best practices others follow for memory-backed states?
>
> Thanks,
>
> -- Ashish