This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 77b34cc5b01 [improve][test] Improve integration test profiling test 
example (#24701)
77b34cc5b01 is described below

commit 77b34cc5b0176ea81e0d358d3b0f79a6e930be82
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Sep 5 13:59:40 2025 +0300

    [improve][test] Improve integration test profiling test example (#24701)
---
 tests/docker-images/java-test-image/Dockerfile     |   2 +-
 .../docker-images/latest-version-image/Dockerfile  |   2 +-
 .../latest-version-image/conf/supervisord.conf     |   1 +
 .../latest-version-image/scripts/func-lib.sh       |  19 ++-
 .../latest-version-image/scripts/run-standalone.sh |   5 +-
 .../integration/containers/ChaosContainer.java     |   1 -
 .../integration/profiling/PulsarProfilingTest.java | 155 +++++++++++++++++----
 7 files changed, 149 insertions(+), 36 deletions(-)

diff --git a/tests/docker-images/java-test-image/Dockerfile 
b/tests/docker-images/java-test-image/Dockerfile
index 167d7324fb8..f6e0d7a472f 100644
--- a/tests/docker-images/java-test-image/Dockerfile
+++ b/tests/docker-images/java-test-image/Dockerfile
@@ -26,7 +26,7 @@ USER root
 COPY target/scripts /pulsar/bin
 RUN chmod a+rx /pulsar/bin/*
 
-RUN apk add --no-cache supervisor
+RUN apk add --no-cache supervisor jq
 
 RUN mkdir -p /var/log/pulsar \
     && mkdir -p /var/run/supervisor/ \
diff --git a/tests/docker-images/latest-version-image/Dockerfile 
b/tests/docker-images/latest-version-image/Dockerfile
index e4bfa77355c..6fe2ef656a4 100644
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/Dockerfile
@@ -62,7 +62,7 @@ FROM $PULSAR_ALL_IMAGE
 # However, any processes exec'ing into the containers will run as root, by 
default.
 USER root
 
-RUN apk add --no-cache supervisor procps curl
+RUN apk add --no-cache supervisor procps curl jq
 
 RUN mkdir -p /var/log/pulsar && mkdir -p /var/run/supervisor/
 
diff --git a/tests/docker-images/latest-version-image/conf/supervisord.conf 
b/tests/docker-images/latest-version-image/conf/supervisord.conf
index ee48be32658..3a4da862fdb 100644
--- a/tests/docker-images/latest-version-image/conf/supervisord.conf
+++ b/tests/docker-images/latest-version-image/conf/supervisord.conf
@@ -26,6 +26,7 @@ loglevel=info
 pidfile=/var/run/supervisord.pid
 minfds=1024
 minprocs=200
+user=root
 
 [unix_http_server]
 file=/var/run/supervisor/supervisor.sock
diff --git a/tests/docker-images/latest-version-image/scripts/func-lib.sh 
b/tests/docker-images/latest-version-image/scripts/func-lib.sh
index b8c89bbde6c..90155479fea 100644
--- a/tests/docker-images/latest-version-image/scripts/func-lib.sh
+++ b/tests/docker-images/latest-version-image/scripts/func-lib.sh
@@ -21,21 +21,30 @@
 set -e
 set -o pipefail
 
+function set_pulsar_mem() {
+  local maxMem=$1
+  local additionalMemParam=$2
+  local pulsar_test_mem
+  # set into pulsar_test_mem while trimming whitespace
+  read -r pulsar_test_mem <<< "-Xmx${maxMem} ${additionalMemParam}"
+  # prefer PULSAR_MEM, but always append params to perform a heap dump on OOME
+  export PULSAR_MEM="${PULSAR_MEM:-"${pulsar_test_mem}"} 
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/pulsar 
-XX:+ExitOnOutOfMemoryError"
+}
+
 function run_pulsar_component() {
   local component=$1
   local supervisord_component=$2
   local maxMem=$3
   local additionalMemParam=$4
-  export PULSAR_MEM="${PULSAR_MEM:-"-Xmx${maxMem} ${additionalMemParam} 
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/pulsar 
-XX:+ExitOnOutOfMemoryError"}"
-  export PULSAR_GC="${PULSAR_GC:-"-XX:+UseZGC"}"
+
+  set_pulsar_mem "$maxMem" "$additionalMemParam"
 
   if [[ -f "conf/${component}.conf" ]]; then
     bin/apply-config-from-env.py conf/${component}.conf
   fi
-  bin/apply-config-from-env.py conf/pulsar_env.sh
+  bin/apply-config-from-env.py conf/client.conf
 
   if [[ "$component" == "functions_worker" ]]; then
-    bin/apply-config-from-env.py conf/client.conf
     bin/gen-yml-from-env.py conf/functions_worker.yml
   fi
 
@@ -46,7 +55,7 @@ function run_pulsar_component() {
   fi
 
   if [ -z "$NO_AUTOSTART" ]; then
-      sed -i 's/autostart=.*/autostart=true/' 
/etc/supervisord/conf.d/${supervisord_component}.conf
+    sed -i 's/autostart=.*/autostart=true/' 
/etc/supervisord/conf.d/${supervisord_component}.conf
   fi
 
   exec /usr/bin/supervisord -c /etc/supervisord.conf
diff --git a/tests/docker-images/latest-version-image/scripts/run-standalone.sh 
b/tests/docker-images/latest-version-image/scripts/run-standalone.sh
index 868a198dd9c..77ba18927c2 100755
--- a/tests/docker-images/latest-version-image/scripts/run-standalone.sh
+++ b/tests/docker-images/latest-version-image/scripts/run-standalone.sh
@@ -18,7 +18,8 @@
 # under the License.
 #
 
-export PULSAR_MEM="${PULSAR_MEM:-"-Xmx512M -XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/var/log/pulsar -XX:+ExitOnOutOfMemoryError"}"
-export PULSAR_GC="${PULSAR_GC:-"-XX:+UseZGC"}"
+source /pulsar/bin/func-lib.sh
+
+set_pulsar_mem 512M
 
 bin/pulsar standalone
\ No newline at end of file
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
index 58658b0a991..dc5c81ebea8 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
@@ -44,7 +44,6 @@ public class ChaosContainer<SelfT extends 
ChaosContainer<SelfT>> extends Generic
     @Override
     protected void configure() {
         super.configure();
-        addEnv("MALLOC_ARENA_MAX", "1");
     }
 
     protected void appendToEnv(String key, String value) {
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/profiling/PulsarProfilingTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/profiling/PulsarProfilingTest.java
index ac312f8cfd5..6310a40a31e 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/profiling/PulsarProfilingTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/profiling/PulsarProfilingTest.java
@@ -18,6 +18,11 @@
  */
 package org.apache.pulsar.tests.integration.profiling;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.attribute.PosixFilePermissions;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +36,7 @@ import 
org.apache.pulsar.tests.integration.containers.PulsarContainer;
 import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
 import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
 import org.apache.pulsar.tests.integration.utils.DockerUtils;
+import org.testcontainers.containers.BindMode;
 import org.testcontainers.containers.GenericContainer;
 import org.testng.annotations.Test;
 
@@ -40,9 +46,9 @@ import org.testng.annotations.Test;
  * Example usage:
  * # This has been tested on Mac with Orbstack (https://orbstack.dev/) docker
  * # compile integration test dependencies
- * mvn -am -pl tests/integration -DskipTests install
+ * mvn -am -pl tests/integration -Dcheckstyle.skip=true -Dlicense.skip=true 
-Dspotbugs.skip=true -DskipTests install
  * # compile apachepulsar/java-test-image with async profiler (add "clean" to 
ensure a clean build with recent changes)
- * ./build/build_java_test_image.sh -Ddocker.install.asyncprofiler=true
+ * ./build/build_java_test_image.sh -Ddocker.install.asyncprofiler=true 
-Pdocker-wolfi
  * # set environment variables
  * export PULSAR_TEST_IMAGE_NAME=apachepulsar/java-test-image:latest
  * export NETTY_LEAK_DETECTION=off
@@ -92,31 +98,98 @@ public class PulsarProfilingTest extends PulsarTestSuite {
                 createContainerCmd.withName(clusterName + "-" + hostname);
             });
             withEnv("PULSAR_MEM", DEFAULT_PULSAR_MEM);
+            withEnv("PULSAR_GC", "-XX:+UseZGC -XX:+ZGenerational");
             setCommand("sleep 1000000");
+            File testOutputDir = new File("target");
+            if (!testOutputDir.exists()) {
+                if (!testOutputDir.mkdirs()) {
+                    throw new IllegalArgumentException("Test output directory 
+ '" + testOutputDir.getAbsolutePath()
+                                    + "' doesn't exist and cannot be 
created.");
+                }
+            }
+            if (!testOutputDir.isDirectory()) {
+                throw new IllegalArgumentException(
+                        "Test output directory '" + 
testOutputDir.getAbsolutePath() + "' isn't a directory.");
+            }
+            // change access to testOutputDir to allow all access so that the 
container user can write to it
+            // This matters only on Linux
+            try {
+                Files.setPosixFilePermissions(testOutputDir.toPath(), 
PosixFilePermissions.fromString("rwxrwxrwx"));
+            } catch (IOException e) {
+                throw new UncheckedIOException("Cannot change access to test 
output directory", e);
+            }
+            withFileSystemBind(testOutputDir.getAbsolutePath(), "/testoutput", 
BindMode.READ_WRITE);
         }
 
         public CompletableFuture<Long> consume(String topicName) throws 
Exception {
             return DockerUtils.runCommandAsyncWithLogging(getDockerClient(), 
getContainerId(),
-                    "/pulsar/bin/pulsar-perf", "consume", topicName,
-                    "-u", "pulsar://" + brokerHostname + ":6650",
-                    "-st", "Shared",
-                    "-aq",
-                    "-m", String.valueOf(numberOfMessages), "-ml", "400M");
+                    "bash", "-c", "echo $$ > /tmp/command.pid; "
+                            + "/pulsar/bin/pulsar-perf consume " + topicName + 
" "
+                            + "-u pulsar://" + brokerHostname + ":6650 "
+                            + "-st Shared "
+                            + "-q 50000 "
+                            + "-m " + numberOfMessages + " -ml 400M "
+                            + 
"--histogram-file=/testoutput/consume.histogram.$(date +%s).hdr "
+                            + "2>&1 | tee /testoutput/consume.$(date 
+%s).txt");
         }
 
         public CompletableFuture<Long> produce(String topicName) throws 
Exception {
             return DockerUtils.runCommandAsyncWithLogging(getDockerClient(), 
getContainerId(),
-                    "/pulsar/bin/pulsar-perf", "produce", topicName,
-                    "-u", "pulsar://" + brokerHostname + ":6650",
-                    "-au", "http://" + brokerHostname + ":8080",
-                    "-r", String.valueOf(Integer.MAX_VALUE), // max-rate
-                    "-s", "8192", // 8kB message size
-                    "-m", String.valueOf(numberOfMessages), "-ml", "400M");
+                    "bash", "-c", "echo $$ > /tmp/command.pid; "
+                            + "/pulsar/bin/pulsar-perf produce " + topicName + 
" "
+                            + "-u pulsar://" + brokerHostname + ":6650 "
+                            + "-au http://" + brokerHostname + ":8080 "
+                            + "-r " + Integer.MAX_VALUE + " "
+                            + "-s 128 -db "
+                            + "-o 20000 "
+                            + "-m " + numberOfMessages + " -ml 400M "
+                            + 
"--histogram-file=/testoutput/produce.histogram.$(date +%s).hdr "
+                            + "2>&1 | tee /testoutput/produce.$(date 
+%s).txt");
+        }
+
+        public CompletableFuture<Long> stats(String topicName) throws 
Exception {
+            String basePath = "http://" + brokerHostname + ":8080/admin/v2/" + 
topicName.replace("://", "/");
+            // print out stats and internal stats every 10 seconds
+            return DockerUtils.runCommandAsyncWithLogging(getDockerClient(), 
getContainerId(),
+                    "bash", "-c",
+                    String.format("echo $$ > /tmp/command.pid; "
+                            + "while [[ 1 ]]; do "
+                            + "curl -s %s/stats | jq | tee 
/testoutput/stats.$(date +%%s).txt; "
+                            + "sleep 1; "
+                            + "curl -s %s/internalStats | jq | tee 
/testoutput/internal_stats.$(date +%%s).txt; "
+                            + "curl -s http://%s:8080/metrics/ > 
/testoutput/metrics.$(date +%%s).txt; "
+                            + " sleep 10; "
+                            + "done",
+                            basePath, basePath, brokerHostname));
+        }
+
+        public void triggerShutdown() {
+            if (isRunning()) {
+                // attempt to stop containers gracefully
+                DockerUtils.runCommandAsyncWithLogging(getDockerClient(), 
getContainerId(),
+                                "bash", "-c", "pkill java; while pgrep -c 
java; do "
+                                        + "echo Waiting for java processes to 
stop.; sleep 1; done; "
+                                        + "kill $(cat /tmp/command.pid)")
+                        .orTimeout(10, TimeUnit.SECONDS)
+                        .exceptionally(t -> null)
+                        .join();
+            }
+        }
+
+        public void stop() {
+            if (isRunning()) {
+                // attempt to stop containers gracefully
+                dockerClient.stopContainerCmd(getContainerId())
+                        .withTimeout(15)
+                        .exec();
+            }
+            super.stop();
         }
     }
 
     private PulsarPerfContainer perfConsume;
     private PulsarPerfContainer perfProduce;
