This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a89c6c260f203ea33b2763bb9387524c5aea3c85
Author: Philipp Dolif <[email protected]>
AuthorDate: Tue Sep 16 05:30:58 2025 +0200

    [fix][client] Fix receiver queue auto-scale without memory limit (#24743)
    
    (cherry picked from commit 8e35e34350866e150220872e1ed6a5c7009a43e2)
---
 .../impl/AutoScaledReceiverQueueSizeTest.java      | 34 ++++++++++++++++++++++
 .../pulsar/client/impl/MemoryLimitController.java  |  3 ++
 .../client/impl/MemoryLimitControllerTest.java     |  7 +++++
 3 files changed, 44 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java
index 114231b98e5..efd9e32b877 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java
@@ -33,7 +33,9 @@ import org.apache.pulsar.client.api.BatchReceivePolicy;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
@@ -319,4 +321,36 @@ public class AutoScaledReceiverQueueSizeTest extends 
MockedPulsarServiceBaseTest
         Assert.assertEquals(controller.currentUsage(), 0);
         Assert.assertEquals(controller.currentUsagePercent(), 0);
     }
+
+    @Test
+    public void testWithoutMemoryLimit() throws PulsarClientException {
+        @Cleanup
+        PulsarClient clientWithoutMemoryLimit = PulsarClient.builder()
+                .memoryLimit(0, SizeUnit.BYTES)
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .build();
+
+        String topic = "persistent://public/default/testWithoutMemoryLimit";
+        @Cleanup
+        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) 
clientWithoutMemoryLimit.newConsumer()
+                .topic(topic)
+                .subscriptionName("my-sub")
+                .receiverQueueSize(3)
+                .autoScaledReceiverQueueSizeEnabled(true)
+                .subscribe();
+        Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 1);
+
+        @Cleanup
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).enableBatching(false).create();
+        byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+        producer.send(data);
+        Awaitility.await().until(consumer.scaleReceiverQueueHint::get);
+        Assert.assertNotNull(consumer.receive());
+
+        // this will trigger a receiver queue size expansion
+        Assert.assertNull(consumer.receive(0, TimeUnit.MILLISECONDS));
+
+        Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 2);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
index d7acfd69128..4f8a6fdb06a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
@@ -139,6 +139,9 @@ public class MemoryLimitController {
     }
 
     public double currentUsagePercent() {
+        if (!isMemoryLimited()) {
+            return 0.0;
+        }
         return 1.0 * currentUsage.get() / memoryLimit;
     }
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java
index 1aaf3f77da4..027479511dd 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java
@@ -49,6 +49,7 @@ public class MemoryLimitControllerTest {
     @Test
     public void testLimit() throws Exception {
         MemoryLimitController mlc = new MemoryLimitController(100);
+        assertTrue(mlc.isMemoryLimited());
 
         for (int i = 0; i < 101; i++) {
             mlc.reserveMemory(1);
@@ -75,6 +76,12 @@ public class MemoryLimitControllerTest {
         mlc.releaseMemory(50);
         assertTrue(mlc.tryReserveMemory(1));
         assertEquals(mlc.currentUsagePercent(), 1.01);
+
+        MemoryLimitController mlcNoLimit = new MemoryLimitController(0);
+        assertFalse(mlcNoLimit.isMemoryLimited());
+        assertEquals(mlcNoLimit.currentUsagePercent(), 0.0);
+        assertTrue(mlcNoLimit.tryReserveMemory(1));
+        assertEquals(mlcNoLimit.currentUsagePercent(), 0.0);
     }
 
     @Test

Reply via email to