Eric Wu created KAFKA-14119:
-------------------------------

             Summary: Sensor in metrics has potential thread safety issues
                 Key: KAFKA-14119
                 URL: https://issues.apache.org/jira/browse/KAFKA-14119
             Project: Kafka
          Issue Type: Bug
          Components: metrics
            Reporter: Eric Wu

A `Sensor` may not be protected from race conditions when it [records|https://github.com/apache/kafka/blob/6ac58ac6fcd53a512ea0bc0b3dc66f49870ff0cb/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java#L230] a value. The problem can be reproduced with a unit test, e.g., in `SensorTest`:

{code:java}
// JDK classes used: java.util.Collections, java.util.concurrent.CompletableFuture,
// java.util.concurrent.ExecutorService, java.util.concurrent.Executors,
// java.util.concurrent.atomic.AtomicInteger
@Test
public void testSensorRecordThreadSafety() {
    Time time = new MockTime(0, System.currentTimeMillis(), 0);
    Metrics metrics = new Metrics(time);
    Sensor sensor = metrics.sensor("sensor");
    MetricName metric = new MetricName("test", "test", "test", Collections.emptyMap());
    sensor.add(metric, new Value());

    int totalRequests = 10;
    AtomicInteger count = new AtomicInteger();
    ExecutorService threadPool = Executors.newFixedThreadPool(totalRequests);
    try {
        CompletableFuture<?>[] futures = new CompletableFuture<?>[totalRequests];
        for (int i = 0; i < totalRequests; i++) {
            futures[i] = CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(10); // to make it easier to repro
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                // Each task records the next counter value (1..totalRequests).
                sensor.record(count.addAndGet(1));
            }, threadPool);
        }
        // Wait for every recording task to finish before reading the metric.
        CompletableFuture.allOf(futures).join();
    } finally {
        threadPool.shutdown();
    }

    assertEquals(1, sensor.metrics().size());
    // The metric is expected to end at totalRequests.
    double value = (double) sensor.metrics().get(0).metricValue();
    assertEquals(totalRequests, value);
}
{code}

The test needs some tweaks to make the fields visible. Over a few runs, the test should fail, which demonstrates the thread safety issue.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
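
One caveat when reading the result of the test above: `count.addAndGet(1)` and `sensor.record(...)` are two separate steps, so the thread that obtains the highest counter value is not guaranteed to be the last one to record. Assuming `Value` reports the most recently recorded value, the final assertion depends on call ordering as well as on lost updates. A minimal sketch of an order-independent check follows. `SumRecorder` and `OrderIndependentRepro` are hypothetical stand-ins written for illustration and are not Kafka classes. The sketch accumulates a sum, so any interleaving of correctly synchronized calls produces the same total.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a sensor with a sum-style stat; NOT Kafka's Sensor.
final class SumRecorder {
    private double sum = 0.0;

    // synchronized so that concurrent read-modify-write updates are not lost
    synchronized void record(double value) {
        sum += value;
    }

    synchronized double value() {
        return sum;
    }
}

final class OrderIndependentRepro {
    // Records 1..totalRequests from totalRequests tasks and returns the final sum.
    static double runOnce(int totalRequests) {
        SumRecorder recorder = new SumRecorder();
        AtomicInteger count = new AtomicInteger();
        ExecutorService threadPool = Executors.newFixedThreadPool(totalRequests);
        try {
            CompletableFuture<?>[] futures = new CompletableFuture<?>[totalRequests];
            for (int i = 0; i < totalRequests; i++) {
                futures[i] = CompletableFuture.runAsync(
                        () -> recorder.record(count.incrementAndGet()), threadPool);
            }
            // Wait for all tasks; the sum does not depend on their order.
            CompletableFuture.allOf(futures).join();
            return recorder.value();
        } finally {
            threadPool.shutdown();
        }
    }

    public static void main(String[] args) {
        int n = 10;
        double expected = n * (n + 1) / 2.0; // 1 + 2 + ... + n
        double actual = runOnce(n);
        System.out.println("expected=" + expected + " actual=" + actual);
    }
}
```

With a check of this shape, a mismatch between the expected and actual totals can only come from a lost update, not from the order in which threads happened to call `record`. Removing `synchronized` from `SumRecorder.record` would make the sketch susceptible to exactly that kind of loss.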