[ https://issues.apache.org/jira/browse/FLINK-16142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17049679#comment-17049679 ]
Niels Basjes commented on FLINK-16142:
--------------------------------------

Let me add my context, because I also see this Metaspace OOM occur.

I have a batch job which I run in sequence for many days (each day is a separate batch job). On my on-premise K8s cluster I have Ceph installed, which provides an S3-compatible storage facility. The input files (multiple per day) are stored in this internal S3.

I start my K8s cluster with a custom image built from this Dockerfile:
{code}
FROM flink:1.10.0-scala_2.12
RUN mkdir /opt/flink/plugins/s3-fs-presto && cp /opt/flink/opt/flink-s3-fs-presto* /opt/flink/plugins/s3-fs-presto
{code}
I then run my K8s cluster with {{resourcemanager.taskmanager-timeout}} extended to 1 hour, to keep the pods alive after a job finishes.

My batch job roughly consists of the following steps (a sketch of this pipeline follows after the stack traces below):
- env.readTextFile(path)
- Put that DataSet into a BatchTableEnvironment
- Register this UDF: https://yauaa.basjes.nl/UDF-ApacheFlinkTable.html
- Run an SQL query that puts the data through this UDF and aggregates the result per hour.
- Use the Elasticsearch 6 connector to store the result in my Elasticsearch.

The first few runs work fine, yet over time more and more runs fail with this exception:
{code}
java.lang.OutOfMemoryError: Metaspace
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
	at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:60)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
	at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:60)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
	at org.apache.logging.log4j.LogManager.<clinit>(LogManager.java:60)
	at org.elasticsearch.common.logging.ESLoggerFactory.getLogger(ESLoggerFactory.java:45)
	at org.elasticsearch.common.logging.ESLoggerFactory.getLogger(ESLoggerFactory.java:53)
	at org.elasticsearch.common.logging.Loggers.getLogger(Loggers.java:104)
	at org.elasticsearch.common.unit.ByteSizeValue.<clinit>(ByteSizeValue.java:39)
	at org.elasticsearch.action.bulk.BulkProcessor$Builder.<init>(BulkProcessor.java:88)
	at org.elasticsearch.action.bulk.BulkProcessor$Builder.<init>(BulkProcessor.java:80)
	at org.elasticsearch.action.bulk.BulkProcessor.builder(BulkProcessor.java:174)
{code}
or this:
{code}
java.lang.OutOfMemoryError: Metaspace
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
	at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:66)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
	at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.createClient(Elasticsearch6ApiCallBridge.java:72)
	at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.createClient(Elasticsearch6ApiCallBridge.java:47)
	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:299)
	at nl.basjes.parse.useragent.elasticsearch.ElasticSearchOutputFormat.open(ElasticSearchOutputFormat.java:38)
	at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:205)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
	at java.lang.Thread.run(Thread.java:748)
{code}
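For reference, the job is roughly the following. This is a minimal, self-contained sketch, not the actual code: the input path, field names and query are placeholders, and a trivial ScalarFunction stands in for the Yauaa UDF linked above; the real job writes the result through a custom OutputFormat around the Elasticsearch 6 connector, as visible in the stack traces.
{code:java}
// Rough sketch of the batch pipeline described above (Flink 1.10, legacy planner,
// flink-table-api-java-bridge on the classpath). All names are illustrative only.
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

public class UserAgentBatchJob {

    // Stand-in for the Yauaa UDF (https://yauaa.basjes.nl/UDF-ApacheFlinkTable.html);
    // it just takes the first token of the useragent string.
    public static class StandInParseUserAgent extends ScalarFunction {
        public String eval(String useragent) {
            return useragent == null ? "Unknown" : useragent.split(" ", 2)[0];
        }
    }

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

        // 1) Read the raw lines for one day from the internal S3 (placeholder path).
        DataSet<String> lines = env.readTextFile("s3://my-bucket/logs/2020-02-18/");

        // 2) Expose the DataSet to the Table API as a single-column table.
        Table accessLogs = tableEnv.fromDataSet(lines, "useragent");
        tableEnv.createTemporaryView("AccessLogs", accessLogs);

        // 3) Register the UDF (the real job registers the Yauaa function here).
        tableEnv.registerFunction("ParseUserAgent", new StandInParseUserAgent());

        // 4) Aggregate through the UDF (the real query also buckets per hour).
        Table aggregated = tableEnv.sqlQuery(
            "SELECT agentClass, COUNT(*) AS cnt " +
            "FROM (SELECT ParseUserAgent(useragent) AS agentClass FROM AccessLogs) AS t " +
            "GROUP BY agentClass");

        // 5) The real job hands this DataSet to a custom OutputFormat wrapping the
        //    Elasticsearch 6 connector; printing is enough for the sketch.
        DataSet<Row> result = tableEnv.toDataSet(aggregated, Row.class);
        result.print();
    }
}
{code}
Note that simply raising {{taskmanager.memory.jvm-metaspace.size}} in flink-conf.yaml presumably only postpones this failure, since the per-job user-code classloaders apparently never get unloaded between runs.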
> Memory Leak causes Metaspace OOM error on repeated job submission
> -----------------------------------------------------------------
>
>                 Key: FLINK-16142
>                 URL: https://issues.apache.org/jira/browse/FLINK-16142
>             Project: Flink
>          Issue Type: Bug
>          Components: Client / Job Submission
>    Affects Versions: 1.10.0
>            Reporter: Thomas Wozniakowski
>            Priority: Blocker
>             Fix For: 1.10.1, 1.11.0
>
>         Attachments: Leak-GC-root.png, java_pid1.hprof, java_pid1.hprof
>
>
> Hi Guys,
> We've just tried deploying 1.10.0 as it has lots of shiny stuff that fits our use-case exactly (RocksDB state backend running in a containerised cluster). Unfortunately, it seems like there is a memory leak somewhere in the job submission logic. We are getting this error:
> {code:java}
> 2020-02-18 10:22:10,020 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - OPERATOR_NAME switched from RUNNING to FAILED.
> java.lang.OutOfMemoryError: Metaspace
> 	at java.lang.ClassLoader.defineClass1(Native Method)
> 	at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
> 	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> 	at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
> 	at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
> 	at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
> 	at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
> 	at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:60)
> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
> 	at org.apache.flink.kinesis.shaded.com.amazonaws.jmx.SdkMBeanRegistrySupport.registerMetricAdminMBean(SdkMBeanRegistrySupport.java:27)
> 	at org.apache.flink.kinesis.shaded.com.amazonaws.metrics.AwsSdkMetrics.registerMetricAdminMBean(AwsSdkMetrics.java:398)
> 	at org.apache.flink.kinesis.shaded.com.amazonaws.metrics.AwsSdkMetrics.<clinit>(AwsSdkMetrics.java:359)
> 	at org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.requestMetricCollector(AmazonWebServiceClient.java:728)
> 	at org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.isRMCEnabledAtClientOrSdkLevel(AmazonWebServiceClient.java:660)
> 	at org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.isRequestMetricsEnabled(AmazonWebServiceClient.java:652)
> 	at org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:611)
> 	at org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:606)
> 	at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1534)
> 	at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1528)
> 	at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.listShards(KinesisProxy.java:439)
> 	at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:389)
> 	at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:279)
> 	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:686)
> 	at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:287)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> {code}
> (The only change in the above text is the OPERATOR_NAME text, where I removed some of the internal specifics of our system.)
> This will reliably happen on a fresh cluster after submitting and cancelling our job 3 times.
> We are using the presto-s3 plugin, the CEP library and the Kinesis connector.
> Please let me know what other diagnostics would be useful.
> Tom

--
This message was sent by Atlassian Jira
(v8.3.4#803005)