This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new e241b0a9d09 [fix][broker] Fix memory leak when metrics are updated in
a thread other than FastThreadLocalThread (#24719)
e241b0a9d09 is described below
commit e241b0a9d091aab774cb4c2fdbd5d87c8fa8fcc2
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Sep 10 00:12:36 2025 +0800
[fix][broker] Fix memory leak when metrics are updated in a thread other
than FastThreadLocalThread (#24719)
---
.../metrics/DataSketchesOpStatsLogger.java | 87 ++-----------
.../metrics/DataSketchesSummaryLogger.java | 49 +-------
.../prometheus/metrics/ThreadLocalAccessor.java | 135 +++++++++++++++++++++
.../metrics/ThreadLocalAccessorTest.java | 93 ++++++++++++++
4 files changed, 239 insertions(+), 125 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java
index 12f54ba48a4..8973ba6a25c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java
@@ -19,15 +19,10 @@
package org.apache.pulsar.broker.stats.prometheus.metrics;
import com.yahoo.sketches.quantiles.DoublesSketch;
-import com.yahoo.sketches.quantiles.DoublesSketchBuilder;
import com.yahoo.sketches.quantiles.DoublesUnion;
import com.yahoo.sketches.quantiles.DoublesUnionBuilder;
-import io.netty.util.concurrent.FastThreadLocal;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
-import java.util.concurrent.locks.StampedLock;
import org.apache.bookkeeper.stats.OpStatsData;
import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -65,15 +60,7 @@ public class DataSketchesOpStatsLogger implements
OpStatsLogger {
failCountAdder.increment();
failSumAdder.add((long) valueMillis);
-
- LocalData localData = current.localData.get();
-
- long stamp = localData.lock.readLock();
- try {
- localData.failSketch.update(valueMillis);
- } finally {
- localData.lock.unlockRead(stamp);
- }
+ current.getLocalData().updateFail(valueMillis);
}
@Override
@@ -82,45 +69,21 @@ public class DataSketchesOpStatsLogger implements
OpStatsLogger {
successCountAdder.increment();
successSumAdder.add((long) valueMillis);
-
- LocalData localData = current.localData.get();
-
- long stamp = localData.lock.readLock();
- try {
- localData.successSketch.update(valueMillis);
- } finally {
- localData.lock.unlockRead(stamp);
- }
+ current.getLocalData().updateSuccess(valueMillis);
}
@Override
public void registerSuccessfulValue(long value) {
successCountAdder.increment();
successSumAdder.add(value);
-
- LocalData localData = current.localData.get();
-
- long stamp = localData.lock.readLock();
- try {
- localData.successSketch.update(value);
- } finally {
- localData.lock.unlockRead(stamp);
- }
+ current.getLocalData().updateSuccess(value);
}
@Override
public void registerFailedValue(long value) {
failCountAdder.increment();
failSumAdder.add(value);
-
- LocalData localData = current.localData.get();
-
- long stamp = localData.lock.readLock();
- try {
- localData.failSketch.update(value);
- } finally {
- localData.lock.unlockRead(stamp);
- }
+ current.getLocalData().updateFail(value);
}
@Override
@@ -141,21 +104,11 @@ public class DataSketchesOpStatsLogger implements
OpStatsLogger {
current = replacement;
replacement = local;
- final DoublesUnion aggregateSuccesss = new
DoublesUnionBuilder().build();
+ final DoublesUnion aggregateSuccess = new
DoublesUnionBuilder().build();
final DoublesUnion aggregateFail = new DoublesUnionBuilder().build();
- local.map.forEach((localData, b) -> {
- long stamp = localData.lock.writeLock();
- try {
- aggregateSuccesss.update(localData.successSketch);
- localData.successSketch.reset();
- aggregateFail.update(localData.failSketch);
- localData.failSketch.reset();
- } finally {
- localData.lock.unlockWrite(stamp);
- }
- });
-
- successResult = aggregateSuccesss.getResultAndReset();
+ local.record(aggregateSuccess, aggregateFail);
+
+ successResult = aggregateSuccess.getResultAndReset();
failResult = aggregateFail.getResultAndReset();
}
@@ -171,28 +124,4 @@ public class DataSketchesOpStatsLogger implements
OpStatsLogger {
DoublesSketch s = success ? successResult : failResult;
return s != null ? s.getQuantile(quantile) : Double.NaN;
}
-
- private static class LocalData {
- private final DoublesSketch successSketch = new
DoublesSketchBuilder().build();
- private final DoublesSketch failSketch = new
DoublesSketchBuilder().build();
- private final StampedLock lock = new StampedLock();
- }
-
- private static class ThreadLocalAccessor {
- private final Map<LocalData, Boolean> map = new ConcurrentHashMap<>();
- private final FastThreadLocal<LocalData> localData = new
FastThreadLocal<LocalData>() {
-
- @Override
- protected LocalData initialValue() throws Exception {
- LocalData localData = new LocalData();
- map.put(localData, Boolean.TRUE);
- return localData;
- }
-
- @Override
- protected void onRemoval(LocalData value) throws Exception {
- map.remove(value);
- }
- };
- }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java
index cc144e0eb69..42c189d4bf3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java
@@ -19,15 +19,10 @@
package org.apache.pulsar.broker.stats.prometheus.metrics;
import com.yahoo.sketches.quantiles.DoublesSketch;
-import com.yahoo.sketches.quantiles.DoublesSketchBuilder;
import com.yahoo.sketches.quantiles.DoublesUnion;
import com.yahoo.sketches.quantiles.DoublesUnionBuilder;
-import io.netty.util.concurrent.FastThreadLocal;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
-import java.util.concurrent.locks.StampedLock;
public class DataSketchesSummaryLogger {
@@ -55,14 +50,7 @@ public class DataSketchesSummaryLogger {
countAdder.increment();
sumAdder.add((long) valueMillis);
- LocalData localData = current.localData.get();
-
- long stamp = localData.lock.readLock();
- try {
- localData.successSketch.update(valueMillis);
- } finally {
- localData.lock.unlockRead(stamp);
- }
+ current.getLocalData().updateSuccess(valueMillis);
}
public void rotateLatencyCollection() {
@@ -72,15 +60,7 @@ public class DataSketchesSummaryLogger {
replacement = local;
final DoublesUnion aggregateValues = new DoublesUnionBuilder().build();
- local.map.forEach((localData, b) -> {
- long stamp = localData.lock.writeLock();
- try {
- aggregateValues.update(localData.successSketch);
- localData.successSketch.reset();
- } finally {
- localData.lock.unlockWrite(stamp);
- }
- });
+ local.record(aggregateValues, null);
values = aggregateValues.getResultAndReset();
}
@@ -97,27 +77,4 @@ public class DataSketchesSummaryLogger {
DoublesSketch s = values;
return s != null ? s.getQuantile(quantile) : Double.NaN;
}
-
- private static class LocalData {
- private final DoublesSketch successSketch = new
DoublesSketchBuilder().build();
- private final StampedLock lock = new StampedLock();
- }
-
- private static class ThreadLocalAccessor {
- private final Map<LocalData, Boolean> map = new ConcurrentHashMap<>();
- private final FastThreadLocal<LocalData> localData = new
FastThreadLocal<LocalData>() {
-
- @Override
- protected LocalData initialValue() throws Exception {
- LocalData localData = new LocalData();
- map.put(localData, Boolean.TRUE);
- return localData;
- }
-
- @Override
- protected void onRemoval(LocalData value) throws Exception {
- map.remove(value);
- }
- };
- }
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessor.java
new file mode 100644
index 00000000000..6a32ce5b905
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessor.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.yahoo.sketches.quantiles.DoublesSketch;
+import com.yahoo.sketches.quantiles.DoublesSketchBuilder;
+import com.yahoo.sketches.quantiles.DoublesUnion;
+import io.netty.util.concurrent.FastThreadLocal;
+import io.netty.util.concurrent.FastThreadLocalThread;
+import java.lang.ref.WeakReference;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.StampedLock;
+import org.jspecify.annotations.Nullable;
+
+/**
+ * Hands out one {@link LocalData} per calling thread and lets a single caller drain all of them.
+ *
+ * <p>Each {@link LocalData} created by the thread-local is also registered in {@code map}, so that
+ * {@link #record(DoublesUnion, DoublesUnion)} can reach the sketches of every thread. Entries owned by a
+ * {@link FastThreadLocalThread} are unregistered through {@code FastThreadLocal#onRemoval}; entries owned by any
+ * other kind of thread are unregistered by {@link #record(DoublesUnion, DoublesUnion)} once the owner thread is no
+ * longer alive or has been garbage collected.
+ */
+class ThreadLocalAccessor {
+
+    // Registry of all LocalData instances handed out so far. Only the keys matter; the value is always TRUE.
+    private final ConcurrentHashMap<LocalData, Boolean> map = new ConcurrentHashMap<>();
+    private final FastThreadLocal<LocalData> localData = new FastThreadLocal<>() {
+
+        @Override
+        protected LocalData initialValue() {
+            LocalData localData = new LocalData(Thread.currentThread());
+            map.put(localData, Boolean.TRUE);
+            return localData;
+        }
+
+        @Override
+        protected void onRemoval(LocalData value) {
+            map.remove(value);
+        }
+    };
+
+    /**
+     * Drains the sketches of every registered {@link LocalData} into the given unions and unregisters the
+     * entries whose owner thread is gone.
+     *
+     * @param aggregateSuccess union receiving every success sketch
+     * @param aggregateFail union receiving every fail sketch, or {@code null} to leave fail sketches untouched
+     */
+    void record(DoublesUnion aggregateSuccess, @Nullable DoublesUnion aggregateFail) {
+        map.keySet().forEach(key -> {
+            // Sample the owner's liveness BEFORE draining. A thread that is already dead cannot write again, so
+            // the drain below is guaranteed to be its last one and the entry can be dropped without losing data.
+            // Checking after the drain would drop values written between the drain and the owner's termination.
+            final boolean ownerGone = key.shouldRemove();
+            key.record(aggregateSuccess, aggregateFail);
+            if (ownerGone) {
+                map.remove(key);
+            }
+        });
+    }
+
+    /**
+     * Returns the {@link LocalData} of the calling thread, creating and registering it on first use.
+     */
+    LocalData getLocalData() {
+        return localData.get();
+    }
+
+    /**
+     * Returns how many {@link LocalData} instances are currently registered.
+     */
+    @VisibleForTesting
+    int getLocalDataCount() {
+        return map.size();
+    }
+
+    /**
+     * Success and fail sketches of one thread, guarded by a {@link StampedLock}.
+     */
+    static class LocalData {
+
+        private final DoublesSketch successSketch = new DoublesSketchBuilder().build();
+        private final DoublesSketch failSketch = new DoublesSketchBuilder().build();
+        private final StampedLock lock = new StampedLock();
+        // Keep a weak reference to the owner thread so that we can remove the LocalData when the thread
+        // is not alive anymore or has been garbage collected.
+        // This reference isn't needed when the owner thread is a FastThreadLocalThread and will be null in
+        // that case. The removal is handled by FastThreadLocal#onRemoval when the owner thread is a
+        // FastThreadLocalThread.
+        private final WeakReference<Thread> ownerThreadReference;
+
+        /**
+         * @param ownerThread the thread this instance belongs to; only weakly referenced, and not referenced at
+         *                    all when it is a {@link FastThreadLocalThread}
+         */
+        LocalData(Thread ownerThread) {
+            if (ownerThread instanceof FastThreadLocalThread) {
+                ownerThreadReference = null;
+            } else {
+                ownerThreadReference = new WeakReference<>(ownerThread);
+            }
+        }
+
+        /**
+         * Tells whether the owner thread is gone, i.e. garbage collected or no longer alive.
+         *
+         * @return {@code false} for a {@link FastThreadLocalThread} owner (removal is left to
+         *         {@code FastThreadLocal#onRemoval}); otherwise {@code true} iff the owner is collected or dead
+         */
+        private boolean shouldRemove() {
+            if (ownerThreadReference == null) {
+                // the owner is a FastThreadLocalThread which handles the removal using FastThreadLocal#onRemoval
+                return false;
+            }
+            final Thread ownerThread = ownerThreadReference.get();
+            // a cleared reference means the thread has already been garbage collected
+            return ownerThread == null || !ownerThread.isAlive();
+        }
+
+        /**
+         * Merges the sketches into the given unions and resets them, holding the write lock throughout.
+         *
+         * @param aggregateSuccess union receiving the success sketch
+         * @param aggregateFail union receiving the fail sketch; when {@code null} the fail sketch is neither
+         *                      read nor reset
+         */
+        void record(DoublesUnion aggregateSuccess, @Nullable DoublesUnion aggregateFail) {
+            long stamp = lock.writeLock();
+            try {
+                aggregateSuccess.update(successSketch);
+                successSketch.reset();
+                if (aggregateFail != null) {
+                    aggregateFail.update(failSketch);
+                    failSketch.reset();
+                }
+            } finally {
+                lock.unlockWrite(stamp);
+            }
+        }
+
+        /**
+         * Adds a value to the success sketch.
+         *
+         * <p>Only the read lock is taken, which still excludes a concurrent {@link #record}.
+         * NOTE(review): presumably safe because only the owner thread ever updates its own sketches - confirm.
+         */
+        void updateSuccess(double value) {
+            long stamp = lock.readLock();
+            try {
+                successSketch.update(value);
+            } finally {
+                lock.unlockRead(stamp);
+            }
+        }
+
+        /**
+         * Adds a value to the fail sketch; locking is the same as in {@link #updateSuccess(double)}.
+         */
+        void updateFail(double value) {
+            long stamp = lock.readLock();
+            try {
+                failSketch.update(value);
+            } finally {
+                lock.unlockRead(stamp);
+            }
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessorTest.java
new file mode 100644
index 00000000000..94c8337307d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus.metrics;
+
+import static org.testng.Assert.assertEquals;
+import com.yahoo.sketches.quantiles.DoublesUnion;
+import io.netty.util.concurrent.FastThreadLocalThread;
+import java.util.concurrent.Phaser;
+import org.jspecify.annotations.Nullable;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Checks that {@code ThreadLocalAccessor} unregisters a thread's {@code LocalData} once that thread has finished,
+ * for both {@link FastThreadLocalThread} and plain {@link Thread} owners.
+ */
+public class ThreadLocalAccessorTest {
+
+    /**
+     * Builds the parameter rows shared by all tests.
+     *
+     * @return rows of two elements: whether the owner thread is a {@link FastThreadLocalThread}, and the value
+     *         passed as the 2nd argument of {@code ThreadLocalAccessor#record} (may be {@code null})
+     */
+    @DataProvider
+    public static Object[][] provider() {
+        final boolean[] threadKinds = {true, false};
+        final Object[][] rows = new Object[threadKinds.length * 2][];
+        int next = 0;
+        for (boolean fast : threadKinds) {
+            rows[next++] = new Object[] {fast, DoublesUnion.builder().build()};
+            rows[next++] = new Object[] {fast, null};
+        }
+        return rows;
+    }
+
+    @Test(dataProvider = "provider")
+    public void testShouldRemoveLocalDataWhenOwnerThreadIsNotAlive(
+            boolean fastThreadLocalThread, @Nullable DoublesUnion aggregateFail) throws Exception {
+        final var accessor = new ThreadLocalAccessor();
+        final DoublesUnion successUnion = DoublesUnion.builder().build();
+        // Two parties take part in every phase: the worker thread and this test thread.
+        final Phaser barrier = new Phaser(2);
+        final Runnable workerBody = () -> {
+            // Touching the accessor creates the worker's LocalData.
+            accessor.getLocalData();
+            // phase 1: tell the test thread that the LocalData exists
+            barrier.arriveAndAwaitAdvance();
+            // phase 2: stay alive until the test thread has inspected the accessor
+            barrier.arriveAndAwaitAdvance();
+        };
+        final Thread worker = getThread(fastThreadLocalThread, workerBody);
+
+        // phase 1: the worker is alive and owns a LocalData, so a rotation must keep it
+        barrier.arriveAndAwaitAdvance();
+        accessor.record(successUnion, aggregateFail);
+        assertEquals(accessor.getLocalDataCount(), 1);
+
+        // phase 2: release the worker and wait until it has terminated
+        barrier.arriveAndAwaitAdvance();
+        worker.join();
+
+        // the next rotation must leave no LocalData behind
+        accessor.record(successUnion, aggregateFail);
+        assertEquals(accessor.getLocalDataCount(), 0);
+    }
+
+    @Test(dataProvider = "provider")
+    public void testThreadGc(boolean fastThreadLocalThread, @Nullable DoublesUnion aggregateFail) throws Exception {
+        final var accessor = new ThreadLocalAccessor();
+        // The finished thread is intentionally not kept in a local variable so that it is eligible for GC.
+        getThread(fastThreadLocalThread, accessor::getLocalData).join();
+        System.gc();
+
+        // FastThreadLocalThread removes the LocalData from the map when the thread finishes
+        final int expectedBeforeRecord;
+        if (fastThreadLocalThread) {
+            expectedBeforeRecord = 0;
+        } else {
+            expectedBeforeRecord = 1;
+        }
+        assertEquals(accessor.getLocalDataCount(), expectedBeforeRecord);
+
+        accessor.record(DoublesUnion.builder().build(), aggregateFail);
+        assertEquals(accessor.getLocalDataCount(), 0);
+    }
+
+    /**
+     * Creates a thread of the requested kind around {@code runnable} and starts it right away.
+     */
+    private static Thread getThread(boolean fastThreadLocalThread, Runnable runnable) {
+        final Thread started;
+        if (fastThreadLocalThread) {
+            started = new FastThreadLocalThread(runnable);
+        } else {
+            started = new Thread(runnable);
+        }
+        started.start(); // when LocalData is created
+        return started;
+    }
+}