vvcephei commented on a change in pull request #9177: URL: https://github.com/apache/kafka/pull/9177#discussion_r477424578
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java ########## @@ -396,34 +398,65 @@ private String cacheSensorPrefix(final String threadId, final String taskId, fin + SENSOR_PREFIX_DELIMITER + SENSOR_CACHE_LABEL + SENSOR_PREFIX_DELIMITER + cacheName; } - public final Sensor storeLevelSensor(final String threadId, - final String taskId, + public final Sensor storeLevelSensor(final String taskId, final String storeName, final String sensorName, - final Sensor.RecordingLevel recordingLevel, + final RecordingLevel recordingLevel, final Sensor... parents) { - final String key = storeSensorPrefix(threadId, taskId, storeName); - synchronized (storeLevelSensors) { - final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName; - final Sensor sensor = metrics.getSensor(fullSensorName); - if (sensor == null) { + final String key = storeSensorPrefix(Thread.currentThread().getName(), taskId, storeName); + final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName; + return Optional.ofNullable(metrics.getSensor(fullSensorName)) + .orElseGet(() -> { storeLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName); return metrics.sensor(fullSensorName, recordingLevel, parents); - } else { - return sensor; - } + }); Review comment: I'm still mildly concerned about walking back the synchronization here, but I can't think of a realistic scenario in which we'd get a concurrency bug. Then again, the whole point of defaulting to less granular concurrency controls is that it's hard to imagine all the possible scenarios. In this case, it really doesn't seem like there's a good reason to go for super granular concurrency control. Did we spend a lot of time blocked registering sensors before? Actually, one condition comes to mind: LinkedList is not threadsafe, and accessing the ConcurrentHashMap value is only either a CAS or volatile read, so it doesn't create a memory barrier as `synchronized` does. Therefore, different threads will only be looking at their own locally cached list for each value in the map, although they'll all agree on the set of keys in the map. If you want to push the current implementation style, then you should use a ConcurrentLinkedDeque instead of LinkedList, but I'd really prefer to see the `synchronized` blocks come back unless/until there's a compelling performance reason to drop them. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org