Based on these two settings:

taskmanager.memory.flink.size: 16384m
taskmanager.memory.jvm-metaspace.size: 3072m

Reading the docs here, I'm not sure how to work out the formula. My suspicion is that I may have allocated too much to taskmanager.memory.flink.size, and that the total including MaxDirectMemory is more than what the physical OS has. Is that possible?
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#taskmanager-memory-task-heap-size

Are you able to tell me what these numbers come out to for this formula: MaxDirectMemory of TM = Network Memory + Task Off-Heap + Framework Off-Heap?
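A back-of-the-envelope pass at that formula, assuming everything besides the two settings above is left at the Flink 1.14 defaults (taskmanager.memory.network.fraction: 0.1 bounded by taskmanager.memory.network.max: 1gb, taskmanager.memory.framework.off-heap.size: 128mb, taskmanager.memory.task.off-heap.size: 0):

    Network Memory     = min(max(0.1 * 16384m, 64m), 1024m) = 1024m
    Task Off-Heap      = 0m
    Framework Off-Heap = 128m
    MaxDirectMemory    = 1024m + 0m + 128m = 1152m

As for the physical-RAM suspicion: the total process size would be roughly flink.size + metaspace + JVM overhead (10% by default, capped at 1gb) = 16384m + 3072m + 1024m ≈ 20gb, which fits comfortably in 32gb, so with default settings the box itself should not be oversubscribed.

On Thu, May 23, 2024 at 9:01 AM John Smith <java.dev....@gmail.com> wrote: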
> Ok, but I still don't get why it's doing it... It's the same version of
> Flink. The only differences are Java 11, a larger JVM heap allocation, and
> more physical RAM on the host. Maybe I should reduce the JVM heap by a
> gigabyte or two?
>
> On Wed, May 22, 2024, 12:37 PM Zhanghao Chen <zhanghao.c...@outlook.com> wrote:
>
>> Hi John,
>>
>> A side note here: Flink will set the MaxDirectMemory of TM = Network
>> Memory + Task Off-Heap + Framework Off-Heap, and overwrites the JVM's
>> default setting, regardless of the version of the JVM.
>>
>> Best,
>> Zhanghao Chen
>> ------------------------------
>> *From:* John Smith <java.dev....@gmail.com>
>> *Sent:* Wednesday, May 22, 2024 22:56
>> *To:* Biao Geng <biaoge...@gmail.com>
>> *Cc:* user <user@flink.apache.org>
>> *Subject:* Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?
>>
>> Hi, apologies, I hit reply instead of reply all, so I'm not sure who saw
>> this. We have not switched to SSL, and our assumption is that if we had
>> switched to SSL, the jobs would either not work or would produce invalid
>> results. The jobs work absolutely fine for a week or so and then they fail.
>>
>> Here is the consumer config from the task logs, which shows that PLAINTEXT
>> and port 9092 are used. I also attached a screenshot of the task manager
>> memory usage. As well, I read up on the MaxDirectMemory setting of Java 8
>> vs Java 11: by default, Java 8 caps direct memory at roughly 87% of the max
>> heap size, while Java 11 sets it to 100% of the max heap size.
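(Since Flink overrides the JDK default either way, the limit that actually applies can be read directly from the task manager's startup log, which prints the JVM options Flink launched with; something along the lines of the command below, with the log file name adjusted to your install, should surface it:

    grep "MaxDirectMemorySize" flink-*-taskexecutor-*.log

)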
>> [image: Screen Shot 2024-05-22 at 9.50.38 AM.png]
>>
>> allow.auto.create.topics = true
>> auto.commit.interval.ms = 5000
>> auto.offset.reset = latest
>> bootstrap.servers = [xxxxxx-kafka-0001:9092, xxxxxx-0002:9092, xxxxxx-kafka-0003:9092]
>> check.crcs = true
>> client.dns.lookup = default
>> client.id = xxxxxx
>> client.rack =
>> connections.max.idle.ms = 540000
>> default.api.timeout.ms = 60000
>> enable.auto.commit = false
>> exclude.internal.topics = true
>> fetch.max.bytes = 52428800
>> fetch.max.wait.ms = 500
>> fetch.min.bytes = 1
>> group.id = xxxxxx
>> group.instance.id = null
>> heartbeat.interval.ms = 3000
>> interceptor.classes = []
>> internal.leave.group.on.close = true
>> isolation.level = read_uncommitted
>> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>> max.partition.fetch.bytes = 1048576
>> max.poll.interval.ms = 300000
>> max.poll.records = 500
>> metadata.max.age.ms = 300000
>> metric.reporters = []
>> metrics.num.samples = 2
>> metrics.recording.level = INFO
>> metrics.sample.window.ms = 30000
>> partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
>> receive.buffer.bytes = 65536
>> reconnect.backoff.max.ms = 1000
>> reconnect.backoff.ms = 50
>> request.timeout.ms = 60000
>> retry.backoff.ms = 100
>> sasl.client.callback.handler.class = null
>> sasl.jaas.config = null
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> sasl.kerberos.min.time.before.relogin = 60000
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> sasl.login.callback.handler.class = null
>> sasl.login.class = null
>> sasl.login.refresh.buffer.seconds = 300
>> sasl.login.refresh.min.period.seconds = 60
>> sasl.login.refresh.window.factor = 0.8
>> sasl.login.refresh.window.jitter = 0.05
>> sasl.mechanism = GSSAPI
>> security.protocol = PLAINTEXT
>> security.providers = null
>> send.buffer.bytes = 131072
>> session.timeout.ms = 10000
>> ssl.cipher.suites = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> ssl.endpoint.identification.algorithm = https
>> ssl.key.password = null
>> ssl.keymanager.algorithm = SunX509
>> ssl.keystore.location = null
>> ssl.keystore.password = null
>> ssl.keystore.type = JKS
>> ssl.protocol = TLS
>> ssl.provider = null
>> ssl.secure.random.implementation = null
>> ssl.trustmanager.algorithm = PKIX
>> ssl.truststore.location = null
>> ssl.truststore.password = null
>> ssl.truststore.type = JKS
>> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>>
>> On Thu, May 16, 2024 at 3:20 AM Biao Geng <biaoge...@gmail.com> wrote:
>>
>> Hi John,
>>
>> Just want to check, have you ever changed the Kafka protocol in your job
>> after using the new cluster? The error message shows that it is caused by
>> the Kafka client, and there is a similar error in this issue
>> <https://ververica.zendesk.com/hc/en-us/articles/4413642980498-Direct-buffer-OutOfMemoryError-when-using-Kafka-Connector-in-Flink>.
>>
>> Best,
>> Biao Geng
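Since the jobs run fine for about a week before failing, it would help to know whether direct memory slowly climbs toward the limit (a leak) or stays flat until a load spike (an undersized budget). A small diagnostic sketch using the JDK's standard BufferPoolMXBean, which could be logged periodically from the task managers (the class name and output format here are illustrative, not from the thread):

    import java.lang.management.BufferPoolMXBean;
    import java.lang.management.ManagementFactory;

    // Illustrative probe: prints the JVM's "direct" and "mapped" buffer pools.
    // A "direct" capacity that keeps climbing toward -XX:MaxDirectMemorySize
    // over days suggests a leak; a flat capacity that only fails under load
    // suggests the direct-memory budget is simply too small.
    public class DirectBufferProbe {
        public static void main(String[] args) {
            for (BufferPoolMXBean pool :
                    ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
                System.out.printf("%s: count=%d used=%d bytes capacity=%d bytes%n",
                        pool.getName(), pool.getCount(),
                        pool.getMemoryUsed(), pool.getTotalCapacity());
            }
        }
    }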
>> On Thu, May 16, 2024 at 09:01, John Smith <java.dev....@gmail.com> wrote:
>>
>> I deployed a new cluster, the same version as my old cluster (1.14.4), the
>> only difference being Java 11, and it seems that after a week of usage the
>> below exception happens.
>>
>> The task manager has 32GB total, and I have ONLY the following memory
>> settings:
>>
>> taskmanager.memory.flink.size: 16384m
>> taskmanager.memory.jvm-metaspace.size: 3072m
>>
>> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
>> out-of-memory error has occurred. This can mean two things: either job(s)
>> require(s) a larger size of JVM direct memory or there is a direct memory
>> leak. The direct memory can be allocated by user code or some of its
>> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
>> configuration option should be increased. Flink framework and its
>> dependencies also consume the direct memory, mostly for network
>> communication. The most of network memory is managed by Flink and should
>> not result in out-of-memory error. In certain special cases, in particular
>> for jobs with high parallelism, the framework may require more direct
>> memory which is not managed by Flink. In this case
>> 'taskmanager.memory.framework.off-heap.size' configuration option should be
>> increased. If the error persists then there is probably a direct memory
>> leak in user code or some of its dependencies which has to be investigated
>> and fixed. The task executor has to be shutdown...
>> at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
>> at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
>> at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
>> at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
>> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:242)
>> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
>> at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
>> at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
>> at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
>> at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
>> at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
>> at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
>> at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
>> at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>> at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
>> at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
>> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
>> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> ... 1 more
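Following the first remediation path the error message itself suggests, one experiment would be to give the task off-heap component an explicit budget in flink-conf.yaml (the 512m below is an illustrative figure, not a value from the thread):

    taskmanager.memory.flink.size: 16384m
    taskmanager.memory.jvm-metaspace.size: 3072m
    # Illustrative: extra direct-memory headroom for the Kafka client's
    # buffers; this raises the computed -XX:MaxDirectMemorySize by 512m.
    taskmanager.memory.task.off-heap.size: 512m

Because task off-heap is carved out of taskmanager.memory.flink.size, this shrinks the derived task heap by the same amount rather than growing the overall process footprint. If the error still returns after a week, that would point back toward a genuine direct-memory leak, as the error message notes.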