poorbarcode opened a new pull request, #24424:
URL: https://github.com/apache/pulsar/pull/24424

   ### Motivation
   
   - The `replicationProducerQueueSize` configuration limits the rate of replication.
   - `PersistentReplicator` implements this rate limit.
   - Issue: `NonPersistentReplicator` does not implement it, yet the configuration still affects `NonPersistentReplicator`, which leads to the error below when users set `replicationProducerQueueSize` to a small value:
   
   ```
   2025-06-13T11:53:28,121+0000 [pulsar-io-6-1] ERROR org.apache.pulsar.broker.service.persistent.PersistentReplicator - [non-persistent://xxx/xxx/xxx-partition-0 | c1-->c2] Error producing on remote broker
   org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError: Producer send queue is full
        at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:1055) ~[io.streamnative-pulsar-client-original-3.3.5.5.jar:3.3.5.5]
        at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:534) ~[io.streamnative-pulsar-client-original-3.3.5.5.jar:3.3.5.5]
        at org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator.sendMessage(NonPersistentReplicator.java:124) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
        at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.lambda$publishMessage$3(NonPersistentTopic.java:227) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:583) ~[io.streamnative-pulsar-common-3.3.5.5.jar:3.3.5.5]
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:297) ~[io.streamnative-pulsar-common-3.3.5.5.jar:3.3.5.5]
        at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.publishMessage(NonPersistentTopic.java:222) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
        at org.apache.pulsar.broker.service.Producer.publishMessageToTopic(Producer.java:295) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
        at org.apache.pulsar.broker.service.Producer.publishMessage(Producer.java:203) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
        at org.apache.pulsar.broker.service.ServerCnx.handleSend(ServerCnx.java:1942) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
        at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:222) ~[io.streamnative-pulsar-common-3.3.5.5.jar:3.3.5.5]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:202) ~[io.netty-netty-handler-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:164) ~[io.netty-netty-handler-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[io.netty-netty-codec-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[io.netty-netty-codec-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) ~[io.netty-netty-handler-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868) ~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799) ~[io.netty-netty-transport-classes-epoll-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:501) ~[io.netty-netty-transport-classes-epoll-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:399) ~[io.netty-netty-transport-classes-epoll-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998) ~[io.netty-netty-common-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.119.Final.jar:4.1.119.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.119.Final.jar:4.1.119.Final]
        at java.base/java.lang.Thread.run(Unknown Source) [?:?]
   2025-06-13T11:53:28,458+0000 [pulsar-io-6-1] ERROR org.apache.pulsar.broker.service.persistent.PersistentReplicator - [non-persistent://cvc/cvc-publisher/health-readiness-partition-0 | bdrck-cvc-pulsar-live-us-east4-->bdrck-cvc-pulsar-live-us-central1] Error producing on remote broker
   org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError: Producer send queue is full
        at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:1055) ~[io.streamnative-pulsar-client-original-3.3.5.5.jar:3.3.5.5]
        at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:534) ~[io.streamnative-pulsar-client-original-3.3.5.5.jar:3.3.5.5]
        at org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator.sendMessage(NonPersistentReplicator.java:124) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
        at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.lambda$publishMessage$3(NonPersistentTopic.java:227) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:583) ~[io.streamnative-pulsar-common-3.3.5.5.jar:3.3.5.5]
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:297) ~[io.streamnative-pulsar-common-3.3.5.5.jar:3.3.5.5]
        at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.publishMessage(NonPersistentTopic.java:222) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
        at org.apache.pulsar.broker.service.Producer.publishMessageToTopic(Producer.java:295) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
        at org.apache.pulsar.broker.service.Producer.publishMessage(Producer.java:203) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
        at org.apache.pulsar.broker.service.ServerCnx.handleSend(ServerCnx.java:1942) ~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
        at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:222) ~[io.streamnative-pulsar-common-3.3.5.5.jar:3.3.5.5]
   ```
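
   The `ProducerQueueIsFullError` above is raised from `ProducerImpl.canEnqueueRequest`, i.e. when a send is attempted while the producer's pending queue has no free slot. A minimal sketch of that failure mode follows. It is a simplified model, not Pulsar's code: the class `BoundedSendQueueSketch` and its methods are hypothetical names introduced only for illustration, and a `Semaphore` stands in for the bounded pending queue.

   ```java
   import java.util.concurrent.Semaphore;

   /** Hypothetical model of a producer with a bounded pending-send queue (not Pulsar code). */
   final class BoundedSendQueueSketch {
       // One permit per free slot in the pending queue.
       private final Semaphore permits;

       BoundedSendQueueSketch(int maxPendingMessages) {
           this.permits = new Semaphore(maxPendingMessages);
       }

       /** Non-blocking send: returns false where a real producer would report a full queue. */
       boolean trySend() {
           return permits.tryAcquire();
       }

       /** Models an acknowledgement from the remote side, which frees one slot. */
       void onAck() {
           permits.release();
       }

       /** Sends a burst with no acks in between and counts the rejected messages. */
       static int countRejected(int maxPendingMessages, int burst) {
           BoundedSendQueueSketch producer = new BoundedSendQueueSketch(maxPendingMessages);
           int rejected = 0;
           for (int i = 0; i < burst; i++) {
               if (!producer.trySend()) {
                   rejected++;
               }
           }
           return rejected;
       }

       public static void main(String[] args) {
           // A small queue rejects most of a burst; a large one absorbs it.
           System.out.println("queue=2, burst=10 -> rejected " + countRejected(2, 10));
           System.out.println("queue=1000, burst=10 -> rejected " + countRejected(1000, 10));
       }
   }
   ```

   In this model, a sender that never waits for free slots (as `NonPersistentReplicator` does not apply the limit itself) sees rejections as soon as a burst exceeds the queue size, which is consistent with the errors appearing only for small values of `replicationProducerQueueSize`.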
   
   ### Modifications
   - Make `replicationProducerQueueSize` no longer affect `NonPersistentReplicator`.
   
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   ### Matching PR in forked repository
   
   PR in forked repository: x
   


