poorbarcode opened a new pull request, #24424:
URL: https://github.com/apache/pulsar/pull/24424
### Motivation
- There is a configuration named `replicationProducerQueueSize`, limiting
the rate of replication.
- `PersistentReplicator` has implemented the rate limitation.
- Issue: `NonPersistentReplicator` has not implemented it, but the
configuration affects `NonPersistentReplicator`, leading to the below error if
users set a small value of `replicationProducerQueueSize`
```
2025-06-13T11:53:28,121+0000 [pulsar-io-6-1] ERROR
org.apache.pulsar.broker.service.persistent.PersistentReplicator -
[non-persistent://xxx/xxx/xxx-partition-0 | c1-->c2] Error producing on remote
broker
org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError:
Producer send queue is full
at
org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:1055)
~[io.streamnative-pulsar-client-original-3.3.5.5.jar:3.3.5.5]
at
org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:534)
~[io.streamnative-pulsar-client-original-3.3.5.5.jar:3.3.5.5]
at
org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator.sendMessage(NonPersistentReplicator.java:124)
~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
at
org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.lambda$publishMessage$3(NonPersistentTopic.java:227)
~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
at
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:583)
~[io.streamnative-pulsar-common-3.3.5.5.jar:3.3.5.5]
at
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:297)
~[io.streamnative-pulsar-common-3.3.5.5.jar:3.3.5.5]
at
org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.publishMessage(NonPersistentTopic.java:222)
~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
at
org.apache.pulsar.broker.service.Producer.publishMessageToTopic(Producer.java:295)
~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
at
org.apache.pulsar.broker.service.Producer.publishMessage(Producer.java:203)
~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
at
org.apache.pulsar.broker.service.ServerCnx.handleSend(ServerCnx.java:1942)
~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
at
org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:222)
~[io.streamnative-pulsar-common-3.3.5.5.jar:3.3.5.5]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:202)
~[io.netty-netty-handler-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:164)
~[io.netty-netty-handler-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
~[io.netty-netty-codec-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
~[io.netty-netty-codec-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
~[io.netty-netty-handler-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)
~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)
~[io.netty-netty-transport-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
~[io.netty-netty-transport-classes-epoll-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:501)
~[io.netty-netty-transport-classes-epoll-4.1.119.Final.jar:4.1.119.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:399)
~[io.netty-netty-transport-classes-epoll-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)
~[io.netty-netty-common-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
~[io.netty-netty-common-4.1.119.Final.jar:4.1.119.Final]
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
~[io.netty-netty-common-4.1.119.Final.jar:4.1.119.Final]
at java.base/java.lang.Thread.run(Unknown Source) [?:?]
2025-06-13T11:53:28,458+0000 [pulsar-io-6-1] ERROR
org.apache.pulsar.broker.service.persistent.PersistentReplicator -
[non-persistent://cvc/cvc-publisher/health-readiness-partition-0 |
bdrck-cvc-pulsar-live-us-east4-->bdrck-cvc-pulsar-live-us-central1] Error
producing on remote broker
org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError:
Producer send queue is full
at
org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:1055)
~[io.streamnative-pulsar-client-original-3.3.5.5.jar:3.3.5.5]
at
org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:534)
~[io.streamnative-pulsar-client-original-3.3.5.5.jar:3.3.5.5]
at
org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator.sendMessage(NonPersistentReplicator.java:124)
~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
at
org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.lambda$publishMessage$3(NonPersistentTopic.java:227)
~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
at
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:583)
~[io.streamnative-pulsar-common-3.3.5.5.jar:3.3.5.5]
at
org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:297)
~[io.streamnative-pulsar-common-3.3.5.5.jar:3.3.5.5]
at
org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.publishMessage(NonPersistentTopic.java:222)
~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
at
org.apache.pulsar.broker.service.Producer.publishMessageToTopic(Producer.java:295)
~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
at
org.apache.pulsar.broker.service.Producer.publishMessage(Producer.java:203)
~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
at
org.apache.pulsar.broker.service.ServerCnx.handleSend(ServerCnx.java:1942)
~[io.streamnative-pulsar-broker-3.3.5.5.jar:3.3.5.5]
at
org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:222)
~[io.streamnative-pulsar-common-3.3.5.5.jar:3.3.5.5]
```
### Modifications
- Make `replicationProducerQueueSize` does not affect
`NonPersistentReplicator`.
### Documentation
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update
later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->
### Matching PR in forked repository
PR in forked repository: x
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]