This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new c1e9bb22c4c [fix][client] Fix receiver queue auto-scale without memory
limit (#24743)
c1e9bb22c4c is described below
commit c1e9bb22c4c251045da67147f97b2799d7c1a94e
Author: Philipp Dolif <[email protected]>
AuthorDate: Tue Sep 16 05:30:58 2025 +0200
[fix][client] Fix receiver queue auto-scale without memory limit (#24743)
(cherry picked from commit 8e35e34350866e150220872e1ed6a5c7009a43e2)
---
.../impl/AutoScaledReceiverQueueSizeTest.java | 34 ++++++++++++++++++++++
.../pulsar/client/impl/MemoryLimitController.java | 3 ++
.../client/impl/MemoryLimitControllerTest.java | 7 +++++
3 files changed, 44 insertions(+)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java
index 2fefc7f13f9..ac3d5488995 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/AutoScaledReceiverQueueSizeTest.java
@@ -33,7 +33,9 @@ import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
@@ -319,4 +321,36 @@ public class AutoScaledReceiverQueueSizeTest extends
MockedPulsarServiceBaseTest
Assert.assertEquals(controller.currentUsage(), 0);
Assert.assertEquals(controller.currentUsagePercent(), 0);
}
+
+ @Test
+ public void testWithoutMemoryLimit() throws PulsarClientException {
+ @Cleanup
+ PulsarClient clientWithoutMemoryLimit = PulsarClient.builder()
+ .memoryLimit(0, SizeUnit.BYTES)
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .build();
+
+ String topic = "persistent://public/default/testWithoutMemoryLimit";
+ @Cleanup
+ ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>)
clientWithoutMemoryLimit.newConsumer()
+ .topic(topic)
+ .subscriptionName("my-sub")
+ .receiverQueueSize(3)
+ .autoScaledReceiverQueueSizeEnabled(true)
+ .subscribe();
+ Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 1);
+
+ @Cleanup
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).enableBatching(false).create();
+ byte[] data = "data".getBytes(StandardCharsets.UTF_8);
+
+ producer.send(data);
+ Awaitility.await().until(consumer.scaleReceiverQueueHint::get);
+ Assert.assertNotNull(consumer.receive());
+
+ // this will trigger a receiver queue size expansion
+ Assert.assertNull(consumer.receive(0, TimeUnit.MILLISECONDS));
+
+ Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), 2);
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
index c15821c0543..19f56cf98cb 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MemoryLimitController.java
@@ -139,6 +139,9 @@ public class MemoryLimitController {
}
public double currentUsagePercent() {
+ if (!isMemoryLimited()) {
+ return 0.0;
+ }
return 1.0 * currentUsage.get() / memoryLimit;
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java
index 1aaf3f77da4..027479511dd 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MemoryLimitControllerTest.java
@@ -49,6 +49,7 @@ public class MemoryLimitControllerTest {
@Test
public void testLimit() throws Exception {
MemoryLimitController mlc = new MemoryLimitController(100);
+ assertTrue(mlc.isMemoryLimited());
for (int i = 0; i < 101; i++) {
mlc.reserveMemory(1);
@@ -75,6 +76,12 @@ public class MemoryLimitControllerTest {
mlc.releaseMemory(50);
assertTrue(mlc.tryReserveMemory(1));
assertEquals(mlc.currentUsagePercent(), 1.01);
+
+ MemoryLimitController mlcNoLimit = new MemoryLimitController(0);
+ assertFalse(mlcNoLimit.isMemoryLimited());
+ assertEquals(mlcNoLimit.currentUsagePercent(), 0.0);
+ assertTrue(mlcNoLimit.tryReserveMemory(1));
+ assertEquals(mlcNoLimit.currentUsagePercent(), 0.0);
}
@Test