+    private PulsarPerfContainer printStats;
 
     @Override
     public void setupCluster() throws Exception {
@@ -126,14 +199,27 @@ public class PulsarProfilingTest extends PulsarTestSuite {
 
     @Override
     public void tearDownCluster() throws Exception {
+        if (printStats != null) {
+            printStats.triggerShutdown();
+        }
+        if (perfProduce != null) {
+            perfProduce.triggerShutdown();
+        }
         if (perfConsume != null) {
-            perfConsume.stop();
-            perfConsume = null;
+            perfConsume.triggerShutdown();
+        }
+        if (printStats != null) {
+            printStats.stop();
+            printStats = null;
         }
         if (perfProduce != null) {
             perfProduce.stop();
             perfProduce = null;
         }
+        if (perfConsume != null) {
+            perfConsume.stop();
+            perfConsume = null;
+        }
         super.tearDownCluster();
     }
 
@@ -142,7 +228,10 @@ public class PulsarProfilingTest extends PulsarTestSuite {
         super.beforeStartCluster();
         pulsarCluster.forEachContainer(
                 // This is effective only when -Pdocker-wolfi has been passed 
when building java-test-image
-                c -> c.withEnv("GLIBC_TUNABLES", 
"glibc.malloc.hugetlb=1:glibc.malloc.mmap_threshold=2097152"));
+                // setting mmap_threshold explicitly will avoid its dynamic 
increase
+                // 
https://sourceware.org/glibc/manual/latest/html_node/Memory-Allocation-Tunables.html
+                c -> c.withEnv("GLIBC_TUNABLES",
+                        
"glibc.malloc.hugetlb=1:glibc.malloc.mmap_threshold=131072:glibc.malloc.arena_max=4"));
     }
 
     @Override
@@ -160,15 +249,25 @@ public class PulsarProfilingTest extends PulsarTestSuite {
         specBuilder.numProxies(0);
 
         // Increase memory for brokers and configure more aggressive rollover
-        specBuilder.brokerEnvs(Map.of("PULSAR_MEM", BROKER_PULSAR_MEM,
-                "managedLedgerMinLedgerRolloverTimeMinutes", "1",
-                "managedLedgerMaxLedgerRolloverTimeMinutes", "5",
-                "managedLedgerMaxSizePerLedgerMbytes", "512",
-                "managedLedgerDefaultEnsembleSize", "1",
-                "managedLedgerDefaultWriteQuorum", "1",
-                "managedLedgerDefaultAckQuorum", "1",
-                "maxPendingPublishRequestsPerConnection", "100000"
-        ));
+        Map<String, String> brokerEnvs = new HashMap<>();
+        brokerEnvs.put("PULSAR_MEM", BROKER_PULSAR_MEM);
+        brokerEnvs.put("managedLedgerMinLedgerRolloverTimeMinutes", "1");
+        brokerEnvs.put("managedLedgerMaxLedgerRolloverTimeMinutes", "5");
+        brokerEnvs.put("managedLedgerMaxSizePerLedgerMbytes", "512");
+        brokerEnvs.put("managedLedgerDefaultEnsembleSize", "1");
+        brokerEnvs.put("managedLedgerDefaultWriteQuorum", "1");
+        brokerEnvs.put("managedLedgerDefaultAckQuorum", "1");
+        //brokerEnvs.put("maxPendingPublishRequestsPerConnection", "1000");
+        brokerEnvs.put("dispatcherRetryBackoffInitialTimeInMs", "0");
+        brokerEnvs.put("dispatcherRetryBackoffMaxTimeInMs", "0");
+        brokerEnvs.put("preciseDispatcherFlowControl", "true");
+        
//brokerEnvs.put("PULSAR_PREFIX_subscriptionKeySharedUseClassicPersistentImplementation",
 "true");
+        
//brokerEnvs.put("PULSAR_PREFIX_subscriptionSharedUseClassicPersistentImplementation",
 "true");
