This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ce5bc1364b1 [improve][test] Introduce shared Pulsar cluster for faster 
integration tests (#25311)
ce5bc1364b1 is described below

commit ce5bc1364b108dfe69fadbaf9d838a44b17aa2ae
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Mar 12 21:54:44 2026 -0700

    [improve][test] Introduce shared Pulsar cluster for faster integration 
tests (#25311)
---
 .../pulsar/broker/service/NullValueTest.java       |  65 +++----
 .../broker/service/SharedPulsarBaseTest.java       | 166 +++++++++++++++++
 .../pulsar/broker/service/SharedPulsarCluster.java | 205 +++++++++++++++++++++
 3 files changed, 396 insertions(+), 40 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
index f79ec3e14a5..059ca86a494 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
@@ -34,8 +34,6 @@ import 
org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -44,41 +42,30 @@ import org.testng.annotations.Test;
  */
 @Slf4j
 @Test(groups = "broker")
-public class NullValueTest extends BrokerTestBase {
+public class NullValueTest extends SharedPulsarBaseTest {
 
-    @BeforeMethod
-    @Override
-    protected void setup() throws Exception {
-        super.baseSetup();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
-
-    @DataProvider(name = "topics")
-    public static Object[][] topics() {
+    @DataProvider(name = "partitions")
+    public static Object[][] partitions() {
         return new Object[][]{
-                {"persistent://prop/ns-abc/null-value-test-0", 1},
-                {"persistent://prop/ns-abc/null-value-test-1", 3},
+                {1},
+                {3},
         };
     }
 
-    @Test(dataProvider = "topics")
-    public void nullValueBytesSchemaTest(String topic, int partitions)
+    @Test(dataProvider = "partitions")
+    public void nullValueBytesSchemaTest(int partitions)
             throws PulsarClientException, PulsarAdminException {
+        String topic = newTopicName();
         admin.topics().createPartitionedTopic(topic, partitions);
 
         @Cleanup
-        Producer producer = pulsarClient.newProducer()
+        Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic(topic)
                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .create();
 
         @Cleanup
-        Consumer consumer = pulsarClient.newConsumer()
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .topic(topic)
                 .subscriptionName("test")
                 .subscribe();
@@ -93,7 +80,7 @@ public class NullValueTest extends BrokerTestBase {
         }
 
         for (int i = 0; i < numMessage; i++) {
-            Message message = consumer.receive();
+            Message<byte[]> message = consumer.receive();
             if (i % 2 == 0) {
                 Assert.assertNotNull(message.getData());
                 Assert.assertNotNull(message.getValue());
@@ -114,7 +101,7 @@ public class NullValueTest extends BrokerTestBase {
         }
 
         for (int i = 0; i < numMessage; i++) {
-            CompletableFuture<Message> completableFuture = 
consumer.receiveAsync();
+            CompletableFuture<Message<byte[]>> completableFuture = 
consumer.receiveAsync();
             final int index = i;
             completableFuture.whenComplete((message, throwable) -> {
                 Assert.assertNull(throwable);
@@ -133,12 +120,12 @@ public class NullValueTest extends BrokerTestBase {
                 }
             });
         }
-
     }
 
-    @Test(dataProvider = "topics")
-    public void nullValueBooleanSchemaTest(String topic, int partitions)
+    @Test(dataProvider = "partitions")
+    public void nullValueBooleanSchemaTest(int partitions)
             throws PulsarClientException, PulsarAdminException {
+        String topic = newTopicName();
         admin.topics().createPartitionedTopic(topic, partitions);
 
         @Cleanup
@@ -163,12 +150,12 @@ public class NullValueTest extends BrokerTestBase {
             Assert.assertNull(message.getValue());
             Assert.assertNull(message.getData());
         }
-
     }
 
-    @Test(dataProvider = "topics")
-    public void keyValueNullInlineTest(String topic, int partitions)
+    @Test(dataProvider = "partitions")
+    public void keyValueNullInlineTest(int partitions)
             throws PulsarClientException, PulsarAdminException {
+        String topic = newTopicName();
         admin.topics().createPartitionedTopic(topic, partitions);
 
         @Cleanup
@@ -198,11 +185,11 @@ public class NullValueTest extends BrokerTestBase {
             message = consumer.receive();
             keyValue = message.getValue();
             Assert.assertNull(keyValue.getKey());
-            Assert.assertEquals("test", keyValue.getValue());
+            Assert.assertEquals(keyValue.getValue(), "test");
 
             message = consumer.receive();
             keyValue = message.getValue();
-            Assert.assertEquals("test", keyValue.getKey());
+            Assert.assertEquals(keyValue.getKey(), "test");
             Assert.assertNull(keyValue.getValue());
 
             message = consumer.receive();
@@ -210,12 +197,12 @@ public class NullValueTest extends BrokerTestBase {
             Assert.assertNull(keyValue.getKey());
             Assert.assertNull(keyValue.getValue());
         }
-
     }
 
-    @Test(dataProvider = "topics")
-    public void keyValueNullSeparatedTest(String topic, int partitions)
+    @Test(dataProvider = "partitions")
+    public void keyValueNullSeparatedTest(int partitions)
             throws PulsarClientException, PulsarAdminException {
+        String topic = newTopicName();
         admin.topics().createPartitionedTopic(topic, partitions);
 
         @Cleanup
@@ -252,11 +239,11 @@ public class NullValueTest extends BrokerTestBase {
             message = consumer.receive();
             keyValue = message.getValue();
             Assert.assertNull(keyValue.getKey());
-            Assert.assertEquals("test", keyValue.getValue());
+            Assert.assertEquals(keyValue.getValue(), "test");
 
             message = consumer.receive();
             keyValue = message.getValue();
-            Assert.assertEquals("test", keyValue.getKey());
+            Assert.assertEquals(keyValue.getKey(), "test");
             Assert.assertNull(keyValue.getValue());
 
             message = consumer.receive();
@@ -264,7 +251,5 @@ public class NullValueTest extends BrokerTestBase {
             Assert.assertNull(keyValue.getKey());
             Assert.assertNull(keyValue.getValue());
         }
-
     }
-
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarBaseTest.java
new file mode 100644
index 00000000000..dc04e513cca
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarBaseTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.protocol.schema.SchemaStorage;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * Base class for tests that share a single {@link SharedPulsarCluster} 
singleton across all test classes.
+ *
+ * <p>Each test method gets its own unique namespace (created in {@link 
#setupSharedTest()} and
+ * force-deleted in {@link #cleanupSharedTest()}), providing full isolation 
without the overhead
+ * of starting a new broker per test.
+ *
+ * <p>Subclasses get access to {@link #admin} and {@link #pulsarClient} 
(initialized once per class)
+ * and can use {@link #newTopicName()} to generate unique topic names within 
the test namespace.
+ */
+@Slf4j
+public abstract class SharedPulsarBaseTest {
+
+    private PulsarService pulsar;
+    protected PulsarAdmin admin;
+    protected PulsarClient pulsarClient;
+
+    private String namespace;
+
+    /**
+     * Returns the unique namespace assigned to the current test method.
+     */
+    protected String getNamespace() {
+        return namespace;
+    }
+
+    /**
+     * Returns the broker service URL (pulsar://...) for creating dedicated 
PulsarClient instances.
+     */
+    protected String getBrokerServiceUrl() {
+        return pulsar.getBrokerServiceUrl();
+    }
+
+    /**
+     * Returns the web service URL (http://...) for HTTP-based lookups and 
admin operations.
+     */
+    protected String getWebServiceUrl() {
+        return pulsar.getWebServiceAddress();
+    }
+
+    /**
+     * Creates a new PulsarClient connected to the shared cluster. The caller 
is responsible for closing it.
+     */
+    protected PulsarClient newPulsarClient() throws PulsarClientException {
+        return 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+    }
+
+    /**
+     * Returns the shared broker's {@link ServiceConfiguration} for runtime 
config inspection or changes.
+     */
+    protected ServiceConfiguration getConfig() {
+        return pulsar.getConfig();
+    }
+
+    /**
+     * Returns the shared broker's {@link SchemaStorage} instance.
+     */
+    protected SchemaStorage getSchemaStorage() {
+        return pulsar.getSchemaStorage();
+    }
+
+    /**
+     * Returns the topic reference for an already-loaded topic, if present.
+     */
+    protected Optional<Topic> getTopicReference(String topic) {
+        return pulsar.getBrokerService().getTopicReference(topic);
+    }
+
+    /**
+     * Looks up a topic by name, optionally creating it if it doesn't exist.
+     */
+    protected CompletableFuture<Optional<Topic>> getTopic(String topic, 
boolean createIfMissing) {
+        return pulsar.getBrokerService().getTopic(topic, createIfMissing);
+    }
+
+    /**
+     * Returns the topic if it exists in the broker, loading it from storage 
if necessary.
+     */
+    protected CompletableFuture<Optional<Topic>> getTopicIfExists(String 
topic) {
+        return pulsar.getBrokerService().getTopicIfExists(topic);
+    }
+
+    /**
+     * Initializes the shared cluster singleton and sets up {@link #admin} and 
{@link #pulsarClient}.
+     * Called once per test class.
+     */
+    @BeforeClass(alwaysRun = true)
+    public void setupSharedCluster() throws Exception {
+        SharedPulsarCluster cluster = SharedPulsarCluster.get();
+        pulsar = cluster.getPulsarService();
+        admin = cluster.getAdmin();
+        pulsarClient = cluster.getClient();
+    }
+
+    /**
+     * Creates a unique namespace for the current test method. The namespace 
is automatically
+     * cleaned up in {@link #cleanupSharedTest()}.
+     */
+    @BeforeMethod(alwaysRun = true)
+    public void setupSharedTest() throws Exception {
+        String nsName = "test-" + UUID.randomUUID().toString().substring(0, 8);
+        String ns = SharedPulsarCluster.TENANT_NAME + "/" + nsName;
+        namespace = ns;
+        admin.namespaces().createNamespace(ns, 
Set.of(SharedPulsarCluster.CLUSTER_NAME));
+        log.info("Created test namespace: {}", ns);
+    }
+
+    /**
+     * Force-deletes the namespace created by {@link #setupSharedTest()}, 
including all topics in it.
+     */
+    @AfterMethod(alwaysRun = true)
+    public void cleanupSharedTest() throws Exception {
+        String ns = namespace;
+        if (ns != null) {
+            namespace = null;
+            try {
+                admin.namespaces().deleteNamespace(ns, true);
+                log.info("Deleted test namespace: {}", ns);
+            } catch (Exception e) {
+                log.warn("Failed to delete namespace {}: {}", ns, 
e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Generates a unique persistent topic name within the current test 
namespace.
+     */
+    protected String newTopicName() {
+        return "persistent://" + namespace + "/topic-" + 
UUID.randomUUID().toString().substring(0, 8);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
new file mode 100644
index 00000000000..f12ba8b98b1
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.Optional;
+import java.util.Set;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.allocator.PoolingPolicy;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.metadata.bookkeeper.BKCluster;
+
+/**
+ * JVM-wide singleton that manages a lightweight Pulsar cluster for 
integration tests.
+ *
+ * <p>The cluster consists of a single bookie (with minimal memory/thread 
configuration and no journal
+ * sync) and a single Pulsar broker, both backed by in-memory metadata stores. 
A default cluster
+ * ({@value CLUSTER_NAME}) and tenant ({@value TENANT_NAME}) are created at 
startup.
+ *
+ * <p>The singleton is lazily initialized on first call to {@link #get()} and 
shut down via a JVM
+ * shutdown hook, which also deletes all temporary bookie data directories.
+ *
+ * @see SharedPulsarBaseTest
+ */
+@Slf4j
+public class SharedPulsarCluster {
+
+    private static final String METADATA_STORE_URL = 
"memory:shared-test-cluster";
+    public static final String CLUSTER_NAME = "test-cluster";
+    public static final String TENANT_NAME = "test-tenant";
+
+    private static volatile SharedPulsarCluster instance;
+
+    private BKCluster bkCluster;
+    @Getter
+    private PulsarService pulsarService;
+    @Getter
+    private PulsarAdmin admin;
+    @Getter
+    private PulsarClient client;
+
+    /**
+     * Returns the singleton instance, starting the cluster on first 
invocation.
+     * Thread-safe via double-checked locking.
+     */
+    public static SharedPulsarCluster get() throws Exception {
+        if (instance == null) {
+            synchronized (SharedPulsarCluster.class) {
+                if (instance == null) {
+                    SharedPulsarCluster cluster = new SharedPulsarCluster();
+                    cluster.start();
+                    instance = cluster;
+                    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+                        try {
+                            instance.close();
+                        } catch (Exception e) {
+                            log.error("Failed to close SharedPulsarCluster", 
e);
+                        }
+                    }));
+                }
+            }
+        }
+        return instance;
+    }
+
+    private void start() throws Exception {
+        log.info("Starting SharedPulsarCluster");
+
+        // Start a single bookie with minimal configuration
+        ServerConfiguration bkConf = new ServerConfiguration();
+        bkConf.setProperty("dbStorage_writeCacheMaxSizeMb", 32);
+        bkConf.setProperty("dbStorage_readAheadCacheMaxSizeMb", 4);
+        bkConf.setProperty("dbStorage_rocksDB_writeBufferSizeMB", 4);
+        bkConf.setProperty("dbStorage_rocksDB_blockCacheSize", 4 * 1024 * 
1024);
+        bkConf.setJournalSyncData(false);
+        bkConf.setJournalWriteData(false);
+        bkConf.setProperty("journalMaxGroupWaitMSec", 0L);
+        bkConf.setProperty("journalPreAllocSizeMB", 1);
+        bkConf.setFlushInterval(60000);
+        bkConf.setGcWaitTime(60000);
+        bkConf.setAllowLoopback(true);
+        bkConf.setAdvertisedAddress("127.0.0.1");
+        bkConf.setAllowEphemeralPorts(true);
+        bkConf.setNumAddWorkerThreads(0);
+        bkConf.setNumReadWorkerThreads(0);
+        bkConf.setNumHighPriorityWorkerThreads(0);
+        bkConf.setNumJournalCallbackThreads(0);
+        bkConf.setServerNumIOThreads(1);
+        bkConf.setNumLongPollWorkerThreads(1);
+        bkConf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
+        
bkConf.setLedgerStorageClass("org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage");
+
+        bkCluster = BKCluster.builder()
+                .baseServerConfiguration(bkConf)
+                .metadataServiceUri(METADATA_STORE_URL)
+                .numBookies(1)
+                .clearOldData(true)
+                .build();
+
+        // Configure and start the Pulsar broker
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setMetadataStoreUrl(METADATA_STORE_URL);
+        config.setConfigurationMetadataStoreUrl(METADATA_STORE_URL);
+        config.setClusterName(CLUSTER_NAME);
+        config.setAdvertisedAddress("localhost");
+        config.setBrokerServicePort(Optional.of(0));
+        config.setWebServicePort(Optional.of(0));
+        config.setManagedLedgerDefaultEnsembleSize(1);
+        config.setManagedLedgerDefaultWriteQuorum(1);
+        config.setManagedLedgerDefaultAckQuorum(1);
+        config.setDefaultNumberOfNamespaceBundles(1);
+        config.setBrokerShutdownTimeoutMs(0L);
+        config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+        config.setNumExecutorThreadPoolSize(5);
+        config.setManagedLedgerCacheSizeMB(8);
+        config.setActiveConsumerFailoverDelayTimeMillis(0);
+        config.setAllowAutoTopicCreationType(
+                
org.apache.pulsar.common.policies.data.TopicType.NON_PARTITIONED);
+        config.setBookkeeperNumberOfChannelsPerBookie(1);
+        config.setBookkeeperClientExposeStatsToPrometheus(false);
+        config.setDispatcherRetryBackoffInitialTimeInMs(0);
+        config.setDispatcherRetryBackoffMaxTimeInMs(0);
+        config.setForceDeleteNamespaceAllowed(true);
+        config.setForceDeleteTenantAllowed(true);
+        config.setBrokerDeleteInactiveTopicsEnabled(false);
+
+        // Reduce thread pool sizes for faster startup (fewer threads to 
create)
+        config.setNumIOThreads(2);
+        config.setNumOrderedExecutorThreads(1);
+        config.setNumHttpServerThreads(4);
+        config.setBookkeeperClientNumWorkerThreads(1);
+        config.setBookkeeperClientNumIoThreads(2);
+        config.setNumCacheExecutorThreadPoolSize(1);
+        config.setManagedLedgerNumSchedulerThreads(1);
+        config.setTopicOrderedExecutorThreadNum(2);
+
+        // Disable the load balancer — single-broker cluster doesn't need it
+        config.setLoadBalancerEnabled(false);
+
+        pulsarService = new PulsarService(config);
+        pulsarService.start();
+
+        // Create admin and client
+        admin = PulsarAdmin.builder()
+                .serviceHttpUrl(pulsarService.getWebServiceAddress())
+                .build();
+
+        client = PulsarClient.builder()
+                .serviceUrl(pulsarService.getBrokerServiceUrl())
+                .build();
+
+        // Set up default cluster and tenant
+        admin.clusters().createCluster(CLUSTER_NAME,
+                ClusterData.builder()
+                        .serviceUrl(pulsarService.getWebServiceAddress())
+                        .brokerServiceUrl(pulsarService.getBrokerServiceUrl())
+                        .build());
+
+        admin.tenants().createTenant(TENANT_NAME,
+                TenantInfo.builder()
+                        .allowedClusters(Set.of(CLUSTER_NAME))
+                        .build());
+
+        log.info("SharedPulsarCluster started. broker={} web={}",
+                pulsarService.getBrokerServiceUrl(), 
pulsarService.getWebServiceAddress());
+    }
+
+    private void close() throws Exception {
+        log.info("Closing SharedPulsarCluster");
+        if (client != null) {
+            client.close();
+        }
+        if (admin != null) {
+            admin.close();
+        }
+        if (pulsarService != null) {
+            pulsarService.close();
+        }
+        if (bkCluster != null) {
+            bkCluster.close();
+        }
+    }
+}

Reply via email to