[ 
https://issues.apache.org/jira/browse/FLINK-16142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17049679#comment-17049679
 ] 

Niels Basjes commented on FLINK-16142:
--------------------------------------

Let me add my context, because I also see this Metaspace OOM problem occur.

I have a batch job which I run in sequence for many days (each day is a separate batch job).
On my on-premise K8s cluster I have Ceph installed, which provides an S3-compatible storage facility.
The input files (multiple per day) are stored in this internal S3.
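
For completeness, my flink-conf.yaml points the S3 filesystem at the internal Ceph endpoint, roughly like this (the endpoint and credentials below are placeholders, not my real values):
{code}
# S3 (Ceph RADOS Gateway) settings used by the flink-s3-fs-presto plugin.
# Endpoint and credentials are placeholders.
s3.endpoint: http://ceph-rgw.storage.svc:7480
s3.path.style.access: true
s3.access-key: <access-key>
s3.secret-key: <secret-key>
{code}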

I start my K8s cluster with a custom image that is built from this Dockerfile:
{code}
FROM flink:1.10.0-scala_2.12
RUN mkdir /opt/flink/plugins/s3-fs-presto && cp /opt/flink/opt/flink-s3-fs-presto* /opt/flink/plugins/s3-fs-presto
{code}

I then run my K8s cluster with {{resourcemanager.taskmanager-timeout}} extended to 1 hour, to keep the pods alive after a job finishes.
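
Concretely, that is this setting in flink-conf.yaml (the value is in milliseconds):
{code}
# Keep idle TaskManagers around for 1 hour instead of the default 30000 ms.
resourcemanager.taskmanager-timeout: 3600000
{code}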

My batch job roughly consists of the following steps (sketched in the code below):
- env.readTextFile(path)
- Put that DataSet into a BatchTableEnvironment
- Register this UDF: https://yauaa.basjes.nl/UDF-ApacheFlinkTable.html
- Run an SQL statement that puts the data through this UDF and aggregates the result per hour.
- Use the Elasticsearch 6 connector to store the result in my Elasticsearch.
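
In code, the shape of the job is roughly this; a simplified sketch, where the UDF class name, the SQL, and the ElasticSearchOutputFormat constructor are assumptions rather than my exact code:
{code:java}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

import nl.basjes.parse.useragent.flink.table.AnalyzeUseragentFunction;       // yauaa table UDF (class name assumed)
import nl.basjes.parse.useragent.elasticsearch.ElasticSearchOutputFormat;    // my own OutputFormat around the ES 6 connector

public class UserAgentAggregationJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

        // 1) Read the raw lines of one day from the internal (Ceph) S3.
        DataSet<String> lines = env.readTextFile("s3://my-bucket/logs/2020-02-20/");

        // 2) Expose the DataSet to the Table API.
        tableEnv.registerDataSet("AgentStream", lines, "useragent");

        // 3) Register the yauaa UDF (https://yauaa.basjes.nl/UDF-ApacheFlinkTable.html).
        tableEnv.registerFunction("ParseUserAgent", new AnalyzeUseragentFunction());

        // 4) Push the data through the UDF and aggregate (the real query buckets per hour).
        Table aggregated = tableEnv.sqlQuery(
            "SELECT deviceClass, COUNT(*) AS visits " +
            "FROM (SELECT ParseUserAgent(useragent)['DeviceClass'] AS deviceClass FROM AgentStream) AS parsed " +
            "GROUP BY deviceClass");

        // 5) Store the result in Elasticsearch 6 (constructor arguments are placeholders).
        DataSet<Row> result = tableEnv.toDataSet(aggregated, Row.class);
        result.output(new ElasticSearchOutputFormat("http://elasticsearch:9200", "useragents"));

        env.execute("useragent-aggregation-per-day");
    }
}
{code}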

The first few runs work fine, yet over time more and more runs fail with this exception:
{code}java.lang.OutOfMemoryError: Metaspace
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
    at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:60)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
    at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:60)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
    at org.apache.logging.log4j.LogManager.<clinit>(LogManager.java:60)
    at org.elasticsearch.common.logging.ESLoggerFactory.getLogger(ESLoggerFactory.java:45)
    at org.elasticsearch.common.logging.ESLoggerFactory.getLogger(ESLoggerFactory.java:53)
    at org.elasticsearch.common.logging.Loggers.getLogger(Loggers.java:104)
    at org.elasticsearch.common.unit.ByteSizeValue.<clinit>(ByteSizeValue.java:39)
    at org.elasticsearch.action.bulk.BulkProcessor$Builder.<init>(BulkProcessor.java:88)
    at org.elasticsearch.action.bulk.BulkProcessor$Builder.<init>(BulkProcessor.java:80)
    at org.elasticsearch.action.bulk.BulkProcessor.builder(BulkProcessor.java:174)
{code}

or this:

{code}
java.lang.OutOfMemoryError: Metaspace
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
        at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:66)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
        at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.createClient(Elasticsearch6ApiCallBridge.java:72)
        at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.createClient(Elasticsearch6ApiCallBridge.java:47)
        at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:299)
        at nl.basjes.parse.useragent.elasticsearch.ElasticSearchOutputFormat.open(ElasticSearchOutputFormat.java:38)
        at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:205)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
        at java.lang.Thread.run(Thread.java:748)
{code}



> Memory Leak causes Metaspace OOM error on repeated job submission
> -----------------------------------------------------------------
>
>                 Key: FLINK-16142
>                 URL: https://issues.apache.org/jira/browse/FLINK-16142
>             Project: Flink
>          Issue Type: Bug
>          Components: Client / Job Submission
>    Affects Versions: 1.10.0
>            Reporter: Thomas Wozniakowski
>            Priority: Blocker
>             Fix For: 1.10.1, 1.11.0
>
>         Attachments: Leak-GC-root.png, java_pid1.hprof, java_pid1.hprof
>
>
> Hi Guys,
> We've just tried deploying 1.10.0 as it has lots of shiny stuff that fits our 
> use-case exactly (RocksDB state backend running in a containerised cluster). 
> Unfortunately, it seems like there is a memory leak somewhere in the job 
> submission logic. We are getting this error:
> {code:java}
> 2020-02-18 10:22:10,020 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - OPERATOR_NAME 
> switched from RUNNING to FAILED.
> java.lang.OutOfMemoryError: Metaspace
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
> at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:60)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
> at org.apache.flink.kinesis.shaded.com.amazonaws.jmx.SdkMBeanRegistrySupport.registerMetricAdminMBean(SdkMBeanRegistrySupport.java:27)
> at org.apache.flink.kinesis.shaded.com.amazonaws.metrics.AwsSdkMetrics.registerMetricAdminMBean(AwsSdkMetrics.java:398)
> at org.apache.flink.kinesis.shaded.com.amazonaws.metrics.AwsSdkMetrics.<clinit>(AwsSdkMetrics.java:359)
> at org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.requestMetricCollector(AmazonWebServiceClient.java:728)
> at org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.isRMCEnabledAtClientOrSdkLevel(AmazonWebServiceClient.java:660)
> at org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.isRequestMetricsEnabled(AmazonWebServiceClient.java:652)
> at org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:611)
> at org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:606)
> at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1534)
> at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1528)
> at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.listShards(KinesisProxy.java:439)
> at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:389)
> at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:279)
> at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:686)
> at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:287)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> {code}
> (The only change in the above text is the OPERATOR_NAME text where I removed 
> some of the internal specifics of our system).
> This will reliably happen on a fresh cluster after submitting and cancelling 
> our job 3 times.
> We are using the presto-s3 plugin, the CEP library and the Kinesis connector.
> Please let me know what other diagnostics would be useful.
> Tom



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
