Based on these two settings...
taskmanager.memory.flink.size: 16384m
taskmanager.memory.jvm-metaspace.size: 3072m

Reading the docs here, I'm not sure how to work out the formula:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#taskmanager-memory-task-heap-size

My suspicion is that I may have allocated too much to
taskmanager.memory.flink.size, so that the total, once MaxDirectMemory is
included, is more than the physical RAM the machine has. Is that possible?

Are you able to tell me what the numbers come out to for this formula?
MaxDirectMemory of TM = Network Memory + Task Off-Heap + Framework Off-Heap
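
Here is my own back-of-the-envelope attempt, as a runnable sketch. It assumes
the Flink 1.14 defaults for every option I did not set explicitly (framework
off-heap 128m, task off-heap 0, network fraction 0.1 clamped to [64m, 1g],
JVM overhead fraction 0.1 capped at 1g); please correct me if any of those
assumptions are wrong:

    // Hypothetical estimate only; the defaults above are my assumptions.
    public class TmMemoryCheck {
        public static void main(String[] args) {
            double flinkSize = 16384;  // taskmanager.memory.flink.size (MB)
            double metaspace = 3072;   // taskmanager.memory.jvm-metaspace.size (MB)

            double frameworkOffHeap = 128;  // default framework.off-heap.size
            double taskOffHeap = 0;         // default task.off-heap.size
            // default network memory: 10% of flink.size, clamped to [64m, 1g]
            double network = Math.min(Math.max(0.1 * flinkSize, 64), 1024); // -> 1024

            // the formula from the docs
            double maxDirect = network + taskOffHeap + frameworkOffHeap;    // -> 1152

            // JVM overhead defaults to 10% of total process size, capped at
            // 1g; with numbers this large the cap applies.
            double overhead = 1024;
            double totalProcess = flinkSize + metaspace + overhead;         // -> 20480

            System.out.printf("MaxDirectMemory ~ %.0fm, total process ~ %.0fm%n",
                    maxDirect, totalProcess);
        }
    }

If those defaults hold, that would be roughly 1152m of MaxDirectMemory and
roughly 20480m of total process memory, which should still fit in 32GB, but
I'd like to confirm the formula.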

On Thu, May 23, 2024 at 9:01 AM John Smith <java.dev....@gmail.com> wrote:

> Ok, but I still don't get why it's doing it... It's the same version of
> Flink... The only differences are that it runs Java 11, I allocated more JVM
> heap, and the physical host has more RAM. Maybe I should reduce the JVM heap
> by a gigabyte or two?
>
> On Wed, May 22, 2024, 12:37 PM Zhanghao Chen <zhanghao.c...@outlook.com>
> wrote:
>
>> Hi John,
>>
>> A side note here: Flink will set the MaxDirectMemory of TM = Network
>> Memory + Task Off-Heap + Framework Off-heap, and overwrites JVM's default
>> setting, regardless of the version of JVM.
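>>
>> As a concrete (purely illustrative) example: with 1g of network memory, no
>> task off-heap, and the default 128m of framework off-heap, the TM JVM would
>> be launched with something like -XX:MaxDirectMemorySize=1207959552, i.e.
>> 1152m expressed in bytes.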
>>
>> Best,
>> Zhanghao Chen
>> ------------------------------
>> *From:* John Smith <java.dev....@gmail.com>
>> *Sent:* Wednesday, May 22, 2024 22:56
>> *To:* Biao Geng <biaoge...@gmail.com>
>> *Cc:* user <user@flink.apache.org>
>> *Subject:* Re: Would Java 11 cause Getting OutOfMemoryError: Direct
>> buffer memory?
>>
>> Hi, apologies, I hit reply instead of reply all, so I'm not sure who saw
>> this or didn't. We have not switched to SSL, and our assumption is that if
>> we had, the jobs would either not work at all or would produce invalid
>> results. The jobs work absolutely fine for a week or so and then they fail.
>>
>> Here is the consumer config from the task logs, which shows that PLAINTEXT
>> and port 9092 are used. I also attached a screenshot of the task manager
>> memory usage, and I read up on the MaxDirectMemory setting of Java 8 vs
>> Java 11: Java 8 by default caps direct memory at 87% of the max heap size,
>> while Java 11 sets it to 100% of the max heap size.
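>>
>> (One way to check what a given JVM actually applies, I believe, is
>> java -XX:+PrintFlagsFinal -version | grep MaxDirectMemorySize
>> where a value of 0 means the JVM falls back to its internal default, which
>> is exactly where the Java 8 vs Java 11 difference shows up.)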
>>
>> [image: Screen Shot 2024-05-22 at 9.50.38 AM.png]
>>
>>  allow.auto.create.topics = true
>> auto.commit.interval.ms = 5000
>> auto.offset.reset = latest
>> bootstrap.servers = [xxxxxx-kafka-0001:9092, xxxxxx-0002:9092,
>> xxxxxx-kafka-0003:9092]
>> check.crcs = true
>> client.dns.lookup = default
>> client.id = xxxxxx
>> client.rack =
>> connections.max.idle.ms = 540000
>> default.api.timeout.ms = 60000
>> enable.auto.commit = false
>> exclude.internal.topics = true
>> fetch.max.bytes = 52428800
>> fetch.max.wait.ms = 500
>> fetch.min.bytes = 1
>> group.id = xxxxxx
>> group.instance.id = null
>> heartbeat.interval.ms = 3000
>> interceptor.classes = []
>> internal.leave.group.on.close = true
>> isolation.level = read_uncommitted
>> key.deserializer = class
>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>> max.partition.fetch.bytes = 1048576
>> max.poll.interval.ms = 300000
>> max.poll.records = 500
>> metadata.max.age.ms = 300000
>> metric.reporters = []
>> metrics.num.samples = 2
>> metrics.recording.level = INFO
>> metrics.sample.window.ms = 30000
>> partition.assignment.strategy = [class
>> org.apache.kafka.clients.consumer.RangeAssignor]
>> receive.buffer.bytes = 65536
>> reconnect.backoff.max.ms = 1000
>> reconnect.backoff.ms = 50
>> request.timeout.ms = 60000
>> retry.backoff.ms = 100
>> sasl.client.callback.handler.class = null
>> sasl.jaas.config = null
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> sasl.kerberos.min.time.before.relogin = 60000
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> sasl.login.callback.handler.class = null
>> sasl.login.class = null
>> sasl.login.refresh.buffer.seconds = 300
>> sasl.login.refresh.min.period.seconds = 60
>> sasl.login.refresh.window.factor = 0.8
>> sasl.login.refresh.window.jitter = 0.05
>> sasl.mechanism = GSSAPI
>> security.protocol = PLAINTEXT
>> security.providers = null
>> send.buffer.bytes = 131072
>> session.timeout.ms = 10000
>> ssl.cipher.suites = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> ssl.endpoint.identification.algorithm = https
>> ssl.key.password = null
>> ssl.keymanager.algorithm = SunX509
>> ssl.keystore.location = null
>> ssl.keystore.password = null
>> ssl.keystore.type = JKS
>> ssl.protocol = TLS
>> ssl.provider = null
>> ssl.secure.random.implementation = null
>> ssl.trustmanager.algorithm = PKIX
>> ssl.truststore.location = null
>> ssl.truststore.password = null
>> ssl.truststore.type = JKS
>> value.deserializer = class
>> org.apache.kafka.common.serialization.ByteArrayDeserializer
>>
>> On Thu, May 16, 2024 at 3:20 AM Biao Geng <biaoge...@gmail.com> wrote:
>>
>> Hi John,
>>
>> Just want to check: have you changed the Kafka protocol in your job since
>> moving to the new cluster? The error message shows that it comes from the
>> Kafka client, and there is a similar error in this issue:
>> <https://ververica.zendesk.com/hc/en-us/articles/4413642980498-Direct-buffer-OutOfMemoryError-when-using-Kafka-Connector-in-Flink>
>>
>> Best,
>> Biao Geng
>>
>>
>> John Smith <java.dev....@gmail.com> wrote on Thursday, May 16, 2024 at 09:01:
>>
>> I deployed a new cluster, same version as my old cluster (1.14.4); the
>> only difference is that it uses Java 11. After about a week of usage the
>> below exception happens.
>>
>> The task manager is...
>>
>> 32GB total
>>
>> And I have ONLY the following memory settings:
>>
>> taskmanager.memory.flink.size: 16384m
>> taskmanager.memory.jvm-metaspace.size: 3072m
>>
>> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
>> out-of-memory error has occurred. This can mean two things: either job(s)
>> require(s) a larger size of JVM direct memory or there is a direct memory
>> leak. The direct memory can be allocated by user code or some of its
>> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
>> configuration option should be increased. Flink framework and its
>> dependencies also consume the direct memory, mostly for network
>> communication. The most of network memory is managed by Flink and should
>> not result in out-of-memory error. In certain special cases, in particular
>> for jobs with high parallelism, the framework may require more direct
>> memory which is not managed by Flink. In this case
>> 'taskmanager.memory.framework.off-heap.size' configuration option should be
>> increased. If the error persists then there is probably a direct memory
>> leak in user code or some of its dependencies which has to be investigated
>> and fixed. The task executor has to be shutdown...
>> at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
>> at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
>> at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
>> at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
>> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:242)
>> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
>> at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
>> at
>> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
>> at
>> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
>> at
>> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
>> at
>> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
>> at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
>> at
>> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
>> at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>> at
>> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
>> at
>> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>> at
>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
>> at
>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
>> at
>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> at
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> at
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> ... 1 more
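>>
>> If this turns out to be the first case the message describes (too little
>> direct memory rather than a leak), I assume the fix would be something
>> like:
>>
>> taskmanager.memory.task.off-heap.size: 512m
>>
>> with the 512m value picked purely for illustration, but I'm not sure yet
>> which case it is.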
>>
>>
