Hi Ashish,

It would be helpful to share the code of your custom trigger for the first case. Without that, we cannot tell what state you create and how/when you update/clear it.
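For reference, a custom trigger on GlobalWindows with per-key state usually looks something like the sketch below (the class name is taken from your snippet; the mask/timeout semantics, field names and the bitFor() helper are just my guesses). The thing to check is that any state registered through the TriggerContext is cleared on every exit path: GlobalWindows are never removed, so Trigger.clear() is not called for you automatically, and FIRE_AND_PURGE only drops the window contents, not the trigger's own state.

    import org.apache.flink.api.common.state.ReducingState;
    import org.apache.flink.api.common.state.ReducingStateDescriptor;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

    // Hypothetical reconstruction: fires once all expected bits have been OR'ed
    // together for a key, or after a processing-time timeout.
    public class BitwiseOrTrigger extends Trigger<Object, GlobalWindow> {

        private final int expectedMask;
        private final long timeoutMs;

        // per-key trigger state; this is what has to be cleared explicitly
        private final ReducingStateDescriptor<Integer> maskDesc =
                new ReducingStateDescriptor<>("seenMask", (a, b) -> a | b, Integer.class);

        public BitwiseOrTrigger(int expectedMask, long timeoutMs) {
            this.expectedMask = expectedMask;
            this.timeoutMs = timeoutMs;
        }

        @Override
        public TriggerResult onElement(Object element, long ts, GlobalWindow w, TriggerContext ctx) throws Exception {
            ReducingState<Integer> seen = ctx.getPartitionedState(maskDesc);
            if (seen.get() == null) {
                // first element for this key: arm the timeout
                ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + timeoutMs);
            }
            seen.add(bitFor(element));                 // bitFor() is assumed
            if ((seen.get() & expectedMask) == expectedMask) {
                seen.clear();                          // without this, the per-key state lingers
                return TriggerResult.FIRE_AND_PURGE;
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow w, TriggerContext ctx) throws Exception {
            ctx.getPartitionedState(maskDesc).clear(); // the timeout path must clear as well
            return TriggerResult.FIRE_AND_PURGE;
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow w, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow w, TriggerContext ctx) throws Exception {
            // only invoked when the window is removed, which never happens for GlobalWindows
            ctx.getPartitionedState(maskDesc).clear();
        }

        private int bitFor(Object element) {
            return 0; // placeholder for however an element maps to a bit
        }
    }

If either the firing path or the timeout path misses its clear() call, that per-key state stays around for every key ever seen, and the heap creeps up in the way you describe.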
Cheers,
Kostas

> On May 14, 2018, at 1:04 AM, ashish pok <ashish...@yahoo.com> wrote:
>
> Hi Till,
>
> Thanks for getting back. I am sure that will fix the issue, but I feel like
> it would potentially mask an issue. I have been going back and forth with
> Fabian on a use case where, for some of our highly transient datasets, it
> might make sense to just use memory-based state (except of course data loss
> becomes an issue when apps occasionally hit a problem and the whole job
> restarts, or the app has to be taken down etc. - i.e. essentially handling
> graceful shutdowns / restarts better). I was on the hook to create a
> business case and post it back to this forum (which I am hoping I can get
> around to at some point soon). Long story short, this is one of those
> datasets.
>
> States in this case are either fired and cleared normally or on processing
> timeout. So technically, unless there is a memory leak in the app code, memory
> usage should plateau out at a high point. What I was noticing was that memory
> would start to creep up ever so slowly.
>
> I couldn't tell exactly why heap utilization kept on growing (ever so slowly,
> but it had an upward trend for sure) because the states should technically be
> cleared, if not as part of a reducing function then on timeout. After running
> for a couple of days, the app would then run into Java heap issues. So changing
> to RocksDB will probably fix the issue, but not necessarily the leak of states
> that should be cleared, IMO. Interestingly, I switched my app from using
> something like this:
>
>     WindowedStream<BasicFactTuple, String, GlobalWindow> windowedStats = statsStream
>         .keyBy(BasicFactTuple::getKey)
>         .window(GlobalWindows.create())
>         .trigger(BitwiseOrTrigger.of(60, AppConfigs.getWindowSize(5*60*1000)));
>
> to
>
>     DataStream<PlatformEvent> processStats = pipStatsStream
>         .keyBy(BasicFactTuple::getKey)
>         .process(new IfStatsReduceProcessFn(AppConfigs.getWindowSize(5*60*1000), 60));
>
> I basically moved the logic of the trigger into a process function over the
> weekend. Once I did that, the heap completely stabilized. In the trigger
> implementation I was using FIRE_AND_PURGE on the trigger condition or in
> onProcessingTime, and in the process implementation I am using the .clear()
> method for the same.
>
> I seem to have solved the problem by using process, but I'd be interested to
> understand why the heap would creep up in the trigger scenario.
>
> Hope this makes sense,
>
> Ashish
>
> On Sunday, May 13, 2018, 4:06:59 PM EDT, Till Rohrmann <till.rohrm...@gmail.com> wrote:
>
> Hi Ashish,
>
> have you tried using Flink's RocksDBStateBackend? If your job accumulates
> state exceeding the available main memory, then you have to use a state
> backend which can spill to disk. The RocksDBStateBackend offers you exactly
> this functionality.
>
> Cheers,
> Till
>
> On Mon, Apr 30, 2018 at 3:54 PM, ashish pok <ashish...@yahoo.com> wrote:
> All,
>
> I am noticing heap utilization creeping up slowly in a couple of apps,
> which eventually leads to an OOM issue. The apps only have one process
> function that caches state. I did make sure I have a clear method invoked
> when events are collected normally, on exception, and on timeout.
>
> Are there any other best practices others follow for memory-backed states?
>
> Thanks,
>
> -- Ashish
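P.S. For comparison, the process-function version you describe makes the state lifecycle explicit, which is probably why the heap stabilized. A minimal sketch of that pattern (the class name and the 60 / window-size parameters are from your snippet; the field names, the getBits() accessor and the toEvent() mapping are made up) could look like:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    // Hypothetical sketch: accumulate a bitmask per key, emit and clear when
    // complete, otherwise clear from the processing-time timeout.
    public class IfStatsReduceProcessFn extends ProcessFunction<BasicFactTuple, PlatformEvent> {

        private final long timeoutMs;
        private final int expectedMask;
        private transient ValueState<Integer> seenMask;

        public IfStatsReduceProcessFn(long timeoutMs, int expectedMask) {
            this.timeoutMs = timeoutMs;
            this.expectedMask = expectedMask;
        }

        @Override
        public void open(Configuration parameters) {
            seenMask = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("seenMask", Integer.class));
        }

        @Override
        public void processElement(BasicFactTuple value, Context ctx, Collector<PlatformEvent> out) throws Exception {
            Integer mask = seenMask.value();
            if (mask == null) {
                mask = 0;
                // first element for this key: arm the timeout
                ctx.timerService().registerProcessingTimeTimer(
                        ctx.timerService().currentProcessingTime() + timeoutMs);
            }
            mask |= value.getBits();                   // getBits() is assumed
            if ((mask & expectedMask) == expectedMask) {
                out.collect(toEvent(value));           // emit the completed result
                seenMask.clear();                      // normal completion: release the state
            } else {
                seenMask.update(mask);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<PlatformEvent> out) throws Exception {
            // timeout path: release the state for this key
            seenMask.clear();
        }

        private PlatformEvent toEvent(BasicFactTuple t) {
            return new PlatformEvent();                // placeholder for the real mapping
        }
    }

Note that in this sketch a timer registered for a key that later completes normally will still fire, but it only finds already-cleared state, so it is harmless; you could also store the timer timestamp and delete it on completion.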