+        brokerEnvs.put("dispatcherMaxReadBatchSize", "1000");
+        //brokerEnvs.put("dispatcherMaxReadSizeBytes", "10000000");
+        //brokerEnvs.put("dispatcherDispatchMessagesInSubscriptionThread", 
"false");
+        //brokerEnvs.put("dispatcherMaxRoundRobinBatchSize", "1000");
+        specBuilder.brokerEnvs(brokerEnvs);
 
         // Increase memory for bookkeepers and make compaction run more often
         Map<String, String> bkEnv = new HashMap<>();
@@ -190,9 +289,11 @@ public class PulsarProfilingTest extends PulsarTestSuite {
         String brokerHostname = clusterName + "-pulsar-broker-0";
         perfProduce = new PulsarPerfContainer(clusterName, brokerHostname, 
"perf-produce");
         perfConsume = new PulsarPerfContainer(clusterName, brokerHostname, 
"perf-consume");
+        printStats = new PulsarPerfContainer(clusterName, brokerHostname, 
"print-stats");
         specBuilder.externalServices(Map.of(
                 "pulsar-produce", perfProduce,
-                "pulsar-consume", perfConsume
+                "pulsar-consume", perfConsume,
+                "print-stats", printStats
         ));
 
         return specBuilder;
@@ -204,6 +305,8 @@ public class PulsarProfilingTest extends PulsarTestSuite {
         CompletableFuture<Long> consumeFuture = perfConsume.consume(topicName);
         Thread.sleep(1000);
         CompletableFuture<Long> produceFuture = perfProduce.produce(topicName);
+        Thread.sleep(4000);
+        printStats.stats(topicName);
         FutureUtil.waitForAll(List.of(consumeFuture, produceFuture))
                 .orTimeout(3, TimeUnit.MINUTES)
                 .exceptionally(t -> {

Reply via email to