Online resources, including my previous question about this problem, said
there was a client bug when connecting to an SSL broker that caused memory
issues. Here is the link, which also links to a JIRA:
https://stackoverflow.com/questions/64697973/java-lang-outofmemoryerror-direct-buffer-memory-error-while-listening-kafka-top

As for memory setup, I have the following:

taskmanager.memory.flink.size: 16384m
taskmanager.memory.jvm-metaspace.size: 3072m

My task managers are 32GB each.
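
For reference, the error message in the trace below names two options,
'taskmanager.memory.task.off-heap.size' and
'taskmanager.memory.framework.off-heap.size', neither of which I set
explicitly. A minimal sketch of how they might be added next to my current
settings; the 512m and 256m values are assumptions for illustration only, not
tested recommendations:

```
taskmanager.memory.flink.size: 16384m
taskmanager.memory.jvm-metaspace.size: 3072m
# Assumed value: direct memory available to user code and its dependencies
# (the Kafka client falls in this group).
taskmanager.memory.task.off-heap.size: 512m
# Assumed value: direct memory reserved for the Flink framework itself.
taskmanager.memory.framework.off-heap.size: 256m
```

Both options are carved out of taskmanager.memory.flink.size, so with that
total fixed at 16384m, raising them leaves correspondingly less for the other
components.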


On Fri, Aug 23, 2024 at 11:21 AM Yaroslav Tkachenko <yaros...@goldsky.com>
wrote:

> Hi John,
>
> I've experienced this issue recently; it's likely caused either by:
>
> - the size of the producer record batch; it can be reduced by configuring
> lower linger.ms and batch.size values
> - the size of an individual record
>
>
> On Fri, Aug 23, 2024 at 7:20 AM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>
>> Why do you believe it is an SSL issue?
>> The error trace looks like a memory issue. You could refer to the
>> taskmanager memory setup guide[1].
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/
>>
>> Best Regards
>> Ahmed Hamdy
>>
>>
>> On Fri, 23 Aug 2024 at 13:47, John Smith <java.dev....@gmail.com> wrote:
>>
>>> I'm pretty sure it's not SSL. Is there a way to confirm, since the task
>>> does work? And/or are there other settings I can try?
>>>
>>> On Thu, Aug 22, 2024, 11:06 AM John Smith <java.dev....@gmail.com>
>>> wrote:
>>>
>>>> Hi, I'm getting this exception; a lot of resources online point to an
>>>> SSL misconfiguration.
>>>>
>>>> We are NOT using SSL, neither on the broker nor on the consumer side.
>>>> Our jobs work absolutely fine, as in the Flink task is able to consume
>>>> from Kafka, parse the JSON, and then push it to the JDBC database sink.
>>>>
>>>> I would assume if SSL was enabled on one side or the other that the
>>>> records would be completely mangled and unparsable from not being able to
>>>> encrypt/decrypt. Also this seems to happen about once a week.
>>>>
>>>> 2024-08-22 10:17:09
>>>> java.lang.RuntimeException: One or more fetchers have encountered
>>>> exception
>>>> at
>>>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
>>>> at
>>>> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>>>> at
>>>> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
>>>> at
>>>> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:351)
>>>> at
>>>> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>>>> at
>>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>>>> at
>>>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>>>> at
>>>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>>> at java.base/java.lang.Thread.run(Thread.java:829)
>>>> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
>>>> out-of-memory error has occurred. This can mean two things: either job(s)
>>>> require(s) a larger size of JVM direct memory or there is a direct memory
>>>> leak. The direct memory can be allocated by user code or some of its
>>>> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
>>>> configuration option should be increased. Flink framework and its
>>>> dependencies also consume the direct memory, mostly for network
>>>> communication. The most of network memory is managed by Flink and should
>>>> not result in out-of-memory error. In certain special cases, in particular
>>>> for jobs with high parallelism, the framework may require more direct
>>>> memory which is not managed by Flink. In this case
>>>> 'taskmanager.memory.framework.off-heap.size' configuration option should be
>>>> increased. If the error persists then there is probably a direct memory
>>>> leak in user code or some of its dependencies which has to be investigated
>>>> and fixed. The task executor has to be shutdown...
>>>> at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
>>>> at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
>>>> at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
>>>> at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
>>>> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:242)
>>>> at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
>>>> at
>>>> java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
>>>> at
>>>> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
>>>> at
>>>> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
>>>> at
>>>> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
>>>> at
>>>> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
>>>> at
>>>> org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
>>>> at
>>>> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
>>>> at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>>>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
>>>> at
>>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>>>> at
>>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>>>> at
>>>> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
>>>> at
>>>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
>>>> at
>>>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>>>> at
>>>> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
>>>> at
>>>> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>>>> at
>>>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
>>>> at
>>>> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
>>>> at
>>>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>>>> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>>>> at
>>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>> at
>>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>> ... 1 more
>>>>
>>>>
