All,
I took a few heap dumps with jmap (one right after the app restarts, then at
2-hour intervals); they are 5 GB to 8 GB each. Comparing them, I can see that
the count of data tuples on the heap (basically instances of the object that is
maintained as state) is going up slowly.
The only other things I could relate that to were
streaming.api.operators.InternalTimer and
streaming.api.windowing.windows.TimeWindow, both of which were trending up as
well. There are definitely a lot more windows created than the increments I
could see, but nevertheless those objects are trending up. The input stream has
a very consistent sine-wave throughput, so it really doesn't make sense for
windows and tuples to keep trending up. There is also no event storm or anything
of that sort (i.e. the source stream has been very steady as far as throughput
is concerned).
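A lighter-weight way to repeat this kind of comparison, sketched here as an assumption rather than what was actually done above (full dumps were taken), is to save class histograms with `jmap -histo:live <pid>` at two points in time and diff the instance counts. The class name `HistoDiff` and its methods are hypothetical helpers; the parsing assumes the usual `num: #instances #bytes class name` row layout.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: diffs two saved `jmap -histo` outputs by instance count.
final class HistoDiff {
    // Matches rows such as "   1:       1000      32000  some.ClassName".
    private static final Pattern ROW =
            Pattern.compile("^\\s*\\d+:\\s+(\\d+)\\s+(\\d+)\\s+(\\S+).*$");

    // Returns class name -> instance count; header and total lines are skipped.
    static Map<String, Long> parseInstanceCounts(List<String> lines) {
        Map<String, Long> counts = new HashMap<>();
        for (String line : lines) {
            Matcher m = ROW.matcher(line);
            if (m.matches()) {
                counts.put(m.group(3), Long.parseLong(m.group(1)));
            }
        }
        return counts;
    }

    // Returns only the classes whose instance count increased, with the increase.
    static Map<String, Long> growth(Map<String, Long> before, Map<String, Long> after) {
        Map<String, Long> delta = new TreeMap<>();
        for (Map.Entry<String, Long> e : after.entrySet()) {
            long d = e.getValue() - before.getOrDefault(e.getKey(), 0L);
            if (d > 0) {
                delta.put(e.getKey(), d);
            }
        }
        return delta;
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("usage: HistoDiff <before.histo> <after.histo>");
            return;
        }
        Map<String, Long> before = parseInstanceCounts(Files.readAllLines(Paths.get(args[0])));
        Map<String, Long> after = parseInstanceCounts(Files.readAllLines(Paths.get(args[1])));
        // Print the 20 classes with the largest increase in instance count.
        growth(before, after).entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .limit(20)
                .forEach(e -> System.out.println(e.getValue() + "\t" + e.getKey()));
    }
}
```

Classes that show up near the top across several consecutive diffs are the ones worth chasing in the full dump.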
Here is a plot of heap utilization:
It has a typical sine-wave pattern, which is expected since the input stream
has the same pattern, but the source doesn't have an upward trend like the heap
utilization shown above. The screenshot above shows a rise from 60% utilization
to 80%, and the trend keeps going up until an issue occurs that resets the app.
Since processing is based on ProcessingTime, I really would have expected
memory to reach a steady state and remain more or less flat from a trending
perspective.
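To put a number on "trending up underneath a sine wave", one option is to average the heap samples over whole periods so the periodic part cancels, then compare the period means. This is only a sketch: `HeapTrend`, the sampling rate, and the period length are assumptions, and the data in `main` is synthetic rather than taken from the plot.

```java
// Hypothetical helper: separates a slow drift from a periodic load pattern.
final class HeapTrend {
    // Averages the samples over consecutive full periods; a trailing partial period is ignored.
    static double[] periodMeans(double[] samples, int samplesPerPeriod) {
        if (samplesPerPeriod <= 0) {
            throw new IllegalArgumentException("samplesPerPeriod must be positive");
        }
        int periods = samples.length / samplesPerPeriod;
        double[] means = new double[periods];
        for (int p = 0; p < periods; p++) {
            double sum = 0.0;
            for (int i = 0; i < samplesPerPeriod; i++) {
                sum += samples[p * samplesPerPeriod + i];
            }
            means[p] = sum / samplesPerPeriod;
        }
        return means;
    }

    // Average change of the period mean from one period to the next.
    static double driftPerPeriod(double[] means) {
        if (means.length < 2) {
            return 0.0;
        }
        return (means[means.length - 1] - means[0]) / (means.length - 1);
    }

    public static void main(String[] args) {
        // Synthetic heap utilization (%): sine wave plus a slow linear creep.
        double[] samples = new double[72];
        for (int i = 0; i < samples.length; i++) {
            samples[i] = 60.0 + 0.1 * i + 10.0 * Math.sin(2 * Math.PI * i / 24.0);
        }
        double[] means = periodMeans(samples, 24);
        System.out.println("drift per period: " + driftPerPeriod(means));
    }
}
```

A drift near zero would mean the heap is only following the load; a consistently positive drift is the leak signature described above.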
Appreciate any pointers anyone might have.
Thanks, Ashish
On Monday, June 18, 2018, 12:54:03 PM EDT, ashish pok <[email protected]>
wrote:
Right, that's where I am headed now, but I was wondering whether there are any
"gotchas" I am missing before I try to dig into a few gigs of heap dump.
Thanks, Ashish
On Monday, June 18, 2018, 3:37 AM, Stefan Richter <[email protected]>
wrote:
Hi,
can you take a heap dump from a JVM that runs into the problem and share it
with us? That would make finding the cause a lot easier.
Best, Stefan
On 15.06.2018 at 23:01, ashish pok <[email protected]> wrote:
All,
I have another slow memory leak situation, this time using a basic
processing-time session window (earlier it was GlobalWindow related, which
Fabian helped clarify).
I have a very simple data pipeline:
DataStream<PlatformEvent> processedData = rawTuples
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(AppConfigs.getWindowSize(780))))
    .trigger(new ProcessingTimePurgeTrigger())
    .apply(new IPSLAMetricWindowFn())
    .name("windowFunctionTuple")
    .map(new TupleToPlatformEventMapFn())
    .name("mapTupleEvent");
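For readers less familiar with gap-based sessions, here is a plain-Java model (not Flink code) of how a session gap groups the arrivals of a single key. `SessionGapModel` is a hypothetical name, the timestamps are assumed to be sorted, and treating an interval exactly equal to the gap as the same session is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of gap-based session grouping for one key.
final class SessionGapModel {
    // Returns the number of elements in each session, in arrival order.
    static List<Integer> sessionSizes(long[] sortedTimestampsMs, long gapMs) {
        List<Integer> sizes = new ArrayList<>();
        int current = 0;
        for (int i = 0; i < sortedTimestampsMs.length; i++) {
            // A pause longer than the gap closes the current session.
            if (i > 0 && sortedTimestampsMs[i] - sortedTimestampsMs[i - 1] > gapMs) {
                sizes.add(current);
                current = 0;
            }
            current++;
        }
        if (current > 0) {
            sizes.add(current);
        }
        return sizes;
    }

    public static void main(String[] args) {
        // One event per minute for a single key, with a 780-second gap.
        long[] steady = new long[1000];
        for (int i = 0; i < steady.length; i++) {
            steady[i] = i * 60_000L;
        }
        System.out.println(sessionSizes(steady, 780_000L));
    }
}
```

In this model, a key whose events never pause for longer than the gap stays in one session that keeps growing. Whether any keys in the real stream behave that way is something the heap dumps may help confirm or rule out.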
I initially didn't even have ProcessingTimePurgeTrigger and was using the
default Trigger. In an effort to fix this issue, I created my own Trigger from
the default ProcessingTimeTrigger with a simple override of the
onProcessingTime method (essentially replacing FIRE with FIRE_AND_PURGE):
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
    return TriggerResult.FIRE_AND_PURGE;
}
This seems to have done nothing (it may have delayed the issue by a couple of
hours, not certain). I still see heap utilization creep up slowly until it
eventually reaches a point where GC starts to take too long, and then the
dreaded OOM.
For completeness, here is my window function (still using the old function
interface). It creates a few metrics for reporting and applies logic by looping
over the Iterable. NO state is explicitly kept in this function; I basically
needed RichWindowFunction only to generate metrics.
public class IPSLAMetricWindowFn
        extends RichWindowFunction<NumericFactTuple, BasicFactTuple, String, TimeWindow> {

    private static final long serialVersionUID = 1L;
    private static Logger logger = LoggerFactory.getLogger(IPSLAMetricWindowFn.class);

    private Meter in;
    private Meter out;
    private Meter error;

    @Override
    public void open(Configuration conf) throws Exception {
        this.in = getRuntimeContext()
            .getMetricGroup()
            .addGroup(AppConstants.APP_METRICS.PROCESS)
            .meter(AppConstants.APP_METRICS.IN, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        this.out = getRuntimeContext()
            .getMetricGroup()
            .addGroup(AppConstants.APP_METRICS.PROCESS)
            .meter(AppConstants.APP_METRICS.OUT, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        this.error = getRuntimeContext()
            .getMetricGroup()
            .addGroup(AppConstants.APP_METRICS.PROCESS)
            .meter(AppConstants.APP_METRICS.ERROR, new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
        super.open(conf);
    }

    @Override
    public void apply(String key, TimeWindow window, Iterable<NumericFactTuple> events,
            Collector<BasicFactTuple> collector) throws Exception {
    }
}
I would appreciate any pointers on what could be causing leaks here. This seems
pretty straightforward.
Thanks, Ashish