This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 77b34cc5b01 [improve][test] Improve integration test profiling test
example (#24701)
77b34cc5b01 is described below
commit 77b34cc5b0176ea81e0d358d3b0f79a6e930be82
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Sep 5 13:59:40 2025 +0300
[improve][test] Improve integration test profiling test example (#24701)
---
tests/docker-images/java-test-image/Dockerfile | 2 +-
.../docker-images/latest-version-image/Dockerfile | 2 +-
.../latest-version-image/conf/supervisord.conf | 1 +
.../latest-version-image/scripts/func-lib.sh | 19 ++-
.../latest-version-image/scripts/run-standalone.sh | 5 +-
.../integration/containers/ChaosContainer.java | 1 -
.../integration/profiling/PulsarProfilingTest.java | 155 +++++++++++++++++----
7 files changed, 149 insertions(+), 36 deletions(-)
diff --git a/tests/docker-images/java-test-image/Dockerfile
b/tests/docker-images/java-test-image/Dockerfile
index 167d7324fb8..f6e0d7a472f 100644
--- a/tests/docker-images/java-test-image/Dockerfile
+++ b/tests/docker-images/java-test-image/Dockerfile
@@ -26,7 +26,7 @@ USER root
COPY target/scripts /pulsar/bin
RUN chmod a+rx /pulsar/bin/*
-RUN apk add --no-cache supervisor
+RUN apk add --no-cache supervisor jq
RUN mkdir -p /var/log/pulsar \
&& mkdir -p /var/run/supervisor/ \
diff --git a/tests/docker-images/latest-version-image/Dockerfile
b/tests/docker-images/latest-version-image/Dockerfile
index e4bfa77355c..6fe2ef656a4 100644
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/Dockerfile
@@ -62,7 +62,7 @@ FROM $PULSAR_ALL_IMAGE
# However, any processes exec'ing into the containers will run as root, by
default.
USER root
-RUN apk add --no-cache supervisor procps curl
+RUN apk add --no-cache supervisor procps curl jq
RUN mkdir -p /var/log/pulsar && mkdir -p /var/run/supervisor/
diff --git a/tests/docker-images/latest-version-image/conf/supervisord.conf
b/tests/docker-images/latest-version-image/conf/supervisord.conf
index ee48be32658..3a4da862fdb 100644
--- a/tests/docker-images/latest-version-image/conf/supervisord.conf
+++ b/tests/docker-images/latest-version-image/conf/supervisord.conf
@@ -26,6 +26,7 @@ loglevel=info
pidfile=/var/run/supervisord.pid
minfds=1024
minprocs=200
+user=root
[unix_http_server]
file=/var/run/supervisor/supervisor.sock
diff --git a/tests/docker-images/latest-version-image/scripts/func-lib.sh
b/tests/docker-images/latest-version-image/scripts/func-lib.sh
index b8c89bbde6c..90155479fea 100644
--- a/tests/docker-images/latest-version-image/scripts/func-lib.sh
+++ b/tests/docker-images/latest-version-image/scripts/func-lib.sh
@@ -21,21 +21,30 @@
set -e
set -o pipefail
+function set_pulsar_mem() {
+ local maxMem=$1
+ local additionalMemParam=$2
+ local pulsar_test_mem
+ # set into pulsar_test_mem while trimming whitespace
+ read -r pulsar_test_mem <<< "-Xmx${maxMem} ${additionalMemParam}"
+ # prefer PULSAR_MEM, but always append params to perform a heap dump on OOME
+ export PULSAR_MEM="${PULSAR_MEM:-"${pulsar_test_mem}"}
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/pulsar
-XX:+ExitOnOutOfMemoryError"
+}
+
function run_pulsar_component() {
local component=$1
local supervisord_component=$2
local maxMem=$3
local additionalMemParam=$4
- export PULSAR_MEM="${PULSAR_MEM:-"-Xmx${maxMem} ${additionalMemParam}
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/pulsar
-XX:+ExitOnOutOfMemoryError"}"
- export PULSAR_GC="${PULSAR_GC:-"-XX:+UseZGC"}"
+
+ set_pulsar_mem "$maxMem" "$additionalMemParam"
if [[ -f "conf/${component}.conf" ]]; then
bin/apply-config-from-env.py conf/${component}.conf
fi
- bin/apply-config-from-env.py conf/pulsar_env.sh
+ bin/apply-config-from-env.py conf/client.conf
if [[ "$component" == "functions_worker" ]]; then
- bin/apply-config-from-env.py conf/client.conf
bin/gen-yml-from-env.py conf/functions_worker.yml
fi
@@ -46,7 +55,7 @@ function run_pulsar_component() {
fi
if [ -z "$NO_AUTOSTART" ]; then
- sed -i 's/autostart=.*/autostart=true/'
/etc/supervisord/conf.d/${supervisord_component}.conf
+ sed -i 's/autostart=.*/autostart=true/'
/etc/supervisord/conf.d/${supervisord_component}.conf
fi
exec /usr/bin/supervisord -c /etc/supervisord.conf
diff --git a/tests/docker-images/latest-version-image/scripts/run-standalone.sh
b/tests/docker-images/latest-version-image/scripts/run-standalone.sh
index 868a198dd9c..77ba18927c2 100755
--- a/tests/docker-images/latest-version-image/scripts/run-standalone.sh
+++ b/tests/docker-images/latest-version-image/scripts/run-standalone.sh
@@ -18,7 +18,8 @@
# under the License.
#
-export PULSAR_MEM="${PULSAR_MEM:-"-Xmx512M -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/log/pulsar -XX:+ExitOnOutOfMemoryError"}"
-export PULSAR_GC="${PULSAR_GC:-"-XX:+UseZGC"}"
+source /pulsar/bin/func-lib.sh
+
+set_pulsar_mem 512M
bin/pulsar standalone
\ No newline at end of file
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
index 58658b0a991..dc5c81ebea8 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
@@ -44,7 +44,6 @@ public class ChaosContainer<SelfT extends
ChaosContainer<SelfT>> extends Generic
@Override
protected void configure() {
super.configure();
- addEnv("MALLOC_ARENA_MAX", "1");
}
protected void appendToEnv(String key, String value) {
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/profiling/PulsarProfilingTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/profiling/PulsarProfilingTest.java
index ac312f8cfd5..6310a40a31e 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/profiling/PulsarProfilingTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/profiling/PulsarProfilingTest.java
@@ -18,6 +18,11 @@
*/
package org.apache.pulsar.tests.integration.profiling;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.attribute.PosixFilePermissions;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -31,6 +36,7 @@ import
org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.apache.pulsar.tests.integration.utils.DockerUtils;
+import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testng.annotations.Test;
@@ -40,9 +46,9 @@ import org.testng.annotations.Test;
* Example usage:
* # This has been tested on Mac with Orbstack (https://orbstack.dev/) docker
* # compile integration test dependencies
- * mvn -am -pl tests/integration -DskipTests install
+ * mvn -am -pl tests/integration -Dcheckstyle.skip=true -Dlicense.skip=true
-Dspotbugs.skip=true -DskipTests install
* # compile apachepulsar/java-test-image with async profiler (add "clean" to
ensure a clean build with recent changes)
- * ./build/build_java_test_image.sh -Ddocker.install.asyncprofiler=true
+ * ./build/build_java_test_image.sh -Ddocker.install.asyncprofiler=true
-Pdocker-wolfi
* # set environment variables
* export PULSAR_TEST_IMAGE_NAME=apachepulsar/java-test-image:latest
* export NETTY_LEAK_DETECTION=off
@@ -92,31 +98,98 @@ public class PulsarProfilingTest extends PulsarTestSuite {
createContainerCmd.withName(clusterName + "-" + hostname);
});
withEnv("PULSAR_MEM", DEFAULT_PULSAR_MEM);
+ withEnv("PULSAR_GC", "-XX:+UseZGC -XX:+ZGenerational");
setCommand("sleep 1000000");
+ File testOutputDir = new File("target");
+ if (!testOutputDir.exists()) {
+ if (!testOutputDir.mkdirs()) {
+ throw new IllegalArgumentException("Test output directory
+ '" + testOutputDir.getAbsolutePath()
+ + "' doesn't exist and cannot be
created.");
+ }
+ }
+ if (!testOutputDir.isDirectory()) {
+ throw new IllegalArgumentException(
+ "Test output directory '" +
testOutputDir.getAbsolutePath() + "' isn't a directory.");
+ }
+ // change access to testOutputDir to allow all access so the the
container user can write to it
+ // This matters only on Linux
+ try {
+ Files.setPosixFilePermissions(testOutputDir.toPath(),
PosixFilePermissions.fromString("rwxrwxrwx"));
+ } catch (IOException e) {
+ throw new UncheckedIOException("Cannot change access to test
output directory", e);
+ }
+ withFileSystemBind(testOutputDir.getAbsolutePath(), "/testoutput",
BindMode.READ_WRITE);
}
public CompletableFuture<Long> consume(String topicName) throws
Exception {
return DockerUtils.runCommandAsyncWithLogging(getDockerClient(),
getContainerId(),
- "/pulsar/bin/pulsar-perf", "consume", topicName,
- "-u", "pulsar://" + brokerHostname + ":6650",
- "-st", "Shared",
- "-aq",
- "-m", String.valueOf(numberOfMessages), "-ml", "400M");
+ "bash", "-c", "echo $$ > /tmp/command.pid; "
+ + "/pulsar/bin/pulsar-perf consume " + topicName +
" "
+ + "-u pulsar://" + brokerHostname + ":6650 "
+ + "-st Shared "
+ + "-q 50000 "
+ + "-m " + numberOfMessages + " -ml 400M "
+ +
"--histogram-file=/testoutput/consume.histogram.$(date +%s).hdr "
+ + "2>&1 | tee /testoutput/consume.$(date
+%s).txt");
}
public CompletableFuture<Long> produce(String topicName) throws
Exception {
return DockerUtils.runCommandAsyncWithLogging(getDockerClient(),
getContainerId(),
- "/pulsar/bin/pulsar-perf", "produce", topicName,
- "-u", "pulsar://" + brokerHostname + ":6650",
- "-au", "http://" + brokerHostname + ":8080",
- "-r", String.valueOf(Integer.MAX_VALUE), // max-rate
- "-s", "8192", // 8kB message size
- "-m", String.valueOf(numberOfMessages), "-ml", "400M");
+ "bash", "-c", "echo $$ > /tmp/command.pid; "
+ + "/pulsar/bin/pulsar-perf produce " + topicName +
" "
+ + "-u pulsar://" + brokerHostname + ":6650 "
+ + "-au http://" + brokerHostname + ":8080 "
+ + "-r " + Integer.MAX_VALUE + " "
+ + "-s 128 -db "
+ + "-o 20000 "
+ + "-m " + numberOfMessages + " -ml 400M "
+ +
"--histogram-file=/testoutput/produce.histogram.$(date +%s).hdr "
+ + "2>&1 | tee /testoutput/produce.$(date
+%s).txt");
+ }
+
+ public CompletableFuture<Long> stats(String topicName) throws
Exception {
+ String basePath = "http://" + brokerHostname + ":8080/admin/v2/" +
topicName.replace("://", "/");
+ // print out stats and internal stats every 10 seconds
+ return DockerUtils.runCommandAsyncWithLogging(getDockerClient(),
getContainerId(),
+ "bash", "-c",
+ String.format("echo $$ > /tmp/command.pid; "
+ + "while [[ 1 ]]; do "
+ + "curl -s %s/stats | jq | tee
/testoutput/stats.$(date +%%s).txt; "
+ + "sleep 1; "
+ + "curl -s %s/internalStats | jq | tee
/testoutput/internal_stats.$(date +%%s).txt; "
+ + "curl -s http://%s:8080/metrics/ >
/testoutput/metrics.$(date +%%s).txt; "
+ + " sleep 10; "
+ + "done",
+ basePath, basePath, brokerHostname));
+ }
+
+ public void triggerShutdown() {
+ if (isRunning()) {
+ // attempt to stop containers gracefully
+ DockerUtils.runCommandAsyncWithLogging(getDockerClient(),
getContainerId(),
+ "bash", "-c", "pkill java; while pgrep -c
java; do "
+ + "echo Waiting for java processes to
stop.; sleep 1; done; "
+ + "kill $(cat /tmp/command.pid)")
+ .orTimeout(10, TimeUnit.SECONDS)
+ .exceptionally(t -> null)
+ .join();
+ }
+ }
+
+ public void stop() {
+ if (isRunning()) {
+ // attempt to stop containers gracefully
+ dockerClient.stopContainerCmd(getContainerId())
+ .withTimeout(15)
+ .exec();
+ }
+ super.stop();
}
}
private PulsarPerfContainer perfConsume;
private PulsarPerfContainer perfProduce;
+ private PulsarPerfContainer printStats;
@Override
public void setupCluster() throws Exception {
@@ -126,14 +199,27 @@ public class PulsarProfilingTest extends PulsarTestSuite {
@Override
public void tearDownCluster() throws Exception {
+ if (printStats != null) {
+ printStats.triggerShutdown();
+ }
+ if (perfProduce != null) {
+ perfProduce.triggerShutdown();
+ }
if (perfConsume != null) {
- perfConsume.stop();
- perfConsume = null;
+ perfConsume.triggerShutdown();
+ }
+ if (printStats != null) {
+ printStats.stop();
+ printStats = null;
}
if (perfProduce != null) {
perfProduce.stop();
perfProduce = null;
}
+ if (perfConsume != null) {
+ perfConsume.stop();
+ perfConsume = null;
+ }
super.tearDownCluster();
}
@@ -142,7 +228,10 @@ public class PulsarProfilingTest extends PulsarTestSuite {
super.beforeStartCluster();
pulsarCluster.forEachContainer(
// This is effective only when -Pdocker-wolfi has been passed
when building java-test-image
- c -> c.withEnv("GLIBC_TUNABLES",
"glibc.malloc.hugetlb=1:glibc.malloc.mmap_threshold=2097152"));
+ // setting mmap_threshold explicitly will avoid it's dynamic
increase
+ //
https://sourceware.org/glibc/manual/latest/html_node/Memory-Allocation-Tunables.html
+ c -> c.withEnv("GLIBC_TUNABLES",
+
"glibc.malloc.hugetlb=1:glibc.malloc.mmap_threshold=131072:glibc.malloc.arena_max=4"));
}
@Override
@@ -160,15 +249,25 @@ public class PulsarProfilingTest extends PulsarTestSuite {
specBuilder.numProxies(0);
// Increase memory for brokers and configure more aggressive rollover
- specBuilder.brokerEnvs(Map.of("PULSAR_MEM", BROKER_PULSAR_MEM,
- "managedLedgerMinLedgerRolloverTimeMinutes", "1",
- "managedLedgerMaxLedgerRolloverTimeMinutes", "5",
- "managedLedgerMaxSizePerLedgerMbytes", "512",
- "managedLedgerDefaultEnsembleSize", "1",
- "managedLedgerDefaultWriteQuorum", "1",
- "managedLedgerDefaultAckQuorum", "1",
- "maxPendingPublishRequestsPerConnection", "100000"
- ));
+ Map<String, String> brokerEnvs = new HashMap<>();
+ brokerEnvs.put("PULSAR_MEM", BROKER_PULSAR_MEM);
+ brokerEnvs.put("managedLedgerMinLedgerRolloverTimeMinutes", "1");
+ brokerEnvs.put("managedLedgerMaxLedgerRolloverTimeMinutes", "5");
+ brokerEnvs.put("managedLedgerMaxSizePerLedgerMbytes", "512");
+ brokerEnvs.put("managedLedgerDefaultEnsembleSize", "1");
+ brokerEnvs.put("managedLedgerDefaultWriteQuorum", "1");
+ brokerEnvs.put("managedLedgerDefaultAckQuorum", "1");
+ //brokerEnvs.put("maxPendingPublishRequestsPerConnection", "1000");
+ brokerEnvs.put("dispatcherRetryBackoffInitialTimeInMs", "0");
+ brokerEnvs.put("dispatcherRetryBackoffMaxTimeInMs", "0");
+ brokerEnvs.put("preciseDispatcherFlowControl", "true");
+
//brokerEnvs.put("PULSAR_PREFIX_subscriptionKeySharedUseClassicPersistentImplementation",
"true");
+
//brokerEnvs.put("PULSAR_PREFIX_subscriptionSharedUseClassicPersistentImplementation",
"true");
+ brokerEnvs.put("dispatcherMaxReadBatchSize", "1000");
+ //brokerEnvs.put("dispatcherMaxReadSizeBytes", "10000000");
+ //brokerEnvs.put("dispatcherDispatchMessagesInSubscriptionThread",
"false");
+ //brokerEnvs.put("dispatcherMaxRoundRobinBatchSize", "1000");
+ specBuilder.brokerEnvs(brokerEnvs);
// Increase memory for bookkeepers and make compaction run more often
Map<String, String> bkEnv = new HashMap<>();
@@ -190,9 +289,11 @@ public class PulsarProfilingTest extends PulsarTestSuite {
String brokerHostname = clusterName + "-pulsar-broker-0";
perfProduce = new PulsarPerfContainer(clusterName, brokerHostname,
"perf-produce");
perfConsume = new PulsarPerfContainer(clusterName, brokerHostname,
"perf-consume");
+ printStats = new PulsarPerfContainer(clusterName, brokerHostname,
"print-stats");
specBuilder.externalServices(Map.of(
"pulsar-produce", perfProduce,
- "pulsar-consume", perfConsume
+ "pulsar-consume", perfConsume,
+ "print-stats", printStats
));
return specBuilder;
@@ -204,6 +305,8 @@ public class PulsarProfilingTest extends PulsarTestSuite {
CompletableFuture<Long> consumeFuture = perfConsume.consume(topicName);
Thread.sleep(1000);
CompletableFuture<Long> produceFuture = perfProduce.produce(topicName);
+ Thread.sleep(4000);
+ printStats.stats(topicName);
FutureUtil.waitForAll(List.of(consumeFuture, produceFuture))
.orTimeout(3, TimeUnit.MINUTES)
.exceptionally(t -> {