All,
I have been doing a little digging on this and, to Stefan's point, the growing
memory (not necessarily a leak) was essentially caused by a growing number of
keys, as I was using time buckets concatenated with the actual key to make
unique sessions.
Taking a couple of steps back, use case is very simple tumbling window of 15
mins by keys. Stream can be viewed simply as:
<timestamp>|<key>|<value>
We have a few pipelines of this type, and one catch here is that we wanted to
create an app which can process both historical and current data. Historical
data is needed mainly because users make ad hoc requests for "backfill". To
keep the processing pipeline easy to manage, we make no distinction between
historical and current data, as processing is based on event time.
1) Of course, the easiest way to solve this problem is to create a
TumblingWindow of 15 mins with some allowed lateness. One issue here was that
watermarks move forward and backfill data then appears to be late-arriving
data, which is correct behavior from Flink's perspective but seems to be
causing issues in how we are trying to handle streams.
2) Another issue is that our data collectors are highly distributed - we
regularly get data from later event time buckets faster than from older
buckets. It is also more consistent to actually create 15 min buckets using the
concept of a Session instead. So I am creating a key with
<timestamp_floor_15mins>|<key> and a session gap of, say, 10 mins. This works
perfectly from a business logic perspective. However, I am now introducing
quite a lot of keys which, based on my heap dumps, seem to be hanging around
and causing memory issues.
3) We converted the apps to a Process function and manage all state using the
key defined in step (2), registering/unregistering timeouts.
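To make the behavior in (1) concrete, here is a minimal plain-Java sketch (no
Flink dependency) of the arithmetic involved. It assumes the usual rule that an
element counts as late once the watermark has passed the last timestamp of its
window plus the allowed lateness. The class and method names (LatenessSketch,
windowStart, isLate) and the 5 minute lateness are made up for illustration and
are not Flink APIs.

```java
final class LatenessSketch {
    static final long WINDOW_MS = 15 * 60 * 1000L;

    /** Start of the 15-minute tumbling window that contains the event timestamp. */
    static long windowStart(long eventTimeMs) {
        return eventTimeMs - Math.floorMod(eventTimeMs, WINDOW_MS);
    }

    /** True if the watermark has already passed window end plus allowed lateness. */
    static boolean isLate(long eventTimeMs, long watermarkMs, long allowedLatenessMs) {
        long lastTimestampInWindow = windowStart(eventTimeMs) + WINDOW_MS - 1;
        return lastTimestampInWindow + allowedLatenessMs <= watermarkMs;
    }

    public static void main(String[] args) {
        long hour = 60 * 60 * 1000L;
        long watermark = 10 * hour;             // watermark already driven to 10:00 by current data
        long liveEvent = watermark + 60_000L;   // current record, one minute ahead of the watermark
        long backfillEvent = 2 * hour;          // backfill record with event time 02:00
        long allowedLateness = 5 * 60 * 1000L;  // assumed value, for illustration only

        System.out.println(isLate(liveEvent, watermark, allowedLateness));     // false
        System.out.println(isLate(backfillEvent, watermark, allowedLateness)); // true
    }
}
```

Once current data has pushed the watermark forward, any backfill record whose
window ended long ago falls on the "late" side of this check, whatever
reasonable lateness is configured.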
Solution (3) seems to be pretty stable from a memory perspective. However, it
just feels like, with so many high-level APIs available, we are not using them
properly and keep reverting to the low-level Process API - in the last month we
have migrated about 5 or 6 apps to Process now :)
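For readers following along, the logic of solution (3) can be modeled roughly
as below. This is a plain-Java sketch, not the actual Process function: it uses
an ordinary map where the real app would use keyed state, an explicit nowMs
argument where the real app would use timers, and a running sum as a stand-in
aggregate. All names (BucketStateSketch, add, fireExpired) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class BucketStateSketch {
    static final long BUCKET_MS = 15 * 60 * 1000L;
    static final long TIMEOUT_MS = 10 * 60 * 1000L;

    /** Per-bucket state: a running sum plus the time at which the bucket times out. */
    private static final class Bucket {
        double sum;
        long deadlineMs;
    }

    private final Map<String, Bucket> buckets = new HashMap<>();

    /** Builds the <timestamp_floor_15mins>|<key> bucket key. */
    static String bucketKey(long eventTimeMs, String key) {
        long floor = eventTimeMs - Math.floorMod(eventTimeMs, BUCKET_MS);
        return floor + "|" + key;
    }

    /** Adds a value to its bucket and moves the bucket's timeout forward. */
    void add(long eventTimeMs, String key, double value, long nowMs) {
        String k = bucketKey(eventTimeMs, key);
        Bucket b = buckets.get(k);
        if (b == null) {
            b = new Bucket();
            buckets.put(k, b);
        }
        b.sum += value;
        b.deadlineMs = nowMs + TIMEOUT_MS; // replaces the previous timeout
    }

    /** Emits every bucket whose timeout has passed and removes its state. */
    List<String> fireExpired(long nowMs) {
        List<String> fired = new ArrayList<>();
        Iterator<Map.Entry<String, Bucket>> it = buckets.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Bucket> e = it.next();
            if (e.getValue().deadlineMs <= nowMs) {
                fired.add(e.getKey() + "=" + e.getValue().sum);
                it.remove(); // explicit cleanup keeps the map from growing
            }
        }
        return fired;
    }

    int liveBuckets() {
        return buckets.size();
    }

    public static void main(String[] args) {
        BucketStateSketch sketch = new BucketStateSketch();
        long minute = 60_000L;
        sketch.add(125 * minute, "elem-1", 1.0, 0L);
        sketch.add(127 * minute, "elem-1", 2.0, 1 * minute);
        System.out.println(sketch.fireExpired(5 * minute));  // []
        System.out.println(sketch.fireExpired(11 * minute)); // [7200000|elem-1=3.0]
        System.out.println(sketch.liveBuckets());            // 0
    }
}
```

The point of the model is the it.remove() call: every bucket key that is
created is also explicitly dropped when its timeout fires, so the number of
live keys tracks the number of open buckets rather than the number of buckets
ever seen.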
For solution (2) it feels like any other Session aggregation use case will have
the issue of keys hanging around (eg: for click streams with user sessions
etc). Isn't there a way to clear those session windows? Sorry, I just feel like
we are missing something simple and have been reverting to low level APIs
instead.
Thanks,
On Friday, June 22, 2018, 9:00:14 AM EDT, ashish pok <[email protected]>
wrote:
Stefan, All,
If there are no further thoughts on this I am going to switch my app to low
level Process API. I still think there is an easier solution here which I am
missing but I will revisit that after I fix Production issue.
Thanks, Ashish
On Thursday, June 21, 2018, 7:28 AM, ashish pok <[email protected]> wrote:
Hi Stefan,
Thanks for outlining the steps; they are similar to what we have been doing for
OOM issues.
However, I was looking for something more high level on whether state / key
handling needs some sort of cleanup specifically that is not done by default. I
am about 99% (nothing is certain :)) sure that if I switch this app to a lower
level API like Process Function and manage my own state and timers, I will not
see this issue. When I had the same issue in the past it was for Global Window,
and Fabian pointed out that new keys are constantly being created. I built a
simple Process Function for that and the issue went away. I think your first
statement sort of hints at that as well. So let me home in on that. I am
processing time series data for network elements. Keys are the 10 min floor of
event time concatenated with the element ID. The idea here was to create 10 min
buckets of data with windows that start with the first event in that bucket and
fire when no events arrive for 12 or so minutes. So new keys are definitely
being created. So,
1- Am I adding to the memory constantly by doing that? Sounds like it based on
your comments.
2- If so, what's the way to clear those keys when windows fire, if any?
3- It seems like a very simple use case, so now I am wondering if I am even
using the right high level API?
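The windowing described above (a window that opens on the first event in a
bucket and fires after roughly 12 minutes of silence) is session-style gap
logic. A minimal plain-Java sketch of that grouping for a single key follows.
It is a generic illustration and makes no claim about how Flink treats events
that are exactly one gap apart; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class SessionGapSketch {
    /** Splits ascending timestamps into sessions; a gap larger than gapMs starts a new one. */
    static List<List<Long>> sessions(long[] sortedTimestampsMs, long gapMs) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = null;
        long previous = 0L;
        for (long ts : sortedTimestampsMs) {
            if (current == null || ts - previous > gapMs) {
                current = new ArrayList<>();
                result.add(current);
            }
            current.add(ts);
            previous = ts;
        }
        return result;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] events = {0, 2 * minute, 9 * minute, 30 * minute, 31 * minute};
        // The 21-minute silence between minute 9 and minute 30 closes the first session.
        System.out.println(sessions(events, 12 * minute).size()); // 2
    }
}
```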
Thanks, Ashish
On Wednesday, June 20, 2018, 4:17 AM, Stefan Richter
<[email protected]> wrote:
Hi,
it is possible that the number of processing time timers can grow, because
internal timers are scoped by time, key, and namespace (typically this means
„window“, because each key can be part of multiple windows). So if the number
of keys in your application is steadily growing this can happen.
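As a rough illustration of that scoping, the plain-Java sketch below models a
timer as a (key, namespace, timestamp) triple held in a set. It is not Flink's
internal data structure, and the names (TimerScopeSketch, TimerId) are
hypothetical. It only shows that re-registering the same triple adds nothing,
while every new key or namespace adds an entry that stays until it is removed.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

final class TimerScopeSketch {
    /** Identity of a timer: the (key, namespace, timestamp) triple. */
    static final class TimerId {
        final String key;
        final String namespace;
        final long timestampMs;

        TimerId(String key, String namespace, long timestampMs) {
            this.key = key;
            this.namespace = namespace;
            this.timestampMs = timestampMs;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof TimerId)) {
                return false;
            }
            TimerId other = (TimerId) o;
            return timestampMs == other.timestampMs
                    && key.equals(other.key)
                    && namespace.equals(other.namespace);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, namespace, timestampMs);
        }
    }

    private final Set<TimerId> timers = new HashSet<>();

    void register(String key, String namespace, long timestampMs) {
        timers.add(new TimerId(key, namespace, timestampMs));
    }

    void delete(String key, String namespace, long timestampMs) {
        timers.remove(new TimerId(key, namespace, timestampMs));
    }

    int pending() {
        return timers.size();
    }

    public static void main(String[] args) {
        TimerScopeSketch sketch = new TimerScopeSketch();
        sketch.register("elem-1", "window-A", 1_000L);
        sketch.register("elem-1", "window-A", 1_000L); // same triple: no new entry
        sketch.register("elem-1", "window-B", 1_000L); // new namespace: new entry
        sketch.register("elem-2", "window-A", 1_000L); // new key: new entry
        System.out.println(sketch.pending()); // 3
    }
}
```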
To analyse the heap dump, I usually take the following approach:
- Obviously include only reachable objects. If dumps are very big, try to limit
  the size or to trigger the OOM earlier by configuring a lower heap size. It
  should still give you the problematic object accumulation, if there is one.
- Look at the statistics of „heavy hitter“ classes, i.e. classes for which the
  instances contribute the most to the overall heap consumption. Sometimes this
  will show you classes that are also part of classes that rank higher up, e.g.
  1st place could be String, and second place char[]. But you can figure that
  out in the next step.
- Explore the instances of the top heavy hitter class(es). If there is a leak
  and you just randomly sample some objects, the likelihood is usually *very*
  high that you catch an object that is part of the leak (as determined in the
  next step). Otherwise just repeat and sample another object.
- Inspect the object instance and follow the reference links to the parent
  objects in the object graph that hold a reference to the leak object
  candidate. You will typically end up in some array where the leak
  accumulates. Inspect the object holding references to the leaking objects.
  You can see the field values, and this can help to determine if the
  collection of objects is justified or if data is actually leaking.

So in your case, you can start from some InternalTimer or Window object and
walk backwards through the reference chain to see what class is holding onto
them and why (e.g. should they already be gone w.r.t. their timestamp). Walking
through the references should be supported by all major heap analysis tools,
including JVisualVM that comes with your JDK. You can also use OQL[1] to query
for timers or windows that should already be gone.
Overall I think it could at least be helpful to see the statistics for heavy
hitter classes and screenshots of representative reference chains to objects to
figure out the problem cause. If it is not possible to share heap dumps,
unfortunately I think giving you this strategy is currently the best I can
offer to help.
Best,
Stefan
[1] https://blogs.oracle.com/sundararajan/querying-java-heap-with-oql
Am 20.06.2018 um 02:33 schrieb ashish pok <[email protected]>:
All,
I took a few heap dumps (when the app restarts and at 2 hour intervals) using
jmap; they are 5GB to 8GB. I did some comparisons, and what I can see is that
the heap shows counts of data tuples (basically instances of the object that is
maintained as state) going up slowly.
The only things I could possibly relate that to were
streaming.api.operators.InternalTimer and
streaming.api.windowing.windows.TimeWindow, both of which were trending up as
well. There are definitely a lot more windows created than the increments I
could notice, but nevertheless those objects are trending up. The input stream
has a very consistent sine wave throughput, so it really doesn't make sense for
windows and tuples to keep trending up. There is also no event storm or
anything of that sort (i.e. the source stream has been very steady as far as
throughput is concerned).
Here is a plot of heap utilization:
<1529454480422blob.jpg>
So it has a typical sine wave pattern, which is definitely expected as the
input stream has the same pattern, but the source doesn't have an upward trend
like the heap utilization shown above. The screenshot above shows a spike from
60% to 80% utilization, and the trend keeps going up until an issue occurs that
resets the app. Since processing is based on ProcessingTime, I really would
have expected memory to reach a steady state and remain sort of flat from a
trending perspective.
Appreciate any pointers anyone might have.
Thanks, Ashish
On Monday, June 18, 2018, 12:54:03 PM EDT, ashish pok <[email protected]>
wrote:
Right, that's where I am headed now, but I was wondering whether there are any
“gotchas” I am missing before I try to dig into a few gigs of heap dump.
Thanks, Ashish
On Monday, June 18, 2018, 3:37 AM, Stefan Richter <[email protected]>
wrote:
Hi,
can you take a heap dump from a JVM that runs into the problem and share it
with us? That would make finding the cause a lot easier.
Best,
Stefan
Am 15.06.2018 um 23:01 schrieb ashish pok <[email protected]>:
All,
I have another slow Memory Leak situation using basic TimeSession Window
(earlier it was GlobalWindow related that Fabian helped clarify).
I have a very simple data pipeline:
    DataStream<PlatformEvent> processedData = rawTuples
        .window(ProcessingTimeSessionWindows.withGap(
            Time.seconds(AppConfigs.getWindowSize(780))))
        .trigger(new ProcessingTimePurgeTrigger())
        .apply(new IPSLAMetricWindowFn())
        .name("windowFunctionTuple")
        .map(new TupleToPlatformEventMapFn())
        .name("mapTupleEvent");
I initially didn't even have ProcessingTimePurgeTrigger and was using the
default Trigger. In an effort to fix this issue, I created my own Trigger from
the default ProcessingTimeTrigger with a simple override of the
onProcessingTime method (essentially replacing FIRE with FIRE_AND_PURGE):
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window,
            TriggerContext ctx) {
        return TriggerResult.FIRE_AND_PURGE;
    }
This seems to have done nothing (it may have delayed the issue by a couple of
hours - not certain). I still see heap utilization creep up slowly and
eventually reach a point where GC starts to take too long, and then the dreaded
OOM.
For completeness, here is my Window Function (still using the old function
interface). It creates a few metrics for reporting and applies logic by looping
over the Iterable. No state is explicitly kept in this function; I needed
RichWindowFunction basically just to generate metrics.
    public class IPSLAMetricWindowFn
            extends RichWindowFunction<NumericFactTuple, BasicFactTuple, String, TimeWindow> {

        private static final long serialVersionUID = 1L;

        private static Logger logger =
            LoggerFactory.getLogger(IPSLAMetricWindowFn.class);

        private Meter in;
        private Meter out;
        private Meter error;

        @Override
        public void open(Configuration conf) throws Exception {
            this.in = getRuntimeContext()
                .getMetricGroup()
                .addGroup(AppConstants.APP_METRICS.PROCESS)
                .meter(AppConstants.APP_METRICS.IN,
                    new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
            this.out = getRuntimeContext()
                .getMetricGroup()
                .addGroup(AppConstants.APP_METRICS.PROCESS)
                .meter(AppConstants.APP_METRICS.OUT,
                    new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
            this.error = getRuntimeContext()
                .getMetricGroup()
                .addGroup(AppConstants.APP_METRICS.PROCESS)
                .meter(AppConstants.APP_METRICS.ERROR,
                    new MeterView(AppConstants.APP_METRICS.INTERVAL_30));
            super.open(conf);
        }

        @Override
        public void apply(String key, TimeWindow window,
                Iterable<NumericFactTuple> events,
                Collector<BasicFactTuple> collector) throws Exception {
        }
    }
Appreciate any pointers on what could be causing leaks here. This seems pretty
straight-forward.
Thanks, Ashish