This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ce5bc1364b1 [improve][test] Introduce shared Pulsar cluster for faster
integration tests (#25311)
ce5bc1364b1 is described below
commit ce5bc1364b108dfe69fadbaf9d838a44b17aa2ae
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Mar 12 21:54:44 2026 -0700
[improve][test] Introduce shared Pulsar cluster for faster integration
tests (#25311)
---
.../pulsar/broker/service/NullValueTest.java | 65 +++----
.../broker/service/SharedPulsarBaseTest.java | 166 +++++++++++++++++
.../pulsar/broker/service/SharedPulsarCluster.java | 205 +++++++++++++++++++++
3 files changed, 396 insertions(+), 40 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
index f79ec3e14a5..059ca86a494 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
@@ -34,8 +34,6 @@ import
org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -44,41 +42,30 @@ import org.testng.annotations.Test;
*/
@Slf4j
@Test(groups = "broker")
-public class NullValueTest extends BrokerTestBase {
+public class NullValueTest extends SharedPulsarBaseTest {
- @BeforeMethod
- @Override
- protected void setup() throws Exception {
- super.baseSetup();
- }
-
- @AfterMethod(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- }
-
- @DataProvider(name = "topics")
- public static Object[][] topics() {
+ @DataProvider(name = "partitions")
+ public static Object[][] partitions() {
return new Object[][]{
- {"persistent://prop/ns-abc/null-value-test-0", 1},
- {"persistent://prop/ns-abc/null-value-test-1", 3},
+ {1},
+ {3},
};
}
- @Test(dataProvider = "topics")
- public void nullValueBytesSchemaTest(String topic, int partitions)
+ @Test(dataProvider = "partitions")
+ public void nullValueBytesSchemaTest(int partitions)
throws PulsarClientException, PulsarAdminException {
+ String topic = newTopicName();
admin.topics().createPartitionedTopic(topic, partitions);
@Cleanup
- Producer producer = pulsarClient.newProducer()
+ Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
@Cleanup
- Consumer consumer = pulsarClient.newConsumer()
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("test")
.subscribe();
@@ -93,7 +80,7 @@ public class NullValueTest extends BrokerTestBase {
}
for (int i = 0; i < numMessage; i++) {
- Message message = consumer.receive();
+ Message<byte[]> message = consumer.receive();
if (i % 2 == 0) {
Assert.assertNotNull(message.getData());
Assert.assertNotNull(message.getValue());
@@ -114,7 +101,7 @@ public class NullValueTest extends BrokerTestBase {
}
for (int i = 0; i < numMessage; i++) {
- CompletableFuture<Message> completableFuture =
consumer.receiveAsync();
+ CompletableFuture<Message<byte[]>> completableFuture =
consumer.receiveAsync();
final int index = i;
completableFuture.whenComplete((message, throwable) -> {
Assert.assertNull(throwable);
@@ -133,12 +120,12 @@ public class NullValueTest extends BrokerTestBase {
}
});
}
-
}
- @Test(dataProvider = "topics")
- public void nullValueBooleanSchemaTest(String topic, int partitions)
+ @Test(dataProvider = "partitions")
+ public void nullValueBooleanSchemaTest(int partitions)
throws PulsarClientException, PulsarAdminException {
+ String topic = newTopicName();
admin.topics().createPartitionedTopic(topic, partitions);
@Cleanup
@@ -163,12 +150,12 @@ public class NullValueTest extends BrokerTestBase {
Assert.assertNull(message.getValue());
Assert.assertNull(message.getData());
}
-
}
- @Test(dataProvider = "topics")
- public void keyValueNullInlineTest(String topic, int partitions)
+ @Test(dataProvider = "partitions")
+ public void keyValueNullInlineTest(int partitions)
throws PulsarClientException, PulsarAdminException {
+ String topic = newTopicName();
admin.topics().createPartitionedTopic(topic, partitions);
@Cleanup
@@ -198,11 +185,11 @@ public class NullValueTest extends BrokerTestBase {
message = consumer.receive();
keyValue = message.getValue();
Assert.assertNull(keyValue.getKey());
- Assert.assertEquals("test", keyValue.getValue());
+ Assert.assertEquals(keyValue.getValue(), "test");
message = consumer.receive();
keyValue = message.getValue();
- Assert.assertEquals("test", keyValue.getKey());
+ Assert.assertEquals(keyValue.getKey(), "test");
Assert.assertNull(keyValue.getValue());
message = consumer.receive();
@@ -210,12 +197,12 @@ public class NullValueTest extends BrokerTestBase {
Assert.assertNull(keyValue.getKey());
Assert.assertNull(keyValue.getValue());
}
-
}
- @Test(dataProvider = "topics")
- public void keyValueNullSeparatedTest(String topic, int partitions)
+ @Test(dataProvider = "partitions")
+ public void keyValueNullSeparatedTest(int partitions)
throws PulsarClientException, PulsarAdminException {
+ String topic = newTopicName();
admin.topics().createPartitionedTopic(topic, partitions);
@Cleanup
@@ -252,11 +239,11 @@ public class NullValueTest extends BrokerTestBase {
message = consumer.receive();
keyValue = message.getValue();
Assert.assertNull(keyValue.getKey());
- Assert.assertEquals("test", keyValue.getValue());
+ Assert.assertEquals(keyValue.getValue(), "test");
message = consumer.receive();
keyValue = message.getValue();
- Assert.assertEquals("test", keyValue.getKey());
+ Assert.assertEquals(keyValue.getKey(), "test");
Assert.assertNull(keyValue.getValue());
message = consumer.receive();
@@ -264,7 +251,5 @@ public class NullValueTest extends BrokerTestBase {
Assert.assertNull(keyValue.getKey());
Assert.assertNull(keyValue.getValue());
}
-
}
-
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarBaseTest.java
new file mode 100644
index 00000000000..dc04e513cca
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarBaseTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.protocol.schema.SchemaStorage;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * Base class for tests that share a single {@link SharedPulsarCluster} singleton across all test classes.
+ *
+ * <p>Each test method gets its own unique namespace (created in {@link #setupSharedTest()} and
+ * force-deleted in {@link #cleanupSharedTest()}), so topics created by one test method do not
+ * collide with those of another, without the overhead of starting a new broker per test.
+ *
+ * <p>Subclasses get access to {@link #admin} and {@link #pulsarClient} (assigned once per class from
+ * the shared cluster) and can use {@link #newTopicName()} to generate unique topic names within the
+ * test namespace. These two instances belong to the shared cluster and are handed to every test
+ * class, so subclasses must not close them; use {@link #newPulsarClient()} when a dedicated client
+ * is needed.
+ */
+@Slf4j
+public abstract class SharedPulsarBaseTest {
+
+    private PulsarService pulsar;
+    protected PulsarAdmin admin;
+    protected PulsarClient pulsarClient;
+
+    private String namespace;
+
+    /**
+     * Returns the unique namespace assigned to the current test method, or {@code null} when no
+     * test method is currently set up.
+     */
+    protected String getNamespace() {
+        return namespace;
+    }
+
+    /**
+     * Returns the broker service URL (pulsar://...) for creating dedicated PulsarClient instances.
+     */
+    protected String getBrokerServiceUrl() {
+        return pulsar.getBrokerServiceUrl();
+    }
+
+    /**
+     * Returns the web service URL (http://...) for HTTP-based lookups and admin operations.
+     */
+    protected String getWebServiceUrl() {
+        return pulsar.getWebServiceAddress();
+    }
+
+    /**
+     * Creates a new PulsarClient connected to the shared cluster. The caller is responsible for closing it.
+     */
+    protected PulsarClient newPulsarClient() throws PulsarClientException {
+        return PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+    }
+
+    /**
+     * Returns the shared broker's {@link ServiceConfiguration} for runtime config inspection or changes.
+     *
+     * <p>The returned object belongs to the broker shared by all test classes, so any change made
+     * through it is visible to every other test using the shared cluster.
+     */
+    protected ServiceConfiguration getConfig() {
+        return pulsar.getConfig();
+    }
+
+    /**
+     * Returns the shared broker's {@link SchemaStorage} instance.
+     */
+    protected SchemaStorage getSchemaStorage() {
+        return pulsar.getSchemaStorage();
+    }
+
+    /**
+     * Returns the topic reference for an already-loaded topic, if present.
+     */
+    protected Optional<Topic> getTopicReference(String topic) {
+        return pulsar.getBrokerService().getTopicReference(topic);
+    }
+
+    /**
+     * Looks up a topic by name, optionally creating it if it doesn't exist.
+     */
+    protected CompletableFuture<Optional<Topic>> getTopic(String topic, boolean createIfMissing) {
+        return pulsar.getBrokerService().getTopic(topic, createIfMissing);
+    }
+
+    /**
+     * Returns the topic if it exists in the broker, loading it from storage if necessary.
+     */
+    protected CompletableFuture<Optional<Topic>> getTopicIfExists(String topic) {
+        return pulsar.getBrokerService().getTopicIfExists(topic);
+    }
+
+    /**
+     * Initializes the shared cluster singleton and sets up {@link #admin} and {@link #pulsarClient}.
+     * Called once per test class.
+     */
+    @BeforeClass(alwaysRun = true)
+    public void setupSharedCluster() throws Exception {
+        SharedPulsarCluster cluster = SharedPulsarCluster.get();
+        pulsar = cluster.getPulsarService();
+        admin = cluster.getAdmin();
+        pulsarClient = cluster.getClient();
+    }
+
+    /**
+     * Creates a unique namespace for the current test method. The namespace is automatically
+     * cleaned up in {@link #cleanupSharedTest()}.
+     */
+    @BeforeMethod(alwaysRun = true)
+    public void setupSharedTest() throws Exception {
+        String ns = SharedPulsarCluster.TENANT_NAME + "/test-" + randomSuffix();
+        // Recorded before the create call, so cleanupSharedTest() still attempts a delete
+        // when createNamespace throws.
+        namespace = ns;
+        admin.namespaces().createNamespace(ns, Set.of(SharedPulsarCluster.CLUSTER_NAME));
+        log.info("Created test namespace: {}", ns);
+    }
+
+    /**
+     * Force-deletes the namespace created by {@link #setupSharedTest()}, including all topics in it.
+     *
+     * <p>Deletion is best-effort: a failure is logged and never propagated to the test result.
+     */
+    @AfterMethod(alwaysRun = true)
+    public void cleanupSharedTest() throws Exception {
+        String ns = namespace;
+        if (ns == null) {
+            return;
+        }
+        namespace = null;
+        try {
+            admin.namespaces().deleteNamespace(ns, true);
+            log.info("Deleted test namespace: {}", ns);
+        } catch (Exception e) {
+            // Pass the exception itself (not just its message) so the type and stack trace are logged.
+            log.warn("Failed to delete namespace {}", ns, e);
+        }
+    }
+
+    /**
+     * Generates a unique persistent topic name within the current test namespace.
+     *
+     * @throws IllegalStateException if no test namespace is active, i.e. the method is called
+     *         outside the window between {@link #setupSharedTest()} and {@link #cleanupSharedTest()}
+     */
+    protected String newTopicName() {
+        String ns = namespace;
+        if (ns == null) {
+            // Fail here instead of handing out "persistent://null/topic-...".
+            throw new IllegalStateException(
+                    "No test namespace is active; newTopicName() must be called from within a test method");
+        }
+        return "persistent://" + ns + "/topic-" + randomSuffix();
+    }
+
+    /**
+     * Returns the first 8 characters of a random UUID, used to make namespace and topic names unique.
+     */
+    private static String randomSuffix() {
+        return UUID.randomUUID().toString().substring(0, 8);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
new file mode 100644
index 00000000000..f12ba8b98b1
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.Optional;
+import java.util.Set;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.metadata.bookkeeper.BKCluster;
+
+/**
+ * JVM-wide singleton that manages a lightweight Pulsar cluster for
integration tests.
+ *
+ * <p>The cluster consists of a single bookie (with minimal memory/thread
configuration and no journal
+ * sync) and a single Pulsar broker, both backed by in-memory metadata stores.
A default cluster
+ * ({@value CLUSTER_NAME}) and tenant ({@value TENANT_NAME}) are created at
startup.
+ *
+ * <p>The singleton is lazily initialized on first call to {@link #get()} and
shut down via a JVM
+ * shutdown hook, which also deletes all temporary bookie data directories.
+ *
+ * @see SharedPulsarBaseTest
+ */
+@Slf4j
+public class SharedPulsarCluster {
+
+ private static final String METADATA_STORE_URL =
"memory:shared-test-cluster";
+ public static final String CLUSTER_NAME = "test-cluster";
+ public static final String TENANT_NAME = "test-tenant";
+
+ private static volatile SharedPulsarCluster instance;
+
+ private BKCluster bkCluster;
+ @Getter
+ private PulsarService pulsarService;
+ @Getter
+ private PulsarAdmin admin;
+ @Getter
+ private PulsarClient client;
+
+ /**
+ * Returns the singleton instance, starting the cluster on first
invocation.
+ * Thread-safe via double-checked locking.
+ */
+ public static SharedPulsarCluster get() throws Exception {
+ if (instance == null) {
+ synchronized (SharedPulsarCluster.class) {
+ if (instance == null) {
+ SharedPulsarCluster cluster = new SharedPulsarCluster();
+ cluster.start();
+ instance = cluster;
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ instance.close();
+ } catch (Exception e) {
+ log.error("Failed to close SharedPulsarCluster",
e);
+ }
+ }));
+ }
+ }
+ }
+ return instance;
+ }
+
+ private void start() throws Exception {
+ log.info("Starting SharedPulsarCluster");
+
+ // Start a single bookie with minimal configuration
+ ServerConfiguration bkConf = new ServerConfiguration();
+ bkConf.setProperty("dbStorage_writeCacheMaxSizeMb", 32);
+ bkConf.setProperty("dbStorage_readAheadCacheMaxSizeMb", 4);
+ bkConf.setProperty("dbStorage_rocksDB_writeBufferSizeMB", 4);
+ bkConf.setProperty("dbStorage_rocksDB_blockCacheSize", 4 * 1024 *
1024);
+ bkConf.setJournalSyncData(false);
+ bkConf.setJournalWriteData(false);
+ bkConf.setProperty("journalMaxGroupWaitMSec", 0L);
+ bkConf.setProperty("journalPreAllocSizeMB", 1);
+ bkConf.setFlushInterval(60000);
+ bkConf.setGcWaitTime(60000);
+ bkConf.setAllowLoopback(true);
+ bkConf.setAdvertisedAddress("127.0.0.1");
+ bkConf.setAllowEphemeralPorts(true);
+ bkConf.setNumAddWorkerThreads(0);
+ bkConf.setNumReadWorkerThreads(0);
+ bkConf.setNumHighPriorityWorkerThreads(0);
+ bkConf.setNumJournalCallbackThreads(0);
+ bkConf.setServerNumIOThreads(1);
+ bkConf.setNumLongPollWorkerThreads(1);
+ bkConf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
+
bkConf.setLedgerStorageClass("org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage");
+
+ bkCluster = BKCluster.builder()
+ .baseServerConfiguration(bkConf)
+ .metadataServiceUri(METADATA_STORE_URL)
+ .numBookies(1)
+ .clearOldData(true)
+ .build();
+
+ // Configure and start the Pulsar broker
+ ServiceConfiguration config = new ServiceConfiguration();
+ config.setMetadataStoreUrl(METADATA_STORE_URL);
+ config.setConfigurationMetadataStoreUrl(METADATA_STORE_URL);
+ config.setClusterName(CLUSTER_NAME);
+ config.setAdvertisedAddress("localhost");
+ config.setBrokerServicePort(Optional.of(0));
+ config.setWebServicePort(Optional.of(0));
+ config.setManagedLedgerDefaultEnsembleSize(1);
+ config.setManagedLedgerDefaultWriteQuorum(1);
+ config.setManagedLedgerDefaultAckQuorum(1);
+ config.setDefaultNumberOfNamespaceBundles(1);
+ config.setBrokerShutdownTimeoutMs(0L);
+ config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+ config.setNumExecutorThreadPoolSize(5);
+ config.setManagedLedgerCacheSizeMB(8);
+ config.setActiveConsumerFailoverDelayTimeMillis(0);
+ config.setAllowAutoTopicCreationType(
+
org.apache.pulsar.common.policies.data.TopicType.NON_PARTITIONED);
+ config.setBookkeeperNumberOfChannelsPerBookie(1);
+ config.setBookkeeperClientExposeStatsToPrometheus(false);
+ config.setDispatcherRetryBackoffInitialTimeInMs(0);
+ config.setDispatcherRetryBackoffMaxTimeInMs(0);
+ config.setForceDeleteNamespaceAllowed(true);
+ config.setForceDeleteTenantAllowed(true);
+ config.setBrokerDeleteInactiveTopicsEnabled(false);
+
+ // Reduce thread pool sizes for faster startup (fewer threads to
create)
+ config.setNumIOThreads(2);
+ config.setNumOrderedExecutorThreads(1);
+ config.setNumHttpServerThreads(4);
+ config.setBookkeeperClientNumWorkerThreads(1);
+ config.setBookkeeperClientNumIoThreads(2);
+ config.setNumCacheExecutorThreadPoolSize(1);
+ config.setManagedLedgerNumSchedulerThreads(1);
+ config.setTopicOrderedExecutorThreadNum(2);
+
+ // Disable the load balancer — single-broker cluster doesn't need it
+ config.setLoadBalancerEnabled(false);
+
+ pulsarService = new PulsarService(config);
+ pulsarService.start();
+
+ // Create admin and client
+ admin = PulsarAdmin.builder()
+ .serviceHttpUrl(pulsarService.getWebServiceAddress())
+ .build();
+
+ client = PulsarClient.builder()
+ .serviceUrl(pulsarService.getBrokerServiceUrl())
+ .build();
+
+ // Set up default cluster and tenant
+ admin.clusters().createCluster(CLUSTER_NAME,
+ ClusterData.builder()
+ .serviceUrl(pulsarService.getWebServiceAddress())
+ .brokerServiceUrl(pulsarService.getBrokerServiceUrl())
+ .build());
+
+ admin.tenants().createTenant(TENANT_NAME,
+ TenantInfo.builder()
+ .allowedClusters(Set.of(CLUSTER_NAME))
+ .build());
+
+ log.info("SharedPulsarCluster started. broker={} web={}",
+ pulsarService.getBrokerServiceUrl(),
pulsarService.getWebServiceAddress());
+ }
+
+ private void close() throws Exception {
+ log.info("Closing SharedPulsarCluster");
+ if (client != null) {
+ client.close();
+ }
+ if (admin != null) {
+ admin.close();
+ }
+ if (pulsarService != null) {
+ pulsarService.close();
+ }
+ if (bkCluster != null) {
+ bkCluster.close();
+ }
+ }
+}