This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 413bfe71d9e [improve][build] Add Async Profiler integration for integration tests (#24688)
413bfe71d9e is described below
commit 413bfe71d9e90e79dae985a4f5a417065157fb8b
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Sep 1 09:33:40 2025 +0300
[improve][build] Add Async Profiler integration for integration tests (#24688)
---
build/build_java_test_image.sh | 2 +-
.../org/apache/pulsar/tests/ManualTestUtil.java | 37 ++++
pom.xml | 31 ++-
.../AbstractBrokerEntryCacheMultiBrokerTest.java | 13 +-
tests/docker-images/java-test-image/Dockerfile | 14 +-
tests/docker-images/java-test-image/pom.xml | 19 +-
.../docker-images/latest-version-image/Dockerfile | 23 ++-
.../latest-version-image/conf/bookie.conf | 1 -
.../latest-version-image/conf/broker.conf | 1 -
.../conf/functions_worker.conf | 1 -
.../latest-version-image/conf/global-zk.conf | 1 -
.../latest-version-image/conf/local-zk.conf | 1 -
.../latest-version-image/conf/proxy.conf | 1 -
.../latest-version-image/conf/websocket.conf | 1 -
tests/docker-images/latest-version-image/pom.xml | 4 +
.../latest-version-image/scripts/func-lib.sh | 53 ++++++
.../latest-version-image/scripts/run-bookie.sh | 12 +-
.../latest-version-image/scripts/run-broker.sh | 10 +-
.../scripts/run-functions-worker.sh | 11 +-
.../latest-version-image/scripts/run-global-zk.sh | 10 +-
.../latest-version-image/scripts/run-local-zk.sh | 10 +-
.../latest-version-image/scripts/run-proxy.sh | 9 +-
.../latest-version-image/scripts/run-standalone.sh | 3 +
.../latest-version-image/scripts/run-websocket.sh | 9 +-
tests/integration/pom.xml | 36 +++-
.../integration/containers/ChaosContainer.java | 2 +-
.../integration/containers/PulsarContainer.java | 80 +++++++-
.../loadbalance/ExtensibleLoadManagerTest.java | 1 -
.../integration/profiling/PulsarProfilingTest.java | 211 +++++++++++++++++++++
.../integration/topologies/PulsarCluster.java | 11 +-
.../integration/topologies/PulsarClusterSpec.java | 15 ++
.../tests/integration/utils/DockerUtils.java | 31 ++-
32 files changed, 546 insertions(+), 118 deletions(-)
diff --git a/build/build_java_test_image.sh b/build/build_java_test_image.sh
index 3869b668805..51257c992dc 100755
--- a/build/build_java_test_image.sh
+++ b/build/build_java_test_image.sh
@@ -21,5 +21,5 @@
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
cd "$SCRIPT_DIR/.."
mvn -am -pl tests/docker-images/java-test-image -Pcore-modules,-main,integrationTests,docker \
- -Dmaven.test.skip=true -DskipSourceReleaseAssembly=true -Dspotbugs.skip=true -Dlicense.skip=true \
+ -Dmaven.test.skip=true -DskipSourceReleaseAssembly=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Dlicense.skip=true \
"$@" install
\ No newline at end of file
diff --git
a/buildtools/src/main/java/org/apache/pulsar/tests/ManualTestUtil.java
b/buildtools/src/main/java/org/apache/pulsar/tests/ManualTestUtil.java
new file mode 100644
index 00000000000..d64e550f5ca
--- /dev/null
+++ b/buildtools/src/main/java/org/apache/pulsar/tests/ManualTestUtil.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests;
+
+import org.apache.commons.lang3.BooleanUtils;
+import org.testng.SkipException;
+
+public class ManualTestUtil {
+
+ public static void skipManualTestIfNotEnabled() {
+ if (!BooleanUtils.toBoolean(System.getenv("ENABLE_MANUAL_TEST")) && !isRunningInIntelliJ()) {
+ throw new SkipException("This test requires setting ENABLE_MANUAL_TEST=true environment variable.");
+ }
+ }
+
+ public static boolean isRunningInIntelliJ() {
+ // Check for IntelliJ-specific system properties
+ return System.getProperty("idea.test.cyclic.buffer.size") != null
+ || System.getProperty("java.class.path", "").contains("idea_rt.jar");
+ }
+}
diff --git a/pom.xml b/pom.xml
index c618fad87e2..82efc61b667 100644
--- a/pom.xml
+++ b/pom.xml
@@ -345,7 +345,7 @@ flexible messaging model and an intuitive client
API.</description>
<maven-enforcer-plugin.version>3.3.0</maven-enforcer-plugin.version>
<!-- surefire.version is defined in apache parent pom -->
<!-- it is used for surefire, failsafe and surefire-report plugins -->
- <surefire.version>3.1.0</surefire.version>
+ <surefire.version>3.5.3</surefire.version>
<maven-assembly-plugin.version>3.5.0</maven-assembly-plugin.version>
<maven-compiler-plugin.version>3.11.0</maven-compiler-plugin.version>
<maven-dependency-plugin.version>3.5.0</maven-dependency-plugin.version>
@@ -3078,12 +3078,14 @@ flexible messaging model and an intuitive client
API.</description>
-->
<test.asyncprofiler.event>itimer</test.asyncprofiler.event>
<!-- Assumes Async Profiler 4.1+ to be installed, which supports "all"
option -->
-
<test.asyncprofiler.opts>event=${test.asyncprofiler.event},all,jfrsync=profile</test.asyncprofiler.opts>
+ <!-- See
https://github.com/async-profiler/async-profiler/blob/v4.1/src/arguments.cpp#L45
for supported options -->
+
<test.asyncprofiler.opts>event=${test.asyncprofiler.event},all,alloc=2m,jfrsync=profile</test.asyncprofiler.opts>
+ <test.asyncprofiler.outputformat>jfr</test.asyncprofiler.outputformat>
<test.asyncprofiler.args>
-XX:+UnlockDiagnosticVMOptions
-XX:+DebugNonSafepoints
-Xms${testMaxHeapSize} -XX:+AlwaysPreTouch
-
-agentpath:${env.LIBASYNCPROFILER_PATH}=start,${test.asyncprofiler.opts},jfr,file=${pulsar.basedir}/target/test_profile_${maven.build.timestamp}_${surefire.forkNumber}_%p.jfr
+
-agentpath:${env.LIBASYNCPROFILER_PATH}=start,${test.asyncprofiler.opts},quiet,file=${pulsar.basedir}/target/test_profile_${git.commit.id.abbrev}_${maven.build.timestamp}_${surefire.forkNumber}_%p.${test.asyncprofiler.outputformat}
-Dpulsar.test.logging.appender=FILE
-Dpulsar.test.logging.file=target/test_${maven.build.timestamp}_${surefire.forkNumber}.log
</test.asyncprofiler.args>
@@ -3092,6 +3094,29 @@ flexible messaging model and an intuitive client
API.</description>
<redirectTestOutputToFile>false</redirectTestOutputToFile>
<test.netty.leakdetection.args></test.netty.leakdetection.args>
</properties>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>pl.project13.maven</groupId>
+ <artifactId>git-commit-id-plugin</artifactId>
+ <configuration>
+ <skip>false</skip>
+ <injectAllReactorProjects>true</injectAllReactorProjects>
+ <skipPoms>false</skipPoms>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <!-- This profile is used in addition to -PtestAsyncProfiler to profile created exceptions
+ This works only on Linux.
+ -->
+ <id>testAsyncProfilerExceptions</id>
+ <properties>
+
<test.asyncprofiler.opts>event=Java_java_lang_Throwable_fillInStackTrace,tree,reverse</test.asyncprofiler.opts>
+ <test.asyncprofiler.outputformat>html</test.asyncprofiler.outputformat>
+ </properties>
</profile>
<!-- profile that redirects log4j logs to a file -->
<profile>
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/AbstractBrokerEntryCacheMultiBrokerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/AbstractBrokerEntryCacheMultiBrokerTest.java
index 862ef6719cb..b525440d163 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/AbstractBrokerEntryCacheMultiBrokerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/AbstractBrokerEntryCacheMultiBrokerTest.java
@@ -36,7 +36,6 @@ import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.commons.lang3.BooleanUtils;
import org.apache.pulsar.broker.MultiBrokerTestZKBaseTest;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
@@ -46,7 +45,7 @@ import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.netty.ChannelFutures;
-import org.testng.SkipException;
+import org.apache.pulsar.tests.ManualTestUtil;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -140,15 +139,7 @@ public abstract class
AbstractBrokerEntryCacheMultiBrokerTest extends MultiBroke
@Override
protected void beforeSetup() {
- if (!BooleanUtils.toBoolean(System.getenv("ENABLE_MANUAL_TEST")) &&
!isRunningInIntelliJ()) {
- throw new SkipException("This test requires setting
ENABLE_MANUAL_TEST=true environment variable.");
- }
- }
-
- static boolean isRunningInIntelliJ() {
- // Check for IntelliJ-specific system properties
- return System.getProperty("idea.test.cyclic.buffer.size") != null
- || System.getProperty("java.class.path",
"").contains("idea_rt.jar");
+ ManualTestUtil.skipManualTestIfNotEnabled();
}
@Override
diff --git a/tests/docker-images/java-test-image/Dockerfile
b/tests/docker-images/java-test-image/Dockerfile
index 9cf45130d02..167d7324fb8 100644
--- a/tests/docker-images/java-test-image/Dockerfile
+++ b/tests/docker-images/java-test-image/Dockerfile
@@ -41,4 +41,16 @@ COPY target/certificate-authority
/pulsar/certificate-authority/
COPY target/java-test-functions.jar /pulsar/examples/
# copy buildtools.jar to /pulsar/lib so that
org.apache.pulsar.tests.ExtendedNettyLeakDetector can be used
-COPY target/buildtools.jar /pulsar/lib/
\ No newline at end of file
+COPY target/buildtools.jar /pulsar/lib/
+
+ARG INSTALL_ASYNC_PROFILER=false
+# install async-profiler from
https://github.com/async-profiler/async-profiler/releases/tag/v4.1 to
/opt/async-profiler
+RUN if [ "${INSTALL_ASYNC_PROFILER}" = "true" ]; then \
+ mkdir -p /opt/async-profiler \
+ && cd /opt/async-profiler \
+ && curl -L \
+
"https://github.com/async-profiler/async-profiler/releases/download/v4.1/async-profiler-4.1-linux-$([
"$(uname -m)" = "aarch64" ] && echo "arm64" || echo "x64").tar.gz" \
+ | tar -zxvf - --strip-components=1; \
+ else \
+ echo "async-profiler installation skipped"; \
+ fi
\ No newline at end of file
diff --git a/tests/docker-images/java-test-image/pom.xml
b/tests/docker-images/java-test-image/pom.xml
index 3d265d37dba..0512075573a 100644
--- a/tests/docker-images/java-test-image/pom.xml
+++ b/tests/docker-images/java-test-image/pom.xml
@@ -38,18 +38,15 @@
<name>integrationTests</name>
</property>
</activation>
+ <properties>
+ <docker.install.asyncprofiler>false</docker.install.asyncprofiler>
+ </properties>
<dependencies>
- <dependency>
- <groupId>org.apache.pulsar.tests</groupId>
- <artifactId>java-test-functions</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-server-distribution</artifactId>
+ <artifactId>pulsar-docker-image</artifactId>
<version>${project.parent.version}</version>
- <classifier>bin</classifier>
- <type>tar.gz</type>
+ <type>pom</type>
<scope>provided</scope>
<exclusions>
<exclusion>
@@ -58,6 +55,11 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.pulsar.tests</groupId>
+ <artifactId>java-test-functions</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
</dependencies>
<build>
<plugins>
@@ -161,6 +163,7 @@
<build>
<args>
<PULSAR_IMAGE>${docker.organization}/${docker.image}:${project.version}-${git.commit.id.abbrev}</PULSAR_IMAGE>
+
<INSTALL_ASYNC_PROFILER>${docker.install.asyncprofiler}</INSTALL_ASYNC_PROFILER>
</args>
<contextDir>${project.basedir}</contextDir>
<noCache>true</noCache>
diff --git a/tests/docker-images/latest-version-image/Dockerfile
b/tests/docker-images/latest-version-image/Dockerfile
index 316f499083d..981c892d60e 100644
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/Dockerfile
@@ -65,14 +65,11 @@ RUN apk add --no-cache supervisor procps curl
RUN mkdir -p /var/log/pulsar && mkdir -p /var/run/supervisor/
-COPY conf/supervisord.conf /etc/supervisord.conf
-COPY conf/global-zk.conf conf/local-zk.conf conf/bookie.conf conf/broker.conf
conf/functions_worker.conf \
- conf/proxy.conf conf/websocket.conf /etc/supervisord/conf.d/
+COPY conf /etc/supervisord/conf.d/
+RUN mv /etc/supervisord/conf.d/supervisord.conf /etc/supervisord.conf
-COPY scripts/run-global-zk.sh scripts/run-local-zk.sh \
- scripts/run-bookie.sh scripts/run-broker.sh
scripts/run-functions-worker.sh scripts/run-proxy.sh \
- scripts/run-standalone.sh scripts/run-websocket.sh \
- /pulsar/bin/
+COPY scripts /pulsar/bin/
+RUN chmod a+rx /pulsar/bin/*
# copy python test examples
RUN mkdir -p /pulsar/instances/deps
@@ -107,4 +104,16 @@ COPY --from=oracle-jdbc-builder
/pulsar/connectors/pulsar-io-debezium-oracle-*.n
RUN mkdir -p pulsar
RUN chmod g+rwx pulsar
+ARG INSTALL_ASYNC_PROFILER=false
+# install async-profiler from
https://github.com/async-profiler/async-profiler/releases/tag/v4.1 to
/opt/async-profiler
+RUN if [ "${INSTALL_ASYNC_PROFILER}" = "true" ]; then \
+ mkdir -p /opt/async-profiler \
+ && cd /opt/async-profiler \
+ && curl -L \
+
"https://github.com/async-profiler/async-profiler/releases/download/v4.1/async-profiler-4.1-linux-$([
"$(uname -m)" = "aarch64" ] && echo "arm64" || echo "x64").tar.gz" \
+ | tar -zxvf - --strip-components=1; \
+ else \
+ echo "async-profiler installation skipped"; \
+ fi
+
CMD bash
\ No newline at end of file
diff --git a/tests/docker-images/latest-version-image/conf/bookie.conf
b/tests/docker-images/latest-version-image/conf/bookie.conf
index df7501057a5..8e9dd2dd5bf 100644
--- a/tests/docker-images/latest-version-image/conf/bookie.conf
+++ b/tests/docker-images/latest-version-image/conf/bookie.conf
@@ -22,7 +22,6 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/bookie.log
directory=/pulsar
-environment=PULSAR_MEM="-Xmx128M -XX:MaxDirectMemorySize=512M
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/pulsar
-XX:+ExitOnOutOfMemoryError",PULSAR_GC="-XX:+UseZGC"
command=/pulsar/bin/pulsar bookie
user=pulsar
stopwaitsecs=15
diff --git a/tests/docker-images/latest-version-image/conf/broker.conf
b/tests/docker-images/latest-version-image/conf/broker.conf
index 790dace8d6d..1aa143b8d54 100644
--- a/tests/docker-images/latest-version-image/conf/broker.conf
+++ b/tests/docker-images/latest-version-image/conf/broker.conf
@@ -22,7 +22,6 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/broker.log
directory=/pulsar
-environment=PULSAR_MEM="-Xmx150M -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/log/pulsar
-XX:+ExitOnOutOfMemoryError",PULSAR_GC="-XX:+UseZGC"
command=/pulsar/bin/pulsar broker
user=pulsar
stopwaitsecs=15
diff --git
a/tests/docker-images/latest-version-image/conf/functions_worker.conf
b/tests/docker-images/latest-version-image/conf/functions_worker.conf
index b5d151ce3f9..bd4d65552fd 100644
--- a/tests/docker-images/latest-version-image/conf/functions_worker.conf
+++ b/tests/docker-images/latest-version-image/conf/functions_worker.conf
@@ -22,7 +22,6 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/functions_worker.log
directory=/pulsar
-environment=PULSAR_MEM="-Xmx150M -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/log/pulsar
-XX:+ExitOnOutOfMemoryError",PULSAR_GC="-XX:+UseZGC"
command=/pulsar/bin/pulsar functions-worker
user=pulsar
stopwaitsecs=15
\ No newline at end of file
diff --git a/tests/docker-images/latest-version-image/conf/global-zk.conf
b/tests/docker-images/latest-version-image/conf/global-zk.conf
index ef521506846..3e485e730b0 100644
--- a/tests/docker-images/latest-version-image/conf/global-zk.conf
+++ b/tests/docker-images/latest-version-image/conf/global-zk.conf
@@ -22,7 +22,6 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/global-zk.log
directory=/pulsar
-environment=PULSAR_MEM="-Xmx128M -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/log/pulsar
-XX:+ExitOnOutOfMemoryError",PULSAR_GC="-XX:+UseZGC"
command=/pulsar/bin/pulsar configuration-store
user=pulsar
stopwaitsecs=15
\ No newline at end of file
diff --git a/tests/docker-images/latest-version-image/conf/local-zk.conf
b/tests/docker-images/latest-version-image/conf/local-zk.conf
index d6bfdcb621b..11efaa2a894 100644
--- a/tests/docker-images/latest-version-image/conf/local-zk.conf
+++ b/tests/docker-images/latest-version-image/conf/local-zk.conf
@@ -22,7 +22,6 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/local-zk.log
directory=/pulsar
-environment=PULSAR_MEM="-Xmx128M -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/log/pulsar
-XX:+ExitOnOutOfMemoryError",PULSAR_GC="-XX:+UseZGC"
command=/pulsar/bin/pulsar zookeeper
user=pulsar
stopwaitsecs=15
\ No newline at end of file
diff --git a/tests/docker-images/latest-version-image/conf/proxy.conf
b/tests/docker-images/latest-version-image/conf/proxy.conf
index 17a0a658b42..a6fb19b5a9a 100644
--- a/tests/docker-images/latest-version-image/conf/proxy.conf
+++ b/tests/docker-images/latest-version-image/conf/proxy.conf
@@ -22,7 +22,6 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/proxy.log
directory=/pulsar
-environment=PULSAR_MEM="-Xmx150M -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/log/pulsar
-XX:+ExitOnOutOfMemoryError",PULSAR_GC="-XX:+UseZGC"
command=/pulsar/bin/pulsar proxy
user=pulsar
stopwaitsecs=15
\ No newline at end of file
diff --git a/tests/docker-images/latest-version-image/conf/websocket.conf
b/tests/docker-images/latest-version-image/conf/websocket.conf
index 7625dba3e03..55a52d70dcc 100644
--- a/tests/docker-images/latest-version-image/conf/websocket.conf
+++ b/tests/docker-images/latest-version-image/conf/websocket.conf
@@ -22,7 +22,6 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/pulsar-websocket.log
directory=/pulsar
-environment=PULSAR_MEM="-Xmx150M -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/log/pulsar
-XX:+ExitOnOutOfMemoryError",PULSAR_GC="-XX:+UseZGC"
command=/pulsar/bin/pulsar websocket
user=pulsar
stopwaitsecs=15
\ No newline at end of file
diff --git a/tests/docker-images/latest-version-image/pom.xml
b/tests/docker-images/latest-version-image/pom.xml
index 29108e529b7..f50e1518802 100644
--- a/tests/docker-images/latest-version-image/pom.xml
+++ b/tests/docker-images/latest-version-image/pom.xml
@@ -38,6 +38,9 @@
<name>integrationTests</name>
</property>
</activation>
+ <properties>
+ <docker.install.asyncprofiler>false</docker.install.asyncprofiler>
+ </properties>
<dependencies>
<dependency>
<groupId>org.apache.pulsar.tests</groupId>
@@ -166,6 +169,7 @@
<contextDir>${project.basedir}</contextDir>
<args>
<PULSAR_ALL_IMAGE>${docker.organization}/${docker.image}-all:${project.version}-${git.commit.id.abbrev}</PULSAR_ALL_IMAGE>
+
<INSTALL_ASYNC_PROFILER>${docker.install.asyncprofiler}</INSTALL_ASYNC_PROFILER>
</args>
<noCache>true</noCache>
<buildx>
diff --git a/tests/docker-images/latest-version-image/scripts/func-lib.sh
b/tests/docker-images/latest-version-image/scripts/func-lib.sh
new file mode 100644
index 00000000000..b8c89bbde6c
--- /dev/null
+++ b/tests/docker-images/latest-version-image/scripts/func-lib.sh
@@ -0,0 +1,53 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+set -e
+set -o pipefail
+
+function run_pulsar_component() {
+ local component=$1
+ local supervisord_component=$2
+ local maxMem=$3
+ local additionalMemParam=$4
+ export PULSAR_MEM="${PULSAR_MEM:-"-Xmx${maxMem} ${additionalMemParam} -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/pulsar -XX:+ExitOnOutOfMemoryError"}"
+ export PULSAR_GC="${PULSAR_GC:-"-XX:+UseZGC"}"
+
+ if [[ -f "conf/${component}.conf" ]]; then
+ bin/apply-config-from-env.py conf/${component}.conf
+ fi
+ bin/apply-config-from-env.py conf/pulsar_env.sh
+
+ if [[ "$component" == "functions_worker" ]]; then
+ bin/apply-config-from-env.py conf/client.conf
+ bin/gen-yml-from-env.py conf/functions_worker.yml
+ fi
+
+ if [[ "${supervisord_component}" == "global-zk" ]]; then
+ bin/generate-zookeeper-config.sh conf/global_zookeeper.conf
+ elif [[ "${supervisord_component}" == "local-zk" ]]; then
+ bin/generate-zookeeper-config.sh conf/zookeeper.conf
+ fi
+
+ if [ -z "$NO_AUTOSTART" ]; then
+ sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/${supervisord_component}.conf
+ fi
+
+ exec /usr/bin/supervisord -c /etc/supervisord.conf
+}
\ No newline at end of file
diff --git a/tests/docker-images/latest-version-image/scripts/run-bookie.sh
b/tests/docker-images/latest-version-image/scripts/run-bookie.sh
index e454e667645..047fbd51208 100755
--- a/tests/docker-images/latest-version-image/scripts/run-bookie.sh
+++ b/tests/docker-images/latest-version-image/scripts/run-bookie.sh
@@ -18,16 +18,10 @@
# under the License.
#
+source /pulsar/bin/func-lib.sh
+
# sets dbStorage_writeCacheMaxSizeMb and dbStorage_readAheadCacheMaxSizeMb if
not already defined
export dbStorage_writeCacheMaxSizeMb="${dbStorage_writeCacheMaxSizeMb:-16}"
export
dbStorage_readAheadCacheMaxSizeMb="${dbStorage_readAheadCacheMaxSizeMb:-16}"
-bin/apply-config-from-env.py conf/bookkeeper.conf && \
- bin/apply-config-from-env.py conf/pulsar_env.sh
-
-if [ -z "$NO_AUTOSTART" ]; then
- sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/bookie.conf
-fi
-
-exec /usr/bin/supervisord -c /etc/supervisord.conf
-
+run_pulsar_component bookkeeper bookie 128M -XX:MaxDirectMemorySize=512M
\ No newline at end of file
diff --git a/tests/docker-images/latest-version-image/scripts/run-broker.sh
b/tests/docker-images/latest-version-image/scripts/run-broker.sh
index 4f89f145f2b..5f6bfd12172 100755
--- a/tests/docker-images/latest-version-image/scripts/run-broker.sh
+++ b/tests/docker-images/latest-version-image/scripts/run-broker.sh
@@ -18,12 +18,6 @@
# under the License.
#
-bin/apply-config-from-env.py conf/broker.conf && \
- bin/apply-config-from-env.py conf/pulsar_env.sh
-
-if [ -z "$NO_AUTOSTART" ]; then
- sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/broker.conf
-fi
-
-exec /usr/bin/supervisord -c /etc/supervisord.conf
+source /pulsar/bin/func-lib.sh
+run_pulsar_component broker broker 150M
\ No newline at end of file
diff --git
a/tests/docker-images/latest-version-image/scripts/run-functions-worker.sh
b/tests/docker-images/latest-version-image/scripts/run-functions-worker.sh
index cd9d7593dbf..1ca2d968585 100755
--- a/tests/docker-images/latest-version-image/scripts/run-functions-worker.sh
+++ b/tests/docker-images/latest-version-image/scripts/run-functions-worker.sh
@@ -18,13 +18,6 @@
# under the License.
#
-bin/apply-config-from-env.py conf/client.conf && \
- bin/gen-yml-from-env.py conf/functions_worker.yml && \
- bin/apply-config-from-env.py conf/pulsar_env.sh
-
-if [ -z "$NO_AUTOSTART" ]; then
- sed -i 's/autostart=.*/autostart=true/'
/etc/supervisord/conf.d/functions_worker.conf
-fi
-
-exec /usr/bin/supervisord -c /etc/supervisord.conf
+source /pulsar/bin/func-lib.sh
+run_pulsar_component functions_worker functions_worker 150M
\ No newline at end of file
diff --git a/tests/docker-images/latest-version-image/scripts/run-global-zk.sh
b/tests/docker-images/latest-version-image/scripts/run-global-zk.sh
index 783ef2d4680..1b5782d45d7 100755
--- a/tests/docker-images/latest-version-image/scripts/run-global-zk.sh
+++ b/tests/docker-images/latest-version-image/scripts/run-global-zk.sh
@@ -18,12 +18,6 @@
# under the License.
#
-bin/apply-config-from-env.py conf/zookeeper.conf && \
- bin/apply-config-from-env.py conf/pulsar_env.sh && \
- bin/generate-zookeeper-config.sh conf/global_zookeeper.conf
+source /pulsar/bin/func-lib.sh
-if [ -z "$NO_AUTOSTART" ]; then
- sed -i 's/autostart=.*/autostart=true/'
/etc/supervisord/conf.d/global-zk.conf
-fi
-
-exec /usr/bin/supervisord -c /etc/supervisord.conf
+run_pulsar_component zookeeper global-zk 128M
\ No newline at end of file
diff --git a/tests/docker-images/latest-version-image/scripts/run-local-zk.sh
b/tests/docker-images/latest-version-image/scripts/run-local-zk.sh
index b27212b6ba9..d510ea1c464 100755
--- a/tests/docker-images/latest-version-image/scripts/run-local-zk.sh
+++ b/tests/docker-images/latest-version-image/scripts/run-local-zk.sh
@@ -18,12 +18,6 @@
# under the License.
#
-bin/apply-config-from-env.py conf/zookeeper.conf && \
- bin/apply-config-from-env.py conf/pulsar_env.sh && \
- bin/generate-zookeeper-config.sh conf/zookeeper.conf
+source /pulsar/bin/func-lib.sh
-if [ -z "$NO_AUTOSTART" ]; then
- sed -i 's/autostart=.*/autostart=true/'
/etc/supervisord/conf.d/local-zk.conf
-fi
-
-exec /usr/bin/supervisord -c /etc/supervisord.conf
+run_pulsar_component zookeeper local-zk 128M
\ No newline at end of file
diff --git a/tests/docker-images/latest-version-image/scripts/run-proxy.sh
b/tests/docker-images/latest-version-image/scripts/run-proxy.sh
index f44ed0bb658..64c9d2a5067 100755
--- a/tests/docker-images/latest-version-image/scripts/run-proxy.sh
+++ b/tests/docker-images/latest-version-image/scripts/run-proxy.sh
@@ -18,11 +18,6 @@
# under the License.
#
-bin/apply-config-from-env.py conf/proxy.conf && \
- bin/apply-config-from-env.py conf/pulsar_env.sh
+source /pulsar/bin/func-lib.sh
-if [ -z "$NO_AUTOSTART" ]; then
- sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/proxy.conf
-fi
-
-exec /usr/bin/supervisord -c /etc/supervisord.conf
+run_pulsar_component proxy proxy 150M
\ No newline at end of file
diff --git a/tests/docker-images/latest-version-image/scripts/run-standalone.sh
b/tests/docker-images/latest-version-image/scripts/run-standalone.sh
index c29f2ea91ef..868a198dd9c 100755
--- a/tests/docker-images/latest-version-image/scripts/run-standalone.sh
+++ b/tests/docker-images/latest-version-image/scripts/run-standalone.sh
@@ -18,4 +18,7 @@
# under the License.
#
+export PULSAR_MEM="${PULSAR_MEM:-"-Xmx512M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/pulsar -XX:+ExitOnOutOfMemoryError"}"
+export PULSAR_GC="${PULSAR_GC:-"-XX:+UseZGC"}"
+
bin/pulsar standalone
\ No newline at end of file
diff --git a/tests/docker-images/latest-version-image/scripts/run-websocket.sh
b/tests/docker-images/latest-version-image/scripts/run-websocket.sh
index 34e4b9016af..cab1abee44c 100755
--- a/tests/docker-images/latest-version-image/scripts/run-websocket.sh
+++ b/tests/docker-images/latest-version-image/scripts/run-websocket.sh
@@ -18,11 +18,6 @@
# under the License.
#
-bin/apply-config-from-env.py conf/websocket.conf && \
- bin/apply-config-from-env.py conf/pulsar_env.sh
+source /pulsar/bin/func-lib.sh
-if [ -z "$NO_AUTOSTART" ]; then
- sed -i 's/autostart=.*/autostart=true/'
/etc/supervisord/conf.d/websocket.conf
-fi
-
-exec /usr/bin/supervisord -c /etc/supervisord.conf
+run_pulsar_component websocket websocket 150M
\ No newline at end of file
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index e10064f5c3d..4b3b34fad58 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -35,6 +35,9 @@
<properties>
<integrationTestSuiteFile>pulsar.xml</integrationTestSuiteFile>
<mongo-reactivestreams.version>4.1.2</mongo-reactivestreams.version>
+
<inttest.asyncprofiler.opts>event=cpu,lock=1ms,alloc=2m,jfrsync=profile</inttest.asyncprofiler.opts>
+
<inttest.asyncprofiler.outputformat>jfr</inttest.asyncprofiler.outputformat>
+ <inttest.asyncprofiler.dir></inttest.asyncprofiler.dir>
</properties>
<dependencies>
@@ -302,10 +305,17 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>${testJacocoAgentArgument} -XX:+ExitOnOutOfMemoryError
-Xmx1G -XX:MaxDirectMemorySize=1G
- -Dconfluent.version=${confluent.version}
-Djacoco.version=${jacoco-maven-plugin.version}
-
-Dintegrationtest.coverage.enabled=${integrationtest.coverage.enabled}
-Dintegrationtest.coverage.dir=${integrationtest.coverage.dir}
${test.additional.args}
</argLine>
+ <systemPropertyVariables>
+ <confluent.version>${confluent.version}</confluent.version>
+ <jacoco.version>${jacoco-maven-plugin.version}</jacoco.version>
+
<integrationtest.coverage.enabled>${integrationtest.coverage.enabled}</integrationtest.coverage.enabled>
+
<integrationtest.coverage.dir>${integrationtest.coverage.dir}</integrationtest.coverage.dir>
+
<inttest.asyncprofiler.opts>${inttest.asyncprofiler.opts}</inttest.asyncprofiler.opts>
+
<inttest.asyncprofiler.outputformat>${inttest.asyncprofiler.outputformat}</inttest.asyncprofiler.outputformat>
+
<inttest.asyncprofiler.dir>${inttest.asyncprofiler.dir}</inttest.asyncprofiler.dir>
+ </systemPropertyVariables>
<skipTests>false</skipTests>
<suiteXmlFiles>
<file>src/test/resources/${integrationTestSuiteFile}</file>
@@ -334,9 +344,31 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>pl.project13.maven</groupId>
+ <artifactId>git-commit-id-plugin</artifactId>
+ <configuration>
+ <skip>false</skip>
+ <injectAllReactorProjects>true</injectAllReactorProjects>
+ <skipPoms>false</skipPoms>
+ </configuration>
+ </plugin>
</plugins>
</build>
</profile>
+ <profile>
+ <!--
+ This profile is used in addition to the Integration Tests Async Profiler support to profile created exceptions
+ Check the org.apache.pulsar.tests.integration.profiling.PulsarProfilingTest example for how to profile int tests.
+ Profiling of exceptions requires that perf events have been enabled on the docker host. PulsarProfilingTest
+ contains instructions.
+ -->
+ <id>inttestAsyncProfilerExceptions</id>
+ <properties>
+
<inttest.asyncprofiler.opts>event=Java_java_lang_Throwable_fillInStackTrace,tree,reverse</inttest.asyncprofiler.opts>
+
<inttest.asyncprofiler.outputformat>html</inttest.asyncprofiler.outputformat>
+ </properties>
+ </profile>
<profile>
<id>apache-release</id>
<build>
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
index 369801d6b64..58658b0a991 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ChaosContainer.java
@@ -51,7 +51,7 @@ public class ChaosContainer<SelfT extends
ChaosContainer<SelfT>> extends Generic
String existingValue = getEnvMap().get(key);
if (existingValue == null) {
addEnv(key, value);
- } else {
+ } else if (!existingValue.contains(value)) {
addEnv(key, existingValue + " " + value);
}
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
index 104590045ce..4f4a61ca4eb 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
@@ -19,13 +19,19 @@
package org.apache.pulsar.tests.integration.containers;
import static java.time.temporal.ChronoUnit.SECONDS;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import com.github.dockerjava.api.model.Capability;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.attribute.PosixFilePermissions;
import java.time.Duration;
+import java.util.List;
import java.util.Objects;
import java.util.UUID;
import lombok.Getter;
+import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.pulsar.tests.ExtendedNettyLeakDetector;
@@ -85,6 +91,8 @@ public abstract class PulsarContainer<SelfT extends
PulsarContainer<SelfT>> exte
private final int httpPort;
private final int httpsPort;
private final String httpPath;
+ @Setter
+ private boolean enableAsyncProfiler = false;
public PulsarContainer(String clusterName,
String hostname,
@@ -252,6 +260,10 @@ public abstract class PulsarContainer<SelfT extends
PulsarContainer<SelfT>> exte
configureCodeCoverage();
}
+ if (enableAsyncProfiler) {
+ configureAsyncProfiler();
+ }
+
if (isPassNettyLeakDetectionSystemProperties()) {
passNettyLeakDetectionSystemProperties();
}
@@ -270,8 +282,7 @@ public abstract class PulsarContainer<SelfT extends
PulsarContainer<SelfT>> exte
if (isPassNettyLeakDetectionSystemProperties()) {
String envKey = "PULSAR_EXTRA_OPTS";
// pass similar defaults as there is in conf/pulsar_env.sh
- appendToEnv("PULSAR_EXTRA_OPTS",
- "-Dpulsar.allocator.exit_on_oom=true
-Dio.netty.recycler.maxCapacityPerThread=4096");
+ initializePulsarExtraOpts();
passSystemPropertyInEnv(envKey,
ExtendedNettyLeakDetector.NETTY_CUSTOM_LEAK_DETECTOR_SYSTEM_PROPERTY_NAME);
if
(ExtendedNettyLeakDetector.isExtendedNettyLeakDetectorEnabled()) {
// enable shutdown hook for extended leak detector in
containers
@@ -288,6 +299,11 @@ public abstract class PulsarContainer<SelfT extends
PulsarContainer<SelfT>> exte
}
}
+ protected void initializePulsarExtraOpts() {
+ appendToEnv("PULSAR_EXTRA_OPTS",
+ "-Dpulsar.allocator.exit_on_oom=true
-Dio.netty.recycler.maxCapacityPerThread=4096");
+ }
+
protected boolean isCodeCoverageEnabled() {
return Boolean.getBoolean("integrationtest.coverage.enabled");
}
@@ -325,6 +341,66 @@ public abstract class PulsarContainer<SelfT extends
PulsarContainer<SelfT>> exte
}
}
+ protected void configureAsyncProfiler() {
+ // configure privileged container for profiling
+ // in addition to this, it is necessary to separately run
+ // docker run --rm -it --privileged --cap-add SYS_ADMIN --security-opt
seccomp=unconfined \
+ // alpine sh -c "echo 1 > /proc/sys/kernel/perf_event_paranoid \
+ // && echo 0 > /proc/sys/kernel/kptr_restrict \
+ // && echo 1024 > /proc/sys/kernel/perf_event_max_stack \
+ // && echo 2048 > /proc/sys/kernel/perf_event_mlock_kb"
+ // or to run:
+ // echo 1 | sudo tee /proc/sys/kernel/perf_event_paranoid
+ // echo 0 | sudo tee /proc/sys/kernel/kptr_restrict
+ // echo 1024 | sudo tee /proc/sys/kernel/perf_event_max_stack
+ // echo 2048 | sudo tee /proc/sys/kernel/perf_event_mlock_kb
+ withCreateContainerCmdModifier(cmd -> {
+ cmd.getHostConfig()
+ .withCapAdd(Capability.SYS_ADMIN)
+ .withCapAdd(Capability.SYS_PTRACE)
+ .withCapAdd(Capability.PERFMON)
+ .withSecurityOpts(List.of("seccomp=unconfined"))
+ .withPrivileged(true);
+ });
+
+ File asyncProfilerDir;
+ if (isNotBlank(System.getProperty("inttest.asyncprofiler.dir"))) {
+ asyncProfilerDir = new
File(System.getProperty("inttest.asyncprofiler.dir"));
+ } else {
+ asyncProfilerDir = new File("target");
+ }
+ if (!asyncProfilerDir.exists()) {
+ if (!asyncProfilerDir.mkdirs()) {
+ throw new IllegalArgumentException(
+ "inttest.asyncprofiler.dir '" +
asyncProfilerDir.getAbsolutePath()
+ + "' doesn't exist and cannot be created.");
+ }
+ }
+ if (!asyncProfilerDir.isDirectory()) {
+ throw new IllegalArgumentException(
+ "inttest.asyncprofiler.dir '" +
asyncProfilerDir.getAbsolutePath() + "' isn't a directory.");
+ }
+ // change access to asyncProfilerDir to allow all access so that the
container user can write to it
+ // This matters only on Linux
+ try {
+ Files.setPosixFilePermissions(asyncProfilerDir.toPath(),
PosixFilePermissions.fromString("rwxrwxrwx"));
+ } catch (IOException e) {
+ throw new UncheckedIOException("Cannot change access to profiler
directory", e);
+ }
+ withFileSystemBind(asyncProfilerDir.getAbsolutePath(), "/profiles",
BindMode.READ_WRITE);
+
+ // build the async-profiler java agent command line
+ StringBuilder sb = new StringBuilder();
+
sb.append("-agentpath:/opt/async-profiler/lib/libasyncProfiler.so=start,");
+ sb.append(System.getProperty("inttest.asyncprofiler.opts",
"event=cpu,lock=1ms,alloc=2m,jfrsync=profile"));
+
sb.append(",file=/profiles/inttest_profile_").append(System.getProperty("git.commit.id.abbrev",
""));
+ sb.append("_").append(System.getProperty("maven.build.timestamp",
"").replace(' ', '_'));
+ sb.append("_").append(getContainerName());
+
sb.append("_").append("%p.").append(System.getProperty("inttest.asyncprofiler.outputformat",
"jfr"));
+ initializePulsarExtraOpts();
+ appendToEnv("PULSAR_EXTRA_OPTS", "-XX:+UnlockDiagnosticVMOptions
-XX:+DebugNonSafepoints " + sb);
+ }
+
@Override
public boolean equals(Object o) {
if (!(o instanceof PulsarContainer)) {
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
index d05a1d7c37b..89929644819 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
@@ -107,7 +107,6 @@ public class ExtensibleLoadManagerTest extends
TestRetrySupport {
serviceUnitStateTableViewClassName);
brokerEnvs.put("forceDeleteNamespaceAllowed", "true");
brokerEnvs.put("loadBalancerDebugModeEnabled", "true");
- brokerEnvs.put("PULSAR_MEM", "-Xmx512M");
spec.brokerEnvs(brokerEnvs);
pulsarCluster = PulsarCluster.forSpec(spec);
pulsarCluster.start();
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/profiling/PulsarProfilingTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/profiling/PulsarProfilingTest.java
new file mode 100644
index 00000000000..1ca4352615e
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/profiling/PulsarProfilingTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.profiling;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.tests.ManualTestUtil;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.utils.DockerUtils;
+import org.testcontainers.containers.GenericContainer;
+import org.testng.annotations.Test;
+
+/**
+ * Sample test that profiles the broker side with Async Profiler.
+ *
+ * Example usage:
+ * # This has been tested on Mac with Orbstack (https://orbstack.dev/) docker
+ * # compile integration test dependencies
+ * mvn -am -pl tests/integration -DskipTests install
+ * # compile apachepulsar/java-test-image with async profiler (add "clean" to
ensure a clean build with recent changes)
+ * ./build/build_java_test_image.sh -Ddocker.install.asyncprofiler=true
+ * # set environment variables
+ * export PULSAR_TEST_IMAGE_NAME=apachepulsar/java-test-image:latest
+ * export NETTY_LEAK_DETECTION=off
+ * export ENABLE_MANUAL_TEST=true
+ * # enable perf events for profiling and tune it
+ * docker run --rm -it --privileged --cap-add SYS_ADMIN --security-opt
seccomp=unconfined \
+ * alpine sh -c "echo 1 > /proc/sys/kernel/perf_event_paranoid \
+ * && echo 0 > /proc/sys/kernel/kptr_restrict \
+ * && echo 1024 > /proc/sys/kernel/perf_event_max_stack \
+ * && echo 2048 > /proc/sys/kernel/perf_event_mlock_kb"
+ * # translated to sysctl settings (for persistent configuration on Linux
hosts)
+ * kernel.perf_event_paranoid=1
+ * kernel.kptr_restrict=0
+ * kernel.perf_event_max_stack=1024
+ * kernel.perf_event_mlock_kb=2048
+ * # run the test
+ * mvn -DintegrationTests -pl tests/integration -Dtest=PulsarProfilingTest
-DtestRetryCount=0 \
+ * -DredirectTestOutputToFile=false test
+ * By default, the .jfr files will go into tests/integration/target.
+ * You can use jfrconv from async profiler to convert them into html
flamegraphs or use other tools such
+ * as Eclipse Mission Control (https://adoptium.net/jmc) or IntelliJ to open
them.
+ */
+@Slf4j
+public class PulsarProfilingTest extends PulsarTestSuite {
+ // this assumes that Transparent Huge Pages are available on the host
machine
+ // Please note that "madvise" mode is recommended for performance
reasons.
+ // For example:
+ // echo madvise | sudo tee /sys/kernel/mm/transparent_hugepage/enabled
+ // echo madvise | sudo tee /sys/kernel/mm/transparent_hugepage/defrag
+ // More info about -XX:+UseTransparentHugePages at
+ // https://shipilev.net/jvm/anatomy-quarks/2-transparent-huge-pages/
+ private static final String DEFAULT_PULSAR_MEM = "-Xms512m -Xmx1g
-XX:+UseTransparentHugePages -XX:+AlwaysPreTouch";
+ private static final String BROKER_PULSAR_MEM = "-Xms2g -Xmx2g
-XX:+UseTransparentHugePages -XX:+AlwaysPreTouch";
+
+ // A container that runs pulsar-perf, arguments are currently hard-coded
since this is an example
+ static class PulsarPerfContainer extends
GenericContainer<PulsarPerfContainer> {
+ private final String brokerHostname;
+ private final long numberOfMessages = 100_000_000;
+
+ public PulsarPerfContainer(String clusterName,
+ String brokerHostname,
+ String hostname) {
+ super(PulsarContainer.DEFAULT_IMAGE_NAME);
+ this.brokerHostname = brokerHostname;
+ withCreateContainerCmdModifier(createContainerCmd -> {
+ createContainerCmd.withHostName(hostname);
+ createContainerCmd.withName(clusterName + "-" + hostname);
+ });
+ withEnv("PULSAR_MEM", DEFAULT_PULSAR_MEM);
+ setCommand("sleep 1000000");
+ }
+
+ public CompletableFuture<Long> consume(String topicName) throws
Exception {
+ return DockerUtils.runCommandAsyncWithLogging(getDockerClient(),
getContainerId(),
+ "/pulsar/bin/pulsar-perf", "consume", topicName,
+ "-u", "pulsar://" + brokerHostname + ":6650",
+ "-st", "Shared",
+ "-m", String.valueOf(numberOfMessages), "-ml", "200M");
+ }
+
+ public CompletableFuture<Long> produce(String topicName) throws
Exception {
+ return DockerUtils.runCommandAsyncWithLogging(getDockerClient(),
getContainerId(),
+ "/pulsar/bin/pulsar-perf", "produce", topicName,
+ "-u", "pulsar://" + brokerHostname + ":6650",
+ "-au", "http://" + brokerHostname + ":8080",
+ "-r", String.valueOf(Integer.MAX_VALUE), // max-rate
+ "-s", "8192", // 8kB message size
+ "-m", String.valueOf(numberOfMessages), "-ml", "200M");
+ }
+ }
+
+ private PulsarPerfContainer perfConsume;
+ private PulsarPerfContainer perfProduce;
+
+ @Override
+ public void setupCluster() throws Exception {
+ ManualTestUtil.skipManualTestIfNotEnabled();
+ super.setupCluster();
+ }
+
+ @Override
+ public void tearDownCluster() throws Exception {
+ if (perfConsume != null) {
+ perfConsume.stop();
+ perfConsume = null;
+ }
+ if (perfProduce != null) {
+ perfProduce.stop();
+ perfProduce = null;
+ }
+ super.tearDownCluster();
+ }
+
+ @Override
+ protected PulsarClusterSpec.PulsarClusterSpecBuilder
beforeSetupCluster(String clusterName,
+ PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
+
+ // Enable profiling on the broker
+ specBuilder.profileBroker(true);
+
+ // Only run one broker so that all load goes to a single broker
+ specBuilder.numBrokers(1);
+ // Have 3 bookies to reduce bottleneck on bookie
+ specBuilder.numBookies(3);
+ // no need for proxy
+ specBuilder.numProxies(0);
+
+ // Increase memory for brokers and configure more aggressive rollover
+ specBuilder.brokerEnvs(Map.of("PULSAR_MEM", BROKER_PULSAR_MEM,
+ "managedLedgerMinLedgerRolloverTimeMinutes", "1",
+ "managedLedgerMaxLedgerRolloverTimeMinutes", "5",
+ "managedLedgerMaxSizePerLedgerMbytes", "512",
+ "managedLedgerDefaultEnsembleSize", "1",
+ "managedLedgerDefaultWriteQuorum", "1",
+ "managedLedgerDefaultAckQuorum", "1"
+ ));
+
+ // Increase memory for bookkeepers and make compaction run more often
+ Map<String, String> bkEnv = new HashMap<>();
+ bkEnv.put("PULSAR_MEM", DEFAULT_PULSAR_MEM);
+ bkEnv.put("dbStorage_writeCacheMaxSizeMb", "64");
+ bkEnv.put("dbStorage_readAheadCacheMaxSizeMb", "96");
+ bkEnv.put("journalMaxSizeMB", "256");
+ bkEnv.put("journalSyncData", "false");
+ bkEnv.put("majorCompactionInterval", "300");
+ bkEnv.put("minorCompactionInterval", "30");
+ bkEnv.put("compactionRateByEntries", "20000");
+ bkEnv.put("gcWaitTime", "30000");
+ bkEnv.put("isForceGCAllowWhenNoSpace", "true");
+ bkEnv.put("diskUsageLwmThreshold", "0.75");
+ bkEnv.put("diskCheckInterval", "60");
+ specBuilder.bookkeeperEnvs(bkEnv);
+
+ // Create pulsar-perf containers
+ String brokerHostname = clusterName + "-pulsar-broker-0";
+ perfProduce = new PulsarPerfContainer(clusterName, brokerHostname,
"perf-produce");
+ perfConsume = new PulsarPerfContainer(clusterName, brokerHostname,
"perf-consume");
+ specBuilder.externalServices(Map.of(
+ "pulsar-produce", perfProduce,
+ "pulsar-consume", perfConsume
+ ));
+
+ return specBuilder;
+ }
+
+ @Test(timeOut = 600_000)
+ public void runPulsarPerf() throws Exception {
+ String topicName = generateTopicName("profiletest", true);
+ CompletableFuture<Long> consumeFuture = perfConsume.consume(topicName);
+ Thread.sleep(1000);
+ CompletableFuture<Long> produceFuture = perfProduce.produce(topicName);
+ FutureUtil.waitForAll(List.of(consumeFuture, produceFuture))
+ .orTimeout(3, TimeUnit.MINUTES)
+ .exceptionally(t -> {
+ if (FutureUtil.unwrapCompletionException(t) instanceof
TimeoutException) {
+ // ignore test timeout
+ log.info("Test timed out, ignoring this in
profiling.");
+ return null;
+ } else {
+ log.error("Failed to run pulsar-perf", t);
+ }
+ throw FutureUtil.wrapToCompletionException(t);
+ })
+ .get();
+ }
+}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 92112960f9c..2ee571dafbb 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -147,6 +147,7 @@ public class PulsarCluster {
.withEnv("pulsarNode",
appendClusterName("pulsar-broker-0"));
metadataStoreUrl = appendClusterName(ZKContainer.NAME);
configurationMetadataStoreUrl = CSContainer.NAME + ":" + CS_PORT;
+ zkContainer.setEnableAsyncProfiler(spec.profileZookeeper);
}
this.csContainer = csContainer;
@@ -161,6 +162,8 @@ public class PulsarCluster {
.withEnv("metadataStoreUrl", metadataStoreUrl)
.withEnv("configurationMetadataStoreUrl",
configurationMetadataStoreUrl)
.withEnv("clusterName", clusterName);
+ proxyContainer.setEnableAsyncProfiler(spec.profileProxy);
+
// enable mTLS
if (spec.enableTls) {
proxyContainer
@@ -217,6 +220,7 @@ public class PulsarCluster {
if (spec.bookieAdditionalPorts != null) {
spec.bookieAdditionalPorts.forEach(bookieContainer::addExposedPort);
}
+ bookieContainer.setEnableAsyncProfiler(spec.profileBookie);
return bookieContainer;
})
);
@@ -265,6 +269,7 @@ public class PulsarCluster {
if (spec.brokerAdditionalPorts() != null) {
spec.brokerAdditionalPorts().forEach(brokerContainer::addExposedPort);
}
+
brokerContainer.setEnableAsyncProfiler(spec.profileBroker);
return brokerContainer;
}
));
@@ -520,13 +525,13 @@ public class PulsarCluster {
private WorkerContainer createWorkerContainer(String name) {
String serviceUrl = "pulsar://pulsar-broker-0:" +
PulsarContainer.BROKER_PORT;
String httpServiceUrl = "http://pulsar-broker-0:" +
PulsarContainer.BROKER_HTTP_PORT;
- return new WorkerContainer(clusterName, name)
+ WorkerContainer workerContainer = new WorkerContainer(clusterName,
name)
.withNetwork(network)
.withNetworkAliases(name)
// worker settings
.withEnv("PF_workerId", name)
.withEnv("PF_workerHostname", name)
- .withEnv("PF_workerPort", "" +
PulsarContainer.BROKER_HTTP_PORT)
+ .withEnv("PF_workerPort", "" + BROKER_HTTP_PORT)
.withEnv("PF_pulsarFunctionsCluster", clusterName)
.withEnv("PF_pulsarServiceUrl", serviceUrl)
.withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)
@@ -537,6 +542,8 @@ public class PulsarCluster {
.withEnv("zkServers", ZKContainer.NAME)
.withEnv(functionWorkerEnvs)
.withExposedPorts(functionWorkerAdditionalPorts.toArray(new
Integer[0]));
+ workerContainer.setEnableAsyncProfiler(spec.profileFunctionWorker);
+ return workerContainer;
}
private void startFunctionWorkersWithThreadContainerFactory(String suffix,
int numFunctionWorkers) {
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
index 6a2259500b6..7e6d272678f 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
@@ -199,4 +199,19 @@ public class PulsarClusterSpec {
@Default
boolean enableOxia = false;
+
+ @Default
+ boolean profileBroker = false;
+
+ @Default
+ boolean profileProxy = false;
+
+ @Default
+ boolean profileFunctionWorker = false;
+
+ @Default
+ boolean profileBookie = false;
+
+ @Default
+ boolean profileZookeeper = false;
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
index 262ed5b9676..5b518425df5 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/utils/DockerUtils.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.tests.integration.utils;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.async.ResultCallback;
+import com.github.dockerjava.api.command.ExecCreateCmd;
import com.github.dockerjava.api.command.InspectContainerResponse;
import com.github.dockerjava.api.command.InspectExecResponse;
import com.github.dockerjava.api.exception.NotFoundException;
@@ -38,6 +39,7 @@ import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -205,10 +207,7 @@ public class DockerUtils {
DockerClient dockerClient,
String containerId,
String... cmd) {
- String execId = dockerClient.execCreateCmd(containerId)
- .withCmd(cmd)
- .withAttachStderr(true)
- .withAttachStdout(true)
+ String execId = createExecCreateCmd(dockerClient, containerId, cmd)
.withUser(userId)
.exec()
.getId();
@@ -218,10 +217,7 @@ public class DockerUtils {
public static CompletableFuture<ContainerExecResult>
runCommandAsync(DockerClient dockerClient,
String containerId,
String... cmd) {
- String execId = dockerClient.execCreateCmd(containerId)
- .withCmd(cmd)
- .withAttachStderr(true)
- .withAttachStdout(true)
+ String execId = createExecCreateCmd(dockerClient, containerId, cmd)
.exec()
.getId();
return runCommandAsync(execId, dockerClient, containerId, cmd);
@@ -294,10 +290,7 @@ public class DockerUtils {
String
containerId,
String...
cmd) throws ContainerExecException {
CompletableFuture<Boolean> future = new CompletableFuture<>();
- String execId = dockerClient.execCreateCmd(containerId)
- .withCmd(cmd)
- .withAttachStderr(true)
- .withAttachStdout(true)
+ String execId = createExecCreateCmd(dockerClient, containerId, cmd)
.exec()
.getId();
final String containerName = getContainerName(dockerClient,
containerId);
@@ -359,10 +352,7 @@ public class DockerUtils {
public static CompletableFuture<Long>
runCommandAsyncWithLogging(DockerClient dockerClient,
String
containerId, String... cmd) {
CompletableFuture<Long> future = new CompletableFuture<>();
- String execId = dockerClient.execCreateCmd(containerId)
- .withCmd(cmd)
- .withAttachStderr(true)
- .withAttachStdout(true)
+ String execId = createExecCreateCmd(dockerClient, containerId, cmd)
.exec()
.getId();
final String containerName = getContainerName(dockerClient,
containerId);
@@ -400,6 +390,15 @@ public class DockerUtils {
return future;
}
+ private static ExecCreateCmd createExecCreateCmd(DockerClient
dockerClient, String containerId, String[] cmd) {
+ return dockerClient.execCreateCmd(containerId)
+ .withCmd(cmd)
+ .withAttachStderr(true)
+ .withAttachStdout(true)
+ // Clear out PULSAR_EXTRA_OPTS and OPTS so that they don't
interfere with the test
+ .withEnv(List.of("PULSAR_EXTRA_OPTS=", "OPTS="));
+ }
+
private static InspectExecResponse waitForExecCmdToFinish(DockerClient
dockerClient, String execId) {
InspectExecResponse resp = dockerClient.inspectExecCmd(execId).exec();
while (resp.isRunning()) {