Hi Till,
Thanks for getting back. I am sure that will fix the issue, but I feel like it
would potentially mask an underlying problem. I have been going back and forth
with Fabian on a use case where, for some of our highly transient datasets, it
might make sense to just use memory-based state (except, of course, that data
loss becomes an issue when apps occasionally hit a problem and the whole job
restarts, or an app has to be taken down, etc.; i.e., essentially handling
graceful shutdowns and restarts better). I was on the hook to create a business
case and post it back to this forum (which I am hoping I can get around to at
some point soon). Long story short, this is one of those datasets.
States in this case are either fired and cleared normally or cleared on a
processing-time timeout. So technically, unless there is a memory leak in app
code, memory usage should plateau at a high-water mark. What I was noticing was
that memory would creep up ever so slowly.
I couldn't tell exactly why heap utilization kept growing (ever so slowly, but
with a definite upward trend), because the states should technically be
cleared, if not as part of the reducing function then on timeout. After running
for a couple of days, the app would then run into Java heap issues. So changing
to RocksDB will probably fix the heap issue, but IMO it would not necessarily
fix a leak of states that should have been cleared. Interestingly, I switched
my app from using something like this:

    WindowedStream<BasicFactTuple, String, GlobalWindow> windowedStats =
        statsStream
            .keyBy(BasicFactTuple::getKey)
            .window(GlobalWindows.create())
            .trigger(BitwiseOrTrigger.of(60, AppConfigs.getWindowSize(5*60*1000)));

to this:

    DataStream<PlatformEvent> processStats =
        pipStatsStream
            .keyBy(BasicFactTuple::getKey)
            .process(new IfStatsReduceProcessFn(AppConfigs.getWindowSize(5*60*1000), 60));

I basically moved the trigger logic into a process function over the weekend.
Once I did that, the heap completely stabilized. In the trigger implementation,
I was returning FIRE_AND_PURGE when the trigger condition was met or in
onProcessingTime; in the process implementation, I am calling the state's
.clear() method in the same places.
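
For reference, the fire-or-timeout clearing described above can be modeled in
plain Java without any Flink types. This is only a sketch under stated
assumptions, not the actual IfStatsReduceProcessFn: the class
FireAndClearModel, the count threshold, and the bitwise-OR accumulator are
hypothetical stand-ins, and a HashMap stands in for keyed state. It is meant to
show why the number of live keys should plateau when every key is removed
either on fire or on timeout.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Plain-Java model (no Flink API) of per-key state cleared on fire or on timeout. */
final class FireAndClearModel {

    /** Hypothetical per-key accumulator: OR-ed bits, event count, timeout deadline. */
    static final class Acc {
        long bits;
        int count;
        final long deadlineMs;

        Acc(long deadlineMs) {
            this.deadlineMs = deadlineMs;
        }
    }

    private final int maxCount;      // assumed fire condition: this many events per key
    private final long timeoutMs;    // assumed timeout, measured from the key's first event
    private final Map<String, Acc> state = new HashMap<>();  // stands in for keyed state
    private final List<String> emitted = new ArrayList<>();

    FireAndClearModel(int maxCount, long timeoutMs) {
        this.maxCount = maxCount;
        this.timeoutMs = timeoutMs;
    }

    /** Accumulates one event; fires and removes the key once maxCount is reached. */
    void onEvent(String key, long bits, long nowMs) {
        Acc acc = state.computeIfAbsent(key, k -> new Acc(nowMs + timeoutMs));
        acc.bits |= bits;
        acc.count++;
        if (acc.count >= maxCount) {
            emitted.add(key + "=" + acc.bits);
            state.remove(key);       // the equivalent of calling clear() on fire
        }
    }

    /** Fires and removes every key whose deadline is at or before nowMs. */
    void onTime(long nowMs) {
        Iterator<Map.Entry<String, Acc>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Acc> e = it.next();
            if (e.getValue().deadlineMs <= nowMs) {
                emitted.add(e.getKey() + "=" + e.getValue().bits);
                it.remove();         // the equivalent of calling clear() on timeout
            }
        }
    }

    /** Number of keys that still hold state. */
    int liveKeys() {
        return state.size();
    }

    /** Results emitted so far, formatted as "key=bits". */
    List<String> emitted() {
        return emitted;
    }
}
```

Feeding events through onEvent and advancing the clock through onTime should
leave liveKeys() at zero once every key has either fired or timed out; a count
that keeps growing would point at a code path that skips the removal.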
I seem to have solved the problem by using the process function, but I'd be
interested to understand why the heap would creep up in the trigger scenario.
Hope this makes sense,
Ashish
On Sunday, May 13, 2018, 4:06:59 PM EDT, Till Rohrmann
<[email protected]> wrote:
Hi Ashish,
have you tried using Flink's RocksDBStateBackend? If your job accumulates state
exceeding the available main memory, then you have to use a state backend which
can spill to disk. The RocksDBStateBackend offers you exactly this
functionality.
Cheers,
Till
On Mon, Apr 30, 2018 at 3:54 PM, ashish pok <[email protected]> wrote:
All,
I am noticing heap utilization creeping up slowly in a couple of apps, which
eventually leads to an OOM issue. The apps only have one process function that
caches state. I did make sure I have a clear method invoked when events are
collected normally, on exception, and on timeout.
Are there any other best practices others follow for memory-backed states?
Thanks,
-- Ashish