The exact same task/code and the exact same version of Flink had no issues
before.

The only things that changed: we moved the Flink deployment to Java 11, added
more memory to the config, and increased the parallelism of the Kafka source.

On Fri, Aug 23, 2024, 3:46 PM John Smith <java.dev....@gmail.com> wrote:

> Online resources, including my previous question about this problem, said
> there was a client bug when connecting to an SSL broker that caused memory
> issues. As for the memory setup, I have the following:
>
> Here is the link; it also links to a JIRA issue:
> https://stackoverflow.com/questions/64697973/java-lang-outofmemoryerror-direct-buffer-memory-error-while-listening-kafka-top
>
> taskmanager.memory.flink.size: 16384m
> taskmanager.memory.jvm-metaspace.size: 3072m
>
> My task managers are 32GB each.
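A rough sanity check on the settings above: the Java sketch below derives the JVM direct-memory limit Flink would set for this TaskManager. It assumes Flink's documented defaults for every option not listed in this thread (framework off-heap 128 MiB, task off-heap 0, network memory 0.1 of total Flink memory capped at 1 GiB, JVM overhead 0.1 of process memory capped at 1 GiB); please check those values against the docs for the Flink version in use. Flink passes -XX:MaxDirectMemorySize = framework off-heap + task off-heap + network memory, so neither the 32GB machine size nor the 16 GB flink.size directly translates into direct memory.

```java
/**
 * Back-of-the-envelope TaskManager memory budget, in MiB.
 * All constants are ASSUMED Flink defaults for options not set in this thread;
 * verify them against the documentation for the Flink version in use.
 */
class TmMemoryBudget {
    static final long FRAMEWORK_OFF_HEAP_MB = 128;   // taskmanager.memory.framework.off-heap.size
    static final double NETWORK_FRACTION = 0.1;      // taskmanager.memory.network.fraction
    static final long NETWORK_MIN_MB = 64, NETWORK_MAX_MB = 1024;
    static final double OVERHEAD_FRACTION = 0.1;     // taskmanager.memory.jvm-overhead.fraction
    static final long OVERHEAD_MIN_MB = 192, OVERHEAD_MAX_MB = 1024;

    /** Truncates v to whole MiB and clamps it to [min, max]. */
    static long clamp(double v, long min, long max) {
        return Math.max(min, Math.min(max, (long) v));
    }

    /** Network memory: a fraction of total Flink memory, clamped. */
    static long networkMb(long flinkSizeMb) {
        return clamp(flinkSizeMb * NETWORK_FRACTION, NETWORK_MIN_MB, NETWORK_MAX_MB);
    }

    /** -XX:MaxDirectMemorySize = framework off-heap + task off-heap + network memory. */
    static long maxDirectMb(long flinkSizeMb, long taskOffHeapMb) {
        return FRAMEWORK_OFF_HEAP_MB + taskOffHeapMb + networkMb(flinkSizeMb);
    }

    /**
     * JVM overhead when only total Flink memory is configured (assumed derivation):
     * overhead is a fraction f of total process memory, so
     * overhead = (flink + metaspace) * f / (1 - f), clamped.
     */
    static long jvmOverheadMb(long flinkSizeMb, long metaspaceMb) {
        double derived = (flinkSizeMb + metaspaceMb) * OVERHEAD_FRACTION / (1 - OVERHEAD_FRACTION);
        return clamp(derived, OVERHEAD_MIN_MB, OVERHEAD_MAX_MB);
    }

    public static void main(String[] args) {
        long flinkMb = 16384;     // taskmanager.memory.flink.size from this thread
        long metaspaceMb = 3072;  // taskmanager.memory.jvm-metaspace.size from this thread
        long taskOffHeapMb = 0;   // taskmanager.memory.task.off-heap.size, left at its default
        long overheadMb = jvmOverheadMb(flinkMb, metaspaceMb);
        System.out.println("network memory:      " + networkMb(flinkMb) + " MiB");
        System.out.println("MaxDirectMemorySize: " + maxDirectMb(flinkMb, taskOffHeapMb) + " MiB");
        System.out.println("JVM overhead:        " + overheadMb + " MiB");
        System.out.println("total process size:  " + (flinkMb + metaspaceMb + overheadMb) + " MiB");
    }
}
```

Under these assumptions main prints a network memory of 1024 MiB, a MaxDirectMemorySize of 1152 MiB and a total process size of 20480 MiB. Only the 128 MiB framework off-heap (plus task off-heap, 0 by default) is budgeted for direct buffers outside Flink's network stack. Raising taskmanager.memory.task.off-heap.size, as the error message suggests, raises the limit one-for-one: maxDirectMb(16384, 512) gives 1664 MiB.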
>
>
> On Fri, Aug 23, 2024 at 11:21 AM Yaroslav Tkachenko <yaros...@goldsky.com>
> wrote:
>
>> Hi John,
>>
>> I've experienced this issue recently; it's likely caused either by:
>>
>> - the size of the producer record batch, which can be reduced by configuring
>> lower linger.ms and batch.size values
>> - the size of an individual record
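For reference, those knobs are plain Kafka client properties; a minimal stdlib-only sketch with illustrative (not recommended) values follows. Since the stack trace in this thread comes from the consumer (KafkaConsumer.poll) rather than a producer, the sketch also shows the consumer-side counterparts, fetch.max.bytes and max.partition.fetch.bytes, which limit the size of a fetch response (not as a hard cap: a record batch larger than the limit is still returned). With Flink's KafkaSource, consumer properties like these can be passed through the builder's setProperties(...).

```java
import java.util.Properties;

/** Illustrative Kafka client overrides; every value here is a hypothetical example. */
class KafkaSizeOverrides {

    /** Producer side: smaller batches that are sent without waiting. */
    static Properties producerProps() {
        Properties p = new Properties();
        p.setProperty("linger.ms", "0");       // don't wait to fill a batch
        p.setProperty("batch.size", "8192");   // 8 KiB, half of Kafka's 16 KiB default
        return p;
    }

    /** Consumer side: limit how much data a single fetch response carries. */
    static Properties consumerProps() {
        Properties p = new Properties();
        // 8 MiB per fetch response (Kafka's default is 50 MiB).
        p.setProperty("fetch.max.bytes", String.valueOf(8 * 1024 * 1024));
        // 1 MiB per partition, which is Kafka's default.
        p.setProperty("max.partition.fetch.bytes", String.valueOf(1024 * 1024));
        return p;
    }

    public static void main(String[] args) {
        System.out.println("producer overrides: " + producerProps());
        System.out.println("consumer overrides: " + consumerProps());
    }
}
```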
>>
>>
>> On Fri, Aug 23, 2024 at 7:20 AM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>>
>>> Why do you believe it is an SSL issue?
>>> The error trace looks like a memory issue. You could refer to the
>>> TaskManager memory setup guide [1].
>>>
>>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/
>>>
>>> Best Regards
>>> Ahmed Hamdy
>>>
>>>
>>> On Fri, 23 Aug 2024 at 13:47, John Smith <java.dev....@gmail.com> wrote:
>>>
>>>> I'm pretty sure it's not SSL. Is there a way to confirm that, given that
>>>> the task does work? And/or are there other settings I can try?
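On confirming whether SSL is involved: the stack trace in this thread already answers that, because the failing read goes through org.apache.kafka.common.network.PlaintextTransportLayer, which is the Kafka client's non-SSL transport. To confirm the memory side, it helps to look at the TaskManager JVM directly. The hypothetical helper below uses only java.lang.management: it reads the -XX:MaxDirectMemorySize flag from the JVM's input arguments and the current usage of the "direct" buffer pool, which is what ByteBuffer.allocateDirect (the call that fails in the trace) counts against. Flink's Status.JVM.Memory.Direct.* metrics should expose the same pool numbers without any code.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Locale;
import java.util.Optional;

/** Hypothetical helper: report the JVM's direct-memory limit and current direct-buffer usage. */
class DirectMemoryReport {

    private static final String FLAG = "-XX:MaxDirectMemorySize=";

    /**
     * Parses -XX:MaxDirectMemorySize=<n>[k|m|g] from JVM input arguments.
     * The JVM honours the last occurrence, so this does too. Returns -1 if the flag is absent.
     */
    static long maxDirectBytes(List<String> jvmArgs) {
        long result = -1;
        for (String arg : jvmArgs) {
            if (!arg.startsWith(FLAG)) {
                continue;
            }
            String value = arg.substring(FLAG.length()).toLowerCase(Locale.ROOT);
            long multiplier = 1;
            switch (value.charAt(value.length() - 1)) {
                case 'k': multiplier = 1L << 10; break;
                case 'm': multiplier = 1L << 20; break;
                case 'g': multiplier = 1L << 30; break;
                default: break;
            }
            if (multiplier != 1) {
                value = value.substring(0, value.length() - 1); // strip the unit suffix
            }
            result = Long.parseLong(value) * multiplier;
        }
        return result;
    }

    /** The JVM's "direct" buffer pool, which ByteBuffer.allocateDirect counts against. */
    static Optional<BufferPoolMXBean> directPool() {
        return ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class).stream()
                .filter(pool -> "direct".equals(pool.getName()))
                .findFirst();
    }

    public static void main(String[] args) {
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        long max = maxDirectBytes(jvmArgs);
        System.out.println(max < 0
                ? "-XX:MaxDirectMemorySize not set explicitly (JVM default applies)"
                : "-XX:MaxDirectMemorySize = " + max + " bytes");
        directPool().ifPresent(pool -> System.out.printf(
                "direct pool: count=%d, used=%d bytes, capacity=%d bytes%n",
                pool.getCount(), pool.getMemoryUsed(), pool.getTotalCapacity()));
    }
}
```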
>>>>
>>>> On Thu, Aug 22, 2024, 11:06 AM John Smith <java.dev....@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi, I'm getting this exception. A lot of resources online point to an SSL
>>>>> misconfiguration.
>>>>>
>>>>> We are NOT using SSL, neither on the broker nor on the consumer side. Our
>>>>> jobs work absolutely fine, as in the Flink task is able to consume from
>>>>> Kafka, parse the JSON and then push it to the JDBC database sink.
>>>>>
>>>>> I would assume that if SSL were enabled on one side but not the other, the
>>>>> records would be completely mangled and unparsable, since they could not be
>>>>> encrypted/decrypted. Also, this seems to happen about once a week.
>>>>>
>>>>> 2024-08-22 10:17:09
>>>>> java.lang.RuntimeException: One or more fetchers have encountered exception
>>>>>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
>>>>>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>>>>>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
>>>>>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:351)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>>>>     at java.base/java.lang.Thread.run(Thread.java:829)
>>>>> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The
>>>>> direct out-of-memory error has occurred. This can mean two things: either
>>>>> job(s) require(s) a larger size of JVM direct memory or there is a direct
>>>>> memory leak. The direct memory can be allocated by user code or some of its
>>>>> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
>>>>> configuration option should be increased. Flink framework and its
>>>>> dependencies also consume the direct memory, mostly for network
>>>>> communication. The most of network memory is managed by Flink and should
>>>>> not result in out-of-memory error. In certain special cases, in particular
>>>>> for jobs with high parallelism, the framework may require more direct
>>>>> memory which is not managed by Flink. In this case
>>>>> 'taskmanager.memory.framework.off-heap.size' configuration option should be
>>>>> increased. If the error persists then there is probably a direct memory
>>>>> leak in user code or some of its dependencies which has to be investigated
>>>>> and fixed. The task executor has to be shutdown...
>>>>>     at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
>>>>>     at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
>>>>>     at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
>>>>>     at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
>>>>>     at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:242)
>>>>>     at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
>>>>>     at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
>>>>>     at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
>>>>>     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
>>>>>     at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
>>>>>     at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
>>>>>     at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
>>>>>     at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
>>>>>     at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
>>>>>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
>>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>>>>>     at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97)
>>>>>     at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>>>>>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
>>>>>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
>>>>>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>>>>>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>>>>>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>>>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>>>     ... 1 more
>>>>>
>>>>>
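One detail at the bottom of the trace above: the Kafka client reads the socket into a heap buffer, and the JDK (sun.nio.ch.IOUtil.read) copies through a temporary direct buffer from Util.getTemporaryDirectBuffer, sized to the heap buffer's remaining space. As far as I know, HotSpot then keeps that buffer in a per-thread cache, with no size cap unless the jdk.nio.maxCachedBufferSize system property is set, so large fetch responses read on several fetcher threads can each hold on to a correspondingly large direct buffer. The stdlib-only sketch below reproduces the mechanism with a java.nio.channels.Pipe standing in for the broker socket (an assumption, to keep it self-contained): it reads a short message into a 4 MiB heap buffer and reports how much the "direct" pool grew during the read.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

/**
 * Sketch: reading from an NIO channel into a heap ByteBuffer makes the JDK copy
 * through a temporary direct buffer. A Pipe stands in for the Kafka broker socket
 * (assumption, to keep the example self-contained).
 */
class TempDirectBufferDemo {

    /** Bytes currently reserved by the JVM's "direct" buffer pool, or -1 if not found. */
    static long directBytesUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if (pool.getName().equals("direct")) {
                return pool.getMemoryUsed();
            }
        }
        return -1;
    }

    /**
     * Writes a small message into a pipe, then reads it back into a heap buffer of
     * heapBufferBytes. Returns {bytesRead, change in direct-pool usage during the read}.
     */
    static long[] readThroughHeapBuffer(int heapBufferBytes, byte[] message) {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
                sink.write(ByteBuffer.wrap(message)); // small write, fits in the OS pipe buffer
                long before = directBytesUsed();
                // Heap (non-direct) destination, like the receive buffer in the trace.
                ByteBuffer heap = ByteBuffer.allocate(heapBufferBytes);
                int n = source.read(heap); // the JDK substitutes a temporary direct buffer here
                long after = directBytesUsed();
                return new long[] {n, after - before};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] msg = "hello from the broker".getBytes(StandardCharsets.UTF_8);
        long[] result = readThroughHeapBuffer(4 * 1024 * 1024, msg);
        System.out.println("bytes read:                     " + result[0]);
        System.out.println("direct pool change during read: " + result[1] + " bytes");
    }
}
```

On a current HotSpot JDK I would expect the reported change to be close to the 4 MiB buffer size rather than the few bytes actually read, but this is JDK-internal behaviour and may vary by version. If the TaskManagers show the same pattern, capping the cache with -Djdk.nio.maxCachedBufferSize=<bytes> in env.java.opts.taskmanager, or shrinking the consumer's fetch sizes, are options worth testing; neither is a confirmed fix.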
