[ 
https://issues.apache.org/jira/browse/FLINK-24302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744604#comment-17744604
 ] 

Zili Chen commented on FLINK-24302:
-----------------------------------

Pending to close. This should be an issue on the Pulsar side. Little thing we 
can do only from the connector side.

> Direct buffer memory leak on Pulsar connector with Java 11
> ----------------------------------------------------------
>
>                 Key: FLINK-24302
>                 URL: https://issues.apache.org/jira/browse/FLINK-24302
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Pulsar
>    Affects Versions: 1.14.0
>            Reporter: Yufan Sheng
>            Priority: Major
>              Labels: pull-request-available, test-stability
>             Fix For: pulsar-4.0.1
>
>
> Running the Pulsar connector with multiple split readers on Java 11 could 
> throw {{a java.lang.OutOfMemoryError exception}}.
> {code:java}
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException:
>  Could not complete the operation. Number of retries has been exhausted. 
> Failed reason: java.lang.OutOfMemoryError: Direct buffer memory
>       at 
> java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
>       at 
> java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
>       at 
> java.base/java.util.concurrent.CompletableFuture$OrApply.tryFire(CompletableFuture.java:1503)
>       ... 42 more
> Caused by: 
> org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException:
>  Could not complete the operation. Number of retries has been exhausted. 
> Failed reason: java.lang.OutOfMemoryError: Direct buffer memory
>       at 
> org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.lambda$retryOperation$4(AsyncHttpConnector.java:249)
>       ... 39 more
> Caused by: org.apache.pulsar.shade.io.netty.handler.codec.EncoderException: 
> java.lang.OutOfMemoryError: Direct buffer memory
>       at 
> org.apache.pulsar.shade.io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:104)
>       at 
> org.apache.pulsar.shade.io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:346)
>       at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>       at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
>       at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
>       at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
>       at 
> org.apache.pulsar.shade.io.netty.handler.stream.ChunkedWriteHandler.doFlush(ChunkedWriteHandler.java:303)
>       at 
> org.apache.pulsar.shade.io.netty.handler.stream.ChunkedWriteHandler.flush(ChunkedWriteHandler.java:132)
>       at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
>       at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
>       at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
>       at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
>       at 
> org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1020)
>       at 
> org.apache.pulsar.shade.io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:311)
>       at 
> org.apache.pulsar.shade.org.asynchttpclient.netty.request.NettyRequestSender.writeRequest(NettyRequestSender.java:420)
>       ... 23 more
> {code}
> The reason is that under Java 11, the Netty will allocate memory from the 
> pool of Java Direct Memory and is affected by the MaxDirectMemory limit. 
> Under Java 8, it allocates native memory and is not affected by that setting.
> We have to reduce the direct memory usage by using a newer Pulsar client 
> which has a memory-limits configuration.
> This issue is addressed on Pulsar, and 
> [PIP-74|https://github.com/apache/pulsar/wiki/PIP-74%3A-Pulsar-client-memory-limits]
>  has been created for resolving this issue.
> We should keep this issue open with no resolved versions until Pulsar 
> provides a new client with memory limits.
> h2. Update: 2022/08/04
> The memory limit on consumer API has been released 
> https://github.com/apache/pulsar/pull/15216, we need to add 
> autoScaledReceiverQueueSizeEnabled option to enable this feature. This memory 
> limit will get released on Pulsar 2.11.0. We will get this fixed after that.
> h2. Update: 2023/02/13
> Pulsar has a client [memory allocator 
> configuration|https://pulsar.apache.org/docs/2.11.x/client-libraries-java/#client-memory-allocator-configuration]
>  which can drop the use of direct buffer memory.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to