[ https://issues.apache.org/jira/browse/FLINK-30218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lukas Mahl updated FLINK-30218: ------------------------------- Labels: bug (was: ) > [Kafka Connector] java.lang.OutOfMemoryError: Metaspace > ------------------------------------------------------- > > Key: FLINK-30218 > URL: https://issues.apache.org/jira/browse/FLINK-30218 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka > Affects Versions: 1.15.1 > Environment: +*AWS EMR*+ > * Standard AWS EMR Cluster (1 master YARN node, 1 core node) - 2 vCore, 8 GB > memory each > * JDK 11 Coretto > +*Kafka Consumer Config*+ > {code:java} > acks = 1 > batch.size = 16384 > bootstrap.servers = [...] > buffer.memory = 33554432 > client.dns.lookup = use_all_dns_ips > client.id = producer-1 > compression.type = none > connections.max.idle.ms = 540000 > delivery.timeout.ms = 120000 > enable.idempotence = false > interceptor.classes = [] > internal.auto.downgrade.txn.commit = false > key.serializer = class > org.apache.kafka.common.serialization.ByteArraySerializer > linger.ms = 0 > max.block.ms = 60000 > max.in.flight.requests.per.connection = 5 > max.request.size = 1048576 > metadata.max.age.ms = 300000 > metadata.max.idle.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 30000 > partitioner.class = class > org.apache.kafka.clients.producer.internals.DefaultPartitioner > receive.buffer.bytes = 32768 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 30000 > retries = 2147483647 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = class > software.amazon.msk.auth.iam.IAMClientCallbackHandler > sasl.jaas.config = [hidden] > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.mechanism = AWS_MSK_IAM > security.protocol = SASL_SSL > security.providers = null > send.buffer.bytes = 131072 > socket.connection.setup.timeout.max.ms = 30000 > socket.connection.setup.timeout.ms = 10000 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.3] > ssl.endpoint.identification.algorithm = https > ssl.engine.factory.class = null > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.certificate.chain = null > ssl.keystore.key = null > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLSv1.3 > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.certificates = null > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > transaction.timeout.ms = 3600000 > transactional.id = null > value.serializer = class > org.apache.kafka.common.serialization.ByteArraySerializer{code} > +*Flink Config*+ > {code:java} > taskmanager.memory.process.size=3g > taskmanager.memory.jvm-metaspace.size=512m > taskmanager.numberOfTaskSlots=2 > jobmanager.memory.process.size=3g > jobmanager.memory.jvm-metaspace.size=256m > jobmanager.web.address0.0.0.0 > env.java.opts.all-XX:+HeapDumpOnOutOfMemoryError > -XX:HeapDumpPath=${FLINK_LOG_PREFIX}.hprof {code} > > Reporter: Lukas Mahl > Priority: Major > Labels: bug > Attachments: dump.hprof.zip, image-2022-11-25-15-55-43-559.png > > > Hello! > I'm running a Flink application on AWS EMR which consumes from a Kafka Topic > using the official Flink Kafka consumer. I'm running the application as a > Flink batch job every 30 minutes and I see that the JobManager's metaspace is > increasing every time I submit a new job and doesn't reduce once a job has > finished executing. Eventually the metaspace overflows and I get a > OutOfMemory metaspace exception. I've tried increasing the metaspace to 512m, > but this just delays the problem - hence it's definitely a classloading leak. > I debugged the issue by creating a simple Flink application with a Kafka > consumer only and the issue still occurred, hence I suppose the issue is > somewhere in the Kafka consumer. Only other third party plugin I was using > when doing so was the AWS IAM Kafka dependency > (software.amazon.msk:aws-msk-iam-auth:1.1.5). > I also tried debugging the issue by generating a heap dump as described here > ([https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leak|[https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks]),] > but I wasn't really able to spot the origin of the memory leak. I can see > references to org.apache.kafka.common.utils.AppInfoParser$AppInfo though: > !image-2022-11-25-15-55-43-559.png|width=860,height=359! > I have attached the heap dump - if you need any more information from my set > up feel free to ask, I can provide anything you need. > Many thanks in advance! -- This message was sent by Atlassian Jira (v8.20.10#820010)