Hi John,

Based on the memory config screenshot provided before, each of your TMs should have MaxDirectMemory = 1 GB (network memory) + 128 MB (framework off-heap) = 1152 MB. Neither taskmanager.memory.flink.size nor the total including MaxDirectMemory will exceed the pod's physical memory; you may check the detailed TM memory model [1] and double-check for yourself.
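For reference, a rough derivation of those numbers, assuming the Flink 1.14 defaults for everything you have not set explicitly (worth double-checking against [1]):

    taskmanager.memory.flink.size (explicit)        = 16384 MB
    network memory = min(0.1 * 16384 MB, 1 GB cap)  =  1024 MB  (default fraction 0.1, max 1 GB)
    framework off-heap (default)                    =   128 MB
    task off-heap (default)                         =     0 MB
    MaxDirectMemorySize = 1024 + 128 + 0            =  1152 MB

Note that the derived direct-memory limit is a small fixed slice of the 16 GB Flink size; under Flink's override it does not grow with the heap.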
Maybe you can further analyze the direct memory usage using tools like JVM Native Memory Tracking (NMT).

[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/mem_setup_tm/#detailed-memory-model

Best,
Zhanghao Chen

________________________________
From: John Smith <java.dev....@gmail.com>
Sent: Thursday, May 23, 2024 22:40
To: Zhanghao Chen <zhanghao.c...@outlook.com>
Cc: Biao Geng <biaoge...@gmail.com>; user <user@flink.apache.org>
Subject: Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

Based on these two settings...

taskmanager.memory.flink.size: 16384m
taskmanager.memory.jvm-metaspace.size: 3072m

Reading the docs here, I'm not sure how to calculate the formula. My suspicion is that I may have allocated too much to taskmanager.memory.flink.size, so that the total including MaxDirectMemory is more than what the physical OS has. Is that possible?

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#taskmanager-memory-task-heap-size

Are you able to tell me what the numbers come out to for this formula: MaxDirectMemory of TM = Network Memory + Task Off-Heap + Framework Off-Heap?

On Thu, May 23, 2024 at 9:01 AM John Smith <java.dev....@gmail.com> wrote:

Ok, but I still don't get why it's doing it... It's the same version of Flink. The only differences are Java 11, more JVM heap allocated, and the machine physically has more RAM. Maybe I should reduce the JVM heap by a gigabyte or two?

On Wed, May 22, 2024, 12:37 PM Zhanghao Chen <zhanghao.c...@outlook.com> wrote:

Hi John,

A side note here: Flink will set the MaxDirectMemory of a TM to Network Memory + Task Off-Heap + Framework Off-Heap, overwriting the JVM's default setting, regardless of the JVM version.

Best,
Zhanghao Chen

________________________________
From: John Smith <java.dev....@gmail.com>
Sent: Wednesday, May 22, 2024 22:56
To: Biao Geng <biaoge...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

Hi, apologies, I hit reply instead of reply all, so I'm not sure who saw this or didn't. We have not switched to SSL, and our assumption is that if we had switched to SSL, the jobs would not work or would produce invalid results. The jobs work absolutely fine for a week or so and then they fail. Below is the consumer config from the task logs, which shows that PLAINTEXT and port 9092 are used. I also attached a screenshot of the task manager memory usage. As well, I read up on the MaxDirectMemory setting of Java 8 vs Java 11: Java 8 by default calculates the direct memory size as 87% of the max heap size, while Java 11 sets it to 100% of the max heap size.
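Whichever default applies, Flink overrides it, as Zhanghao notes above. To watch actual direct usage against the derived limit on a live TM, the JVM's standard buffer-pool MXBeans can be polled; a minimal sketch (plain java.lang.management API; the class name is mine):

    import java.lang.management.BufferPoolMXBean;
    import java.lang.management.ManagementFactory;

    public class DirectBufferProbe {
        public static void main(String[] args) {
            // "direct" covers ByteBuffer.allocateDirect (what this OOM is about);
            // "mapped" covers memory-mapped files.
            for (BufferPoolMXBean pool :
                    ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
                System.out.printf("%s pool: count=%d, used=%d bytes, capacity=%d bytes%n",
                        pool.getName(), pool.getCount(),
                        pool.getMemoryUsed(), pool.getTotalCapacity());
            }
        }
    }

The same loop could run periodically inside a job or be exposed as a metric gauge; if used bytes climb steadily toward 1152 MB over the week, that points to a leak rather than an undersized limit.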
[Screen Shot 2024-05-22 at 9.50.38 AM.png]

allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [xxxxxx-kafka-0001:9092, xxxxxx-0002:9092, xxxxxx-kafka-0003:9092]
check.crcs = true
client.dns.lookup = default
client.id = xxxxxx
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = xxxxxx
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 60000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

On Thu, May 16, 2024 at 3:20 AM Biao Geng <biaoge...@gmail.com> wrote:

Hi John,

Just want to check: have you ever changed the kafka protocol in your job after using the new cluster? The error message shows that it is caused by the kafka client, and there is a similar error in this issue: https://ververica.zendesk.com/hc/en-us/articles/4413642980498-Direct-buffer-OutOfMemoryError-when-using-Kafka-Connector-in-Flink

Best,
Biao Geng
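If the Kafka client's reads turn out to be the source of the direct allocations (as the stack trace further down suggests), one experiment is to shrink the fetch sizes it may use. A rough sketch against the KafkaSource builder that matches the connector in the stack trace (topic name and sizes are hypothetical placeholders, not recommendations):

    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    KafkaSource<byte[]> source = KafkaSource.<byte[]>builder()
            .setBootstrapServers("xxxxxx-kafka-0001:9092,xxxxxx-0002:9092,xxxxxx-kafka-0003:9092")
            .setTopics("my-topic") // hypothetical topic name
            .setGroupId("xxxxxx")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(ByteArrayDeserializer.class))
            // NIO allocates temporary direct buffers sized to each socket read
            // (see sun.nio.ch.Util.getTemporaryDirectBuffer in the trace below),
            // so smaller fetches mean smaller direct allocations per broker connection.
            .setProperty("max.partition.fetch.bytes", "524288") // hypothetical: half the 1 MB default above
            .setProperty("fetch.max.bytes", "26214400")         // hypothetical: half the 50 MB default above
            .build();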
John Smith <java.dev....@gmail.com> wrote on Thursday, May 16, 2024 at 09:01:

I deployed a new cluster, same version as my old cluster (1.14.4), the only difference being Java 11, and it seems after a week of usage the below exception happens.

The task manager has 32GB total, and I have ONLY the following memory settings:

taskmanager.memory.flink.size: 16384m
taskmanager.memory.jvm-metaspace.size: 3072m

Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...

at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:242)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
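Per the error message's own guidance, a first experiment is to grant the job explicit task off-heap memory, which Flink adds to the derived MaxDirectMemorySize (it is carved out of the taskmanager.memory.flink.size budget, so the heap shrinks by the same amount). A minimal flink-conf.yaml sketch; 512m is an arbitrary starting value, not a recommendation:

    taskmanager.memory.flink.size: 16384m
    taskmanager.memory.jvm-metaspace.size: 3072m
    taskmanager.memory.task.off-heap.size: 512m

If the OOM still recurs after a week, Native Memory Tracking (as Zhanghao suggested above) can help separate a leak from an undersized limit, e.g. setting env.java.opts.taskmanager: "-XX:NativeMemoryTracking=summary" in flink-conf.yaml and then running jcmd <TM pid> VM.native_memory summary on the host.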