I'm glad that solved your GC problem. I think dispose() is a good place for the call; it is meant for cleanup.

In your case the DoFn is a no-op, so the PipelineOptions are probably loaded through your UnboundedSource. If both happen to be scheduled in the same TaskManager, that is fine. However, as a precaution we should also include the cache invalidation in UnboundedSourceWrapper.

This way we should be good for the streaming execution. Will try to get this into 2.10.0.
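
A minimal sketch of that cleanup shape, under stated assumptions: SketchOperator and SharedTypeCache are hypothetical names, and SharedTypeCache only stands in for Jackson's TypeFactory (in the real patch the call would be TypeFactory.defaultInstance().clearCache()). This is not the actual DoFnOperator or UnboundedSourceWrapper code; it only shows why the call belongs in a finally block.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a JVM-wide type cache such as Jackson's TypeFactory. */
final class SharedTypeCache {
    private static final Map<Class<?>, String> CACHE = new ConcurrentHashMap<>();

    static void remember(Class<?> type) {
        CACHE.put(type, type.getName());
    }

    static void clearCache() {
        CACHE.clear();
    }

    static int size() {
        return CACHE.size();
    }
}

/** Hypothetical operator; only the shape of dispose() matters here. */
final class SketchOperator {
    private final AutoCloseable userFunction;

    SketchOperator(AutoCloseable userFunction) {
        this.userFunction = userFunction;
    }

    void dispose() throws Exception {
        try {
            // Normal teardown of the wrapped user code; this may throw.
            userFunction.close();
        } finally {
            // Runs on both the success path and the failure path, so the
            // shared cache never keeps entries from a disposed operator.
            SharedTypeCache.clearCache();
        }
    }
}
```

With this shape the cache is emptied even when teardown itself fails, which is the case that matters when a job is restarting after an error.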

Thanks,
Max

Issue: https://jira.apache.org/jira/browse/BEAM-6460

On 17.01.19 12:50, Daniel Harper wrote:
Max, Juan,

Just tried patching this class
https://github.com/apache/beam/blob/v2.7.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L389
and putting the clearCache call in the finally block.

Redoing the test causes the GC to kick in (see screenshot)

I'm not sure if this is the best place to put this cleanup code, though.
Is this the final place where all BEAM-related stuff gets terminated?

Daniel.



On 17/01/2019, 16:18, "Maximilian Michels" <m...@apache.org> wrote:

Hi Daniel, hi Juan,

@Daniel Thanks a lot for investigating and reporting the issue.

Your analysis looks convincing; it may be that Jackson is holding on to
the Classloader. Beam uses Jackson to parse the FlinkPipelineOptions.

Have you already tried to call TypeFactory.defaultInstance().clearCache()
in a catch-all block within your synthetic Beam job, before actually
failing? That way we could see if the classloader is garbage-collected
after a restart.

Let me also investigate in the meantime. We are in the process of getting
the 2.10.0 release ready with a few pending issues, so it would be a good
time to fix this issue.

Thanks,
Max

On 17.01.19 09:50, Juan Carlos Garcia wrote:
Nice finding. We are also experiencing the same (Flink 1.5.4), where a few
jobs are dying of OOM in the metaspace after multiple restarts. In our
case we have an HA Flink cluster and are not using YARN for orchestration.

Good job with the diagnosis.

JC

On Thu, Jan 17, 2019 at 3:23 PM Daniel Harper <daniel.har...@bbc.co.uk> wrote:

     Environment:

     BEAM 2.7.0
     Flink 1.5.2
     AWS EMR 5.17.0
     Hadoop YARN for orchestration


     We've noticed the metaspace usage increasing when our Flink job
     restarts, which in turn sometimes causes YARN to kill the container
     for going beyond its physical memory limits. After setting the
     MaxMetaspaceSize setting and making the JVM dump its heap on OOM, we
     noticed quite a few instances of the FlinkUserClassLoader class
     hanging around, which corresponded with the number of restarts that
     happened.

     Originally I posted this issue on the FLINK mailing list here

     http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/User-ClassLoader-leak-on-job-restart-td25547.html



     After investigation I think this is related to something in the BEAM
     code, or the way BEAM interacts with the Flink class loading
     mechanism, because I can see the following when selecting one of the
     'old' classloaders -> Path to GC Roots using Eclipse MAT in one of
     the heap dumps





     This looks to me like this issue
     https://github.com/FasterXML/jackson-databind/issues/1363


     It sounds like to resolve it, user code should call
     TypeFactory.defaultInstance().clearCache() when threads are shut
     down. I'm not sure where in the FlinkRunner codebase this should be,
     though.
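
The reference chain behind this kind of leak can be sketched with the JDK alone. This is an illustration under assumptions, not Jackson or Flink code: StaticTypeCache is a hypothetical stand-in for a cache that lives in a parent classloader, and LeakSketch uses a dynamic proxy simply as a convenient way to get a class defined by a fresh loader, the way each job attempt gets classes defined by its own user-code loader.

```java
import java.lang.reflect.Proxy;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a type cache that outlives any single job attempt. */
final class StaticTypeCache {
    // Static, so entries stay reachable for as long as this class is loaded.
    private static final Map<String, Class<?>> TYPES = new ConcurrentHashMap<>();

    static void put(Class<?> type) {
        TYPES.put(type.getName(), type);
    }

    static void clearCache() {
        TYPES.clear();
    }

    /** True if a cached Class was defined by the loader, i.e. the cache keeps it reachable. */
    static boolean pins(ClassLoader loader) {
        return TYPES.values().stream().anyMatch(c -> c.getClassLoader() == loader);
    }
}

final class LeakSketch {
    /** A fresh, empty child loader, playing the role of a per-attempt user-code loader. */
    static ClassLoader newUserCodeLoader() {
        return new URLClassLoader(new URL[0], LeakSketch.class.getClassLoader());
    }

    /** Returns a class that is defined by the given loader (a dynamic proxy class). */
    static Class<?> defineClassIn(ClassLoader loader) {
        Object proxy = Proxy.newProxyInstance(
            loader, new Class<?>[] {Runnable.class}, (p, method, args) -> null);
        return proxy.getClass();
    }
}
```

Every Class object references its defining loader, so a single cached Class is enough to keep the whole loader, and everything it loaded, alive across restarts. Once clearCache() drops the entry, the cache no longer holds that reference.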


     To try and narrow it down as much as possible/reduce the number of
     dependencies I've managed to reproduce this with a really, really
     simple job that just reads from a synthetic unbounded source
     (back-ported from the master branch) and does nothing:
     https://github.com/djhworld/streaming-job
     This will run on a Flink environment.

     To reproduce the OOM I just ran the job with MaxMetaspaceSize=125M,
     and then killed a random task manager every 60 seconds, which yielded
     the following



     As you can see the number of classes increases on each restart, which
     causes the metaspace to increase and eventually causes an OOM.
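
One way to watch the same trend from inside a JVM is the standard java.lang.management MXBeans. The sketch below is an assumption about how one might log it, not something the job in this thread does: MetaspaceProbe is a hypothetical name, and "Metaspace" is the memory pool name that HotSpot reports (other JVMs may not expose it, in which case -1 is returned).

```java
import java.lang.management.ClassLoadingMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;

/** Hypothetical helper that reports class-loading and metaspace figures. */
final class MetaspaceProbe {

    /** Bytes currently used by the "Metaspace" pool, or -1 if no such pool is exposed. */
    static long metaspaceUsedBytes() {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if ("Metaspace".equals(pool.getName())) {
                MemoryUsage usage = pool.getUsage();
                return usage == null ? -1L : usage.getUsed();
            }
        }
        return -1L;
    }

    /** One log line with currently loaded classes, unloaded classes, and metaspace use. */
    static String snapshot() {
        ClassLoadingMXBean classes = ManagementFactory.getClassLoadingMXBean();
        return String.format(
            "loaded=%d unloaded=%d metaspaceUsedBytes=%d",
            classes.getLoadedClassCount(),
            classes.getUnloadedClassCount(),
            metaspaceUsedBytes());
    }
}
```

Logging snapshot() after each restart would show a loaded-class count that keeps climbing and an unloaded count that stays flat if old classloaders are never collected.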

     Is there anything we could do to fix this? I've not tested this on
     2.7.0 because we are waiting for 2.10 to drop so we can run Flink
     1.6/1.7 on EMR

     With thanks,

     Daniel






     ----------------------------

     http://www.bbc.co.uk <http://www.bbc.co.uk>
     This e-mail (and any attachments) is confidential and may contain
personal
     views which are not the views of the BBC unless specifically stated.
     If you have received it in error, please delete it from your system.
     Do not use, copy or disclose the information in any way nor act in
reliance
     on it and notify the sender immediately.
     Please note that the BBC monitors e-mails sent or received.
     Further communication will signify your consent to this.

     ---------------------



--

JC




