vvcephei commented on a change in pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#discussion_r477424578



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -396,34 +398,65 @@ private String cacheSensorPrefix(final String threadId, final String taskId, fin
             + SENSOR_PREFIX_DELIMITER + SENSOR_CACHE_LABEL + SENSOR_PREFIX_DELIMITER + cacheName;
     }
 
-    public final Sensor storeLevelSensor(final String threadId,
-                                         final String taskId,
+    public final Sensor storeLevelSensor(final String taskId,
                                          final String storeName,
                                          final String sensorName,
-                                         final Sensor.RecordingLevel recordingLevel,
+                                         final RecordingLevel recordingLevel,
                                          final Sensor... parents) {
-        final String key = storeSensorPrefix(threadId, taskId, storeName);
-        synchronized (storeLevelSensors) {
-            final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
-            final Sensor sensor = metrics.getSensor(fullSensorName);
-            if (sensor == null) {
+        final String key = storeSensorPrefix(Thread.currentThread().getName(), taskId, storeName);
+        final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName;
+        return Optional.ofNullable(metrics.getSensor(fullSensorName))
+            .orElseGet(() -> {
                 storeLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(fullSensorName);
                 return metrics.sensor(fullSensorName, recordingLevel, parents);
-            } else {
-                return sensor;
-            }
+            });

Review comment:
       I'm still mildly concerned about walking back the synchronization here, 
but I can't think of a realistic scenario in which we'd get a concurrency bug. 
Then again, the whole point of defaulting to less granular concurrency controls 
is that it's hard to imagine all the possible scenarios.
   
   In this case, it really doesn't seem like there's a good reason to go for 
super granular concurrency control. Did we spend a lot of time blocked 
registering sensors before?
   
   Actually, one condition comes to mind: LinkedList is not thread-safe, and 
accessing a ConcurrentHashMap value is only a CAS or a volatile read, so it 
doesn't create the memory barrier that a `synchronized` block does. Therefore, 
different threads may each be looking at their own locally cached copy of the 
list stored under a given key, even though they'll all agree on the set of keys 
in the map.
   
   If you want to keep the current implementation style, then you should at 
least use a ConcurrentLinkedDeque instead of a LinkedList, but I'd really prefer 
to see the `synchronized` blocks come back unless/until there's a compelling 
performance reason to drop them.
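   
   To make the two options concrete, here's a hedged sketch using simplified 
stand-ins for the real fields and helpers (not the PR's code; in the real method 
the `metrics.getSensor(...)` lookup and `metrics.sensor(...)` registration would 
sit alongside this bookkeeping):

```java
import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

public class SensorBookkeepingSketch {
    private final ConcurrentHashMap<String, Deque<String>> storeLevelSensors =
        new ConcurrentHashMap<>();

    // Option 1: keep the lock-free style, but make the per-key collection
    // itself thread-safe so push() is safe without external locking.
    public void recordLockFree(final String key, final String fullSensorName) {
        storeLevelSensors
            .computeIfAbsent(key, ignored -> new ConcurrentLinkedDeque<>())
            .push(fullSensorName);
    }

    // Option 2: bring back the coarse-grained synchronized block, so the
    // lookup, the sensor registration, and this bookkeeping update are all
    // atomic with respect to each other.
    public void recordSynchronized(final String key, final String fullSensorName) {
        synchronized (storeLevelSensors) {
            storeLevelSensors
                .computeIfAbsent(key, ignored -> new LinkedList<>())
                .push(fullSensorName);
        }
    }
}
```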



