[ 
https://issues.apache.org/jira/browse/KAFKA-7136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535187#comment-16535187
 ] 

ASF GitHub Bot commented on KAFKA-7136:
---------------------------------------

guozhangwang closed pull request #5341: KAFKA-7136: Avoid deadlocks in 
synchronized metrics reporters
URL: https://github.com/apache/kafka/pull/5341
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff once a foreign (forked) pull request is
merged, the diff is reproduced below for the sake of provenance:

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 5bf69b6b65f..e80d5bf24c1 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -73,7 +73,7 @@
               files="RequestResponseTest.java"/>
 
     <suppress checks="NPathComplexity"
-              files="MemoryRecordsTest.java"/>
+              files="MemoryRecordsTest|MetricsTest"/>
 
     <!-- Connect -->
     <suppress checks="ClassFanOutComplexity"
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index e4bf1aeee69..ccbe8aad9cd 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -48,6 +48,7 @@
     private final Time time;
     private volatile long lastRecordTime;
     private final long inactiveSensorExpirationTimeMs;
+    private final Object metricLock;
 
     public enum RecordingLevel {
         INFO(0, "INFO"), DEBUG(1, "DEBUG");
@@ -113,6 +114,7 @@ public boolean shouldRecord(final int configId) {
         this.inactiveSensorExpirationTimeMs = 
TimeUnit.MILLISECONDS.convert(inactiveSensorExpirationTimeSeconds, 
TimeUnit.SECONDS);
         this.lastRecordTime = time.milliseconds();
         this.recordingLevel = recordingLevel;
+        this.metricLock = new Object();
         checkForest(new HashSet<Sensor>());
     }
 
@@ -174,9 +176,11 @@ public void record(double value, long timeMs, boolean 
checkQuotas) {
         if (shouldRecord()) {
             this.lastRecordTime = timeMs;
             synchronized (this) {
-                // increment all the stats
-                for (Stat stat : this.stats)
-                    stat.record(config, value, timeMs);
+                synchronized (metricLock()) {
+                    // increment all the stats
+                    for (Stat stat : this.stats)
+                        stat.record(config, value, timeMs);
+                }
                 if (checkQuotas)
                     checkQuotas(timeMs);
             }
@@ -229,7 +233,7 @@ public synchronized boolean add(CompoundStat stat, 
MetricConfig config) {
             return false;
 
         this.stats.add(Utils.notNull(stat));
-        Object lock = metricLock(stat);
+        Object lock = metricLock();
         for (NamedMeasurable m : stat.stats()) {
             final KafkaMetric metric = new KafkaMetric(lock, m.name(), 
m.stat(), config == null ? this.config : config, time);
             if (!metrics.containsKey(metric.metricName())) {
@@ -265,7 +269,7 @@ public synchronized boolean add(final MetricName 
metricName, final MeasurableSta
             return true;
         } else {
             final KafkaMetric metric = new KafkaMetric(
-                metricLock(stat),
+                metricLock(),
                 Utils.notNull(metricName),
                 Utils.notNull(stat),
                 config == null ? this.config : config,
@@ -291,10 +295,26 @@ public boolean hasExpired() {
     }
 
     /**
-     * KafkaMetrics of sensors which use SampledStat should be synchronized on 
the Sensor object
-     * to allow concurrent reads and updates. For simplicity, all sensors are 
synchronized on Sensor.
+     * KafkaMetrics of sensors which use SampledStat should be synchronized on 
the same lock
+     * for sensor record and metric value read to allow concurrent reads and 
updates. For simplicity,
+     * all sensors are synchronized on this object.
+     * <p>
+     * Sensor object is not used as a lock for reading metric value since 
metrics reporter is
+     * invoked while holding Sensor and Metrics locks to report addition and 
removal of metrics
+     * and synchronized reporters may deadlock if Sensor lock is used for 
reading metrics values.
+     * Note that Sensor object itself is used as a lock to protect the access 
to stats and metrics
+     * while recording metric values, adding and deleting sensors.
+     * </p><p>
+     * Locking order (assume all MetricsReporter methods may be synchronized):
+     * <ul>
+     *   <li>Sensor#add: Sensor -> Metrics -> MetricsReporter</li>
+     *   <li>Metrics#removeSensor: Sensor -> Metrics -> MetricsReporter</li>
+     *   <li>KafkaMetric#metricValue: MetricsReporter -> Sensor#metricLock</li>
+     *   <li>Sensor#record: Sensor -> Sensor#metricLock</li>
+     * </ul>
+     * </p>
      */
-    private Object metricLock(Stat stat) {
-        return this;
+    private Object metricLock() {
+        return metricLock;
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java 
b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 6acc39d35a6..59bc84e40de 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -26,13 +26,16 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Deque;
+import java.util.List;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.kafka.common.Metric;
@@ -54,9 +57,12 @@
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("deprecation")
 public class MetricsTest {
+    private static final Logger log = 
LoggerFactory.getLogger(MetricsTest.class);
 
     private static final double EPS = 0.000001;
     private MockTime time = new MockTime();
@@ -604,8 +610,12 @@ public void testMetricInstances() {
         }
     }
 
+    /**
+     * Verifies that concurrent sensor add, remove, updates and read don't 
result
+     * in errors or deadlock.
+     */
     @Test
-    public void testConcurrentAccess() throws Exception {
+    public void testConcurrentReadUpdate() throws Exception {
         final Random random = new Random();
         final Deque<Sensor> sensors = new ConcurrentLinkedDeque<>();
         metrics = new Metrics(new MockTime(10));
@@ -613,16 +623,8 @@ public void testConcurrentAccess() throws Exception {
 
         final AtomicBoolean alive = new AtomicBoolean(true);
         executorService = Executors.newSingleThreadExecutor();
-        executorService.submit(new Runnable() {
-            @Override
-            public void run() {
-                while (alive.get()) {
-                    for (Sensor sensor : sensors) {
-                        sensor.record(random.nextInt(10000));
-                    }
-                }
-            }
-        });
+        executorService.submit(new ConcurrentMetricOperation(alive, "record",
+            () -> sensors.forEach(sensor -> 
sensor.record(random.nextInt(10000)))));
 
         for (int i = 0; i < 10000; i++) {
             if (sensors.size() > 5) {
@@ -640,6 +642,97 @@ public void run() {
         alive.set(false);
     }
 
+    /**
+     * Verifies that concurrent sensor add, remove, updates and read with a 
metrics reporter
+     * that synchronizes on every reporter method doesn't result in errors or 
deadlock.
+     */
+    @Test
+    public void testConcurrentReadUpdateReport() throws Exception {
+
+        class LockingReporter implements MetricsReporter {
+            Map<MetricName, KafkaMetric> activeMetrics = new HashMap<>();
+            @Override
+            public synchronized void init(List<KafkaMetric> metrics) {
+            }
+
+            @Override
+            public synchronized void metricChange(KafkaMetric metric) {
+                activeMetrics.put(metric.metricName(), metric);
+            }
+
+            @Override
+            public synchronized void metricRemoval(KafkaMetric metric) {
+                activeMetrics.remove(metric.metricName(), metric);
+            }
+
+            @Override
+            public synchronized void close() {
+            }
+
+            @Override
+            public void configure(Map<String, ?> configs) {
+            }
+
+            synchronized void processMetrics() {
+                for (KafkaMetric metric : activeMetrics.values()) {
+                    assertNotNull("Invalid metric value", 
metric.metricValue());
+                }
+            }
+        }
+
+        final LockingReporter reporter = new LockingReporter();
+        this.metrics.close();
+        this.metrics = new Metrics(config, Arrays.asList((MetricsReporter) 
reporter), new MockTime(10), true);
+        final Deque<Sensor> sensors = new ConcurrentLinkedDeque<>();
+        SensorCreator sensorCreator = new SensorCreator(metrics);
+
+        final Random random = new Random();
+        final AtomicBoolean alive = new AtomicBoolean(true);
+        executorService = Executors.newFixedThreadPool(3);
+
+        Future<?> writeFuture = executorService.submit(new 
ConcurrentMetricOperation(alive, "record",
+            () -> sensors.forEach(sensor -> 
sensor.record(random.nextInt(10000)))));
+        Future<?> readFuture = executorService.submit(new 
ConcurrentMetricOperation(alive, "read",
+            () -> sensors.forEach(sensor -> sensor.metrics().forEach(metric ->
+                assertNotNull("Invalid metric value", 
metric.metricValue())))));
+        Future<?> reportFuture = executorService.submit(new 
ConcurrentMetricOperation(alive, "report",
+            () -> reporter.processMetrics()));
+
+        for (int i = 0; i < 10000; i++) {
+            if (sensors.size() > 10) {
+                Sensor sensor = random.nextBoolean() ? sensors.removeFirst() : 
sensors.removeLast();
+                metrics.removeSensor(sensor.name());
+            }
+            StatType statType = 
StatType.forId(random.nextInt(StatType.values().length));
+            sensors.add(sensorCreator.createSensor(statType, i));
+        }
+        assertFalse("Read failed", readFuture.isDone());
+        assertFalse("Write failed", writeFuture.isDone());
+        assertFalse("Report failed", reportFuture.isDone());
+
+        alive.set(false);
+    }
+
+    private class ConcurrentMetricOperation implements Runnable {
+        private final AtomicBoolean alive;
+        private final String opName;
+        private final Runnable op;
+        ConcurrentMetricOperation(AtomicBoolean alive, String opName, Runnable 
op) {
+            this.alive = alive;
+            this.opName = opName;
+            this.op = op;
+        }
+        public void run() {
+            try {
+                while (alive.get()) {
+                    op.run();
+                }
+            } catch (Throwable t) {
+                log.error("Metric {} failed with exception", opName, t);
+            }
+        }
+    }
+
     enum StatType {
         AVG(0),
         TOTAL(1),
@@ -676,7 +769,7 @@ static StatType forId(int id) {
         }
 
         private Sensor createSensor(StatType statType, int index) {
-            Sensor sensor = metrics.sensor("kafka.requests");
+            Sensor sensor = metrics.sensor("kafka.requests." + index);
             Map<String, String> tags = Collections.singletonMap("tag", "tag" + 
index);
             switch (statType) {
                 case AVG:


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> PushHttpMetricsReporter may deadlock when processing metrics changes
> --------------------------------------------------------------------
>
>                 Key: KAFKA-7136
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7136
>             Project: Kafka
>          Issue Type: Bug
>          Components: metrics
>    Affects Versions: 1.1.0, 2.0.0
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Blocker
>             Fix For: 2.0.0
>
>
> We noticed a deadlock in {{PushHttpMetricsReporter}}. Locking for metrics was 
> changed under KAFKA-6765 to avoid {{NullPointerException}} in metrics 
> reporters due to concurrent reads and updates. {{PushHttpMetricsReporter}} 
> acquires its own lock to process metric registration, which is invoked while 
> holding the sensor lock. It also reads metric values, attempting to acquire 
> the sensor lock while holding its own lock (the inverse order). This resulted 
> in the deadlock below.
> {quote}Found one Java-level deadlock:
>  [thread-summary section omitted from the quoted jstack output]
>  Java stack information for the threads listed above:
>  ===================================================
>  "StreamThread-7":
>  at 
> org.apache.kafka.tools.PushHttpMetricsReporter.metricChange(PushHttpMetricsReporter.java:144)
>  - waiting to lock <0x0000000655a54310> (a java.lang.Object)
>  at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:563)
>  - locked <0x0000000655a44a28> (a org.apache.kafka.common.metrics.Metrics)
>  at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:236)
>  - locked <0x000000065629c170> (a org.apache.kafka.common.metrics.Sensor)
>  at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:217)
>  at 
> org.apache.kafka.common.network.Selector$SelectorMetrics.maybeRegisterConnectionMetrics(Selector.java:1016)
>  at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:462)
>  at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:274)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher.getAllTopicMetadata(Fetcher.java:254)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.listTopics(KafkaConsumer.java:1820)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.listTopics(KafkaConsumer.java:1798)
>  at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.refreshChangelogInfo(StoreChangelogReader.java:224)
>  at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:121)
>  at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:74)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:317)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:824)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
>  "pool-17-thread-1":
>  at 
> org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:82)
>  - waiting to lock <0x000000065629c170> (a 
> org.apache.kafka.common.metrics.Sensor)
>  at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:58)
>  at 
> org.apache.kafka.tools.PushHttpMetricsReporter$HttpReporter.run(PushHttpMetricsReporter.java:177)
>  - locked <0x0000000655a54310> (a java.lang.Object)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Found 1 deadlock.
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to