This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a89c6c260f203ea33b2763bb9387524c5aea3c85 Author: Philipp Dolif <[email protected]> AuthorDate: Tue Sep 16 05:30:58 2025 +0200 [fix][client] Fix receiver queue auto-scale without memory limit (#24743) (cherry picked from commit 8e35e34350866e150220872e1ed6a5c7009a43e2) --- .../impl/AutoScaledReceiverQueueSizeTest.java | 34 ++++++++++++++++++++++ .../pulsar/client/impl/MemoryLimitController.java | 3 ++ .../client/impl/MemoryLimitControllerTest.java | 7 +++++ 3 files changed, 44 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java index 114231b98e5..efd9e32b877 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java @@ -33,7 +33,9 @@ import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SizeUnit; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; @@ -319,4 +321,36 @@ public class AutoScaledReceiverQueueSizeTest extends MockedPulsarServiceBaseTest Assert.assertEquals(controller.currentUsage(), 0); Assert.assertEquals(controller.currentUsagePercent(), 0); } + + @Test + public void testWithoutMemoryLimit() throws PulsarClientException { + @Cleanup + PulsarClient clientWithoutMemoryLimit = PulsarClient.builder() + .memoryLimit(0, SizeUnit.BYTES) + .serviceUrl(pulsar.getBrokerServiceUrl()) + .build(); + + String topic = "persistent://public/default/testWithoutMemoryLimit"; + @Cleanup + ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) clientWithoutMemoryLimit.newConsumer() + .topic(topic) + .subscriptionName("my-sub") + .receiverQueueSize(3) + .autoScaledReceiverQueueSizeEnabled(true) + .subscribe(); + Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 1); + + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create(); + byte[] data = "data".getBytes(StandardCharsets.UTF_8); + + producer.send(data); + Awaitility.await().until(consumer.scaleReceiverQueueHint::get); + Assert.assertNotNull(consumer.receive()); + + // this will trigger a receiver queue size expansion + Assert.assertNull(consumer.receive(0, TimeUnit.MILLISECONDS)); + + Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 2); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java index d7acfd69128..4f8a6fdb06a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java @@ -139,6 +139,9 @@ public class MemoryLimitController { } public double currentUsagePercent() { + if (!isMemoryLimited()) { + return 0.0; + } return 1.0 * currentUsage.get() / memoryLimit; } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java index 1aaf3f77da4..027479511dd 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java @@ -49,6 +49,7 @@ public class MemoryLimitControllerTest { @Test public void testLimit() throws Exception { MemoryLimitController mlc = new MemoryLimitController(100); + assertTrue(mlc.isMemoryLimited()); for (int i = 0; i < 101; i++) { mlc.reserveMemory(1); @@ -75,6 +76,12 @@ public class MemoryLimitControllerTest { mlc.releaseMemory(50); assertTrue(mlc.tryReserveMemory(1)); assertEquals(mlc.currentUsagePercent(), 1.01); + + MemoryLimitController mlcNoLimit = new MemoryLimitController(0); + assertFalse(mlcNoLimit.isMemoryLimited()); + assertEquals(mlcNoLimit.currentUsagePercent(), 0.0); + assertTrue(mlcNoLimit.tryReserveMemory(1)); + assertEquals(mlcNoLimit.currentUsagePercent(), 0.0); } @Test
