[ https://issues.apache.org/jira/browse/KAFKA-7136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535187#comment-16535187 ]
ASF GitHub Bot commented on KAFKA-7136: --------------------------------------- guozhangwang closed pull request #5341: KAFKA-7136: Avoid deadlocks in synchronized metrics reporters URL: https://github.com/apache/kafka/pull/5341 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 5bf69b6b65f..e80d5bf24c1 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -73,7 +73,7 @@ files="RequestResponseTest.java"/> <suppress checks="NPathComplexity" - files="MemoryRecordsTest.java"/> + files="MemoryRecordsTest|MetricsTest"/> <!-- Connect --> <suppress checks="ClassFanOutComplexity" diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java index e4bf1aeee69..ccbe8aad9cd 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java @@ -48,6 +48,7 @@ private final Time time; private volatile long lastRecordTime; private final long inactiveSensorExpirationTimeMs; + private final Object metricLock; public enum RecordingLevel { INFO(0, "INFO"), DEBUG(1, "DEBUG"); @@ -113,6 +114,7 @@ public boolean shouldRecord(final int configId) { this.inactiveSensorExpirationTimeMs = TimeUnit.MILLISECONDS.convert(inactiveSensorExpirationTimeSeconds, TimeUnit.SECONDS); this.lastRecordTime = time.milliseconds(); this.recordingLevel = recordingLevel; + this.metricLock = new Object(); checkForest(new HashSet<Sensor>()); } @@ -174,9 +176,11 @@ public void record(double value, long timeMs, boolean checkQuotas) { if (shouldRecord()) { this.lastRecordTime = timeMs; synchronized (this) { - // increment all the stats - 
for (Stat stat : this.stats) - stat.record(config, value, timeMs); + synchronized (metricLock()) { + // increment all the stats + for (Stat stat : this.stats) + stat.record(config, value, timeMs); + } if (checkQuotas) checkQuotas(timeMs); } @@ -229,7 +233,7 @@ public synchronized boolean add(CompoundStat stat, MetricConfig config) { return false; this.stats.add(Utils.notNull(stat)); - Object lock = metricLock(stat); + Object lock = metricLock(); for (NamedMeasurable m : stat.stats()) { final KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), config == null ? this.config : config, time); if (!metrics.containsKey(metric.metricName())) { @@ -265,7 +269,7 @@ public synchronized boolean add(final MetricName metricName, final MeasurableSta return true; } else { final KafkaMetric metric = new KafkaMetric( - metricLock(stat), + metricLock(), Utils.notNull(metricName), Utils.notNull(stat), config == null ? this.config : config, @@ -291,10 +295,26 @@ public boolean hasExpired() { } /** - * KafkaMetrics of sensors which use SampledStat should be synchronized on the Sensor object - * to allow concurrent reads and updates. For simplicity, all sensors are synchronized on Sensor. + * KafkaMetrics of sensors which use SampledStat should be synchronized on the same lock + * for sensor record and metric value read to allow concurrent reads and updates. For simplicity, + * all sensors are synchronized on this object. + * <p> + * Sensor object is not used as a lock for reading metric value since metrics reporter is + * invoked while holding Sensor and Metrics locks to report addition and removal of metrics + * and synchronized reporters may deadlock if Sensor lock is used for reading metrics values. + * Note that Sensor object itself is used as a lock to protect the access to stats and metrics + * while recording metric values, adding and deleting sensors. 
+ * </p><p> + * Locking order (assume all MetricsReporter methods may be synchronized): + * <ul> + * <li>Sensor#add: Sensor -> Metrics -> MetricsReporter</li> + * <li>Metrics#removeSensor: Sensor -> Metrics -> MetricsReporter</li> + * <li>KafkaMetric#metricValue: MetricsReporter -> Sensor#metricLock</li> + * <li>Sensor#record: Sensor -> Sensor#metricLock</li> + * </ul> + * </p> */ - private Object metricLock(Stat stat) { - return this; + private Object metricLock() { + return metricLock; } } diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java index 6acc39d35a6..59bc84e40de 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java @@ -26,13 +26,16 @@ import java.util.Arrays; import java.util.Collections; import java.util.Deque; +import java.util.List; import java.util.HashMap; import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.common.Metric; @@ -54,9 +57,12 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @SuppressWarnings("deprecation") public class MetricsTest { + private static final Logger log = LoggerFactory.getLogger(MetricsTest.class); private static final double EPS = 0.000001; private MockTime time = new MockTime(); @@ -604,8 +610,12 @@ public void testMetricInstances() { } } + /** + * Verifies that concurrent sensor add, remove, updates and read don't result + * in errors or deadlock. 
+ */ @Test - public void testConcurrentAccess() throws Exception { + public void testConcurrentReadUpdate() throws Exception { final Random random = new Random(); final Deque<Sensor> sensors = new ConcurrentLinkedDeque<>(); metrics = new Metrics(new MockTime(10)); @@ -613,16 +623,8 @@ public void testConcurrentAccess() throws Exception { final AtomicBoolean alive = new AtomicBoolean(true); executorService = Executors.newSingleThreadExecutor(); - executorService.submit(new Runnable() { - @Override - public void run() { - while (alive.get()) { - for (Sensor sensor : sensors) { - sensor.record(random.nextInt(10000)); - } - } - } - }); + executorService.submit(new ConcurrentMetricOperation(alive, "record", + () -> sensors.forEach(sensor -> sensor.record(random.nextInt(10000))))); for (int i = 0; i < 10000; i++) { if (sensors.size() > 5) { @@ -640,6 +642,97 @@ public void run() { alive.set(false); } + /** + * Verifies that concurrent sensor add, remove, updates and read with a metrics reporter + * that synchronizes on every reporter method doesn't result in errors or deadlock. 
+ */ + @Test + public void testConcurrentReadUpdateReport() throws Exception { + + class LockingReporter implements MetricsReporter { + Map<MetricName, KafkaMetric> activeMetrics = new HashMap<>(); + @Override + public synchronized void init(List<KafkaMetric> metrics) { + } + + @Override + public synchronized void metricChange(KafkaMetric metric) { + activeMetrics.put(metric.metricName(), metric); + } + + @Override + public synchronized void metricRemoval(KafkaMetric metric) { + activeMetrics.remove(metric.metricName(), metric); + } + + @Override + public synchronized void close() { + } + + @Override + public void configure(Map<String, ?> configs) { + } + + synchronized void processMetrics() { + for (KafkaMetric metric : activeMetrics.values()) { + assertNotNull("Invalid metric value", metric.metricValue()); + } + } + } + + final LockingReporter reporter = new LockingReporter(); + this.metrics.close(); + this.metrics = new Metrics(config, Arrays.asList((MetricsReporter) reporter), new MockTime(10), true); + final Deque<Sensor> sensors = new ConcurrentLinkedDeque<>(); + SensorCreator sensorCreator = new SensorCreator(metrics); + + final Random random = new Random(); + final AtomicBoolean alive = new AtomicBoolean(true); + executorService = Executors.newFixedThreadPool(3); + + Future<?> writeFuture = executorService.submit(new ConcurrentMetricOperation(alive, "record", + () -> sensors.forEach(sensor -> sensor.record(random.nextInt(10000))))); + Future<?> readFuture = executorService.submit(new ConcurrentMetricOperation(alive, "read", + () -> sensors.forEach(sensor -> sensor.metrics().forEach(metric -> + assertNotNull("Invalid metric value", metric.metricValue()))))); + Future<?> reportFuture = executorService.submit(new ConcurrentMetricOperation(alive, "report", + () -> reporter.processMetrics())); + + for (int i = 0; i < 10000; i++) { + if (sensors.size() > 10) { + Sensor sensor = random.nextBoolean() ? 
sensors.removeFirst() : sensors.removeLast(); + metrics.removeSensor(sensor.name()); + } + StatType statType = StatType.forId(random.nextInt(StatType.values().length)); + sensors.add(sensorCreator.createSensor(statType, i)); + } + assertFalse("Read failed", readFuture.isDone()); + assertFalse("Write failed", writeFuture.isDone()); + assertFalse("Report failed", reportFuture.isDone()); + + alive.set(false); + } + + private class ConcurrentMetricOperation implements Runnable { + private final AtomicBoolean alive; + private final String opName; + private final Runnable op; + ConcurrentMetricOperation(AtomicBoolean alive, String opName, Runnable op) { + this.alive = alive; + this.opName = opName; + this.op = op; + } + public void run() { + try { + while (alive.get()) { + op.run(); + } + } catch (Throwable t) { + log.error("Metric {} failed with exception", opName, t); + } + } + } + enum StatType { AVG(0), TOTAL(1), @@ -676,7 +769,7 @@ static StatType forId(int id) { } private Sensor createSensor(StatType statType, int index) { - Sensor sensor = metrics.sensor("kafka.requests"); + Sensor sensor = metrics.sensor("kafka.requests." + index); Map<String, String> tags = Collections.singletonMap("tag", "tag" + index); switch (statType) { case AVG: ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PushHttpMetricsReporter may deadlock when processing metrics changes > -------------------------------------------------------------------- > > Key: KAFKA-7136 > URL: https://issues.apache.org/jira/browse/KAFKA-7136 > Project: Kafka > Issue Type: Bug > Components: metrics > Affects Versions: 1.1.0, 2.0.0 > Reporter: Rajini Sivaram > Assignee: Rajini Sivaram > Priority: Blocker > Fix For: 2.0.0 > > > We noticed a deadlock in {{PushHttpMetricsReporter}}. Locking for metrics was > changed under KAFKA-6765 to avoid {{NullPointerException}} in metrics > reporters due to concurrent reads and updates. {{PushHttpMetricsReporter}} > requires a lock to process metrics registration, which is invoked while holding > the sensor lock. It also reads metrics, attempting to acquire the sensor lock > while holding its own lock (the inverse order). This resulted in the deadlock below. > {quote}Found one Java-level deadlock: > Java stack information for the threads listed above: > =================================================== > "StreamThread-7": > at > org.apache.kafka.tools.PushHttpMetricsReporter.metricChange(PushHttpMetricsReporter.java:144) > - waiting to lock <0x0000000655a54310> (a java.lang.Object) > at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:563) > - locked <0x0000000655a44a28> (a org.apache.kafka.common.metrics.Metrics) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:236) > - locked <0x000000065629c170> (a org.apache.kafka.common.metrics.Sensor) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:217) > at > org.apache.kafka.common.network.Selector$SelectorMetrics.maybeRegisterConnectionMetrics(Selector.java:1016) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:462) > at org.apache.kafka.common.network.Selector.poll(Selector.java:425) > at 
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) > at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218) > at > org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:274) > at > org.apache.kafka.clients.consumer.internals.Fetcher.getAllTopicMetadata(Fetcher.java:254) > at > org.apache.kafka.clients.consumer.KafkaConsumer.listTopics(KafkaConsumer.java:1820) > at > org.apache.kafka.clients.consumer.KafkaConsumer.listTopics(KafkaConsumer.java:1798) > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.refreshChangelogInfo(StoreChangelogReader.java:224) > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:121) > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:74) > at > org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:317) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:824) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736) > "pool-17-thread-1": > at > org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:82) > - waiting to lock <0x000000065629c170> (a > org.apache.kafka.common.metrics.Sensor) > at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:58) > at > org.apache.kafka.tools.PushHttpMetricsReporter$HttpReporter.run(PushHttpMetricsReporter.java:177) > - locked <0x0000000655a54310> (a java.lang.Object) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at 
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Found 1 deadlock. > {